아빠는 개발자

[ES] 추천 시스템 본문

카테고리 없음

[ES] 추천 시스템

father6019 2025. 9. 9. 22:55
728x90
반응형

ES 8.x + Spring Boot(Java) + Python 임베딩 배치 + Jenkins + Redis

(내용: 인덱스 설계 → 임베딩 생성/색인 배치 → Java 서비스 쿼리(유사/공동선호/하이브리드) → 재정렬 로직 → 캐싱 → Jenkins 배포 파이프라인 → A/B 테스트 체크리스트)

 

1) 인덱스 설계

1-1. 아이템 인덱스 (item-index)

  • 핵심: dense_vector(임베딩), rank_feature(인기도), 기본 속성, 매장/가용성 필터
  • ES 8.x는 index:true, similarity:cosine로 kNN 사용. ES 7.x는 스크립트 점수(또는 opensearch/ES k-NN 플러그인)로 처리.
PUT item-index
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1,
    "refresh_interval": "30s"      # 배치 색인량 많으면 일시적으로 -1→마무리 후 1s/30s
  },
  "mappings": {
    "properties": {
      "itemId":     { "type": "keyword" },
      "siteType":   { "type": "keyword" },          # e.g., ONLINE/STORE
      "storeId":    { "type": "keyword" },          # 매장별 가용성 제어
      "title":      { "type": "text", "analyzer": "nori" },
      "category":   { "type": "keyword" },
      "brand":      { "type": "keyword" },
      "price":      { "type": "scaled_float", "scaling_factor": 100 },
      "available":  { "type": "boolean" },
      "popScore":   { "type": "rank_feature" },     # 클릭/구매 기반 인기도
      "embedding":  {
        "type": "dense_vector",
        "dims": 768,
        "index": false                               # ES 7.x: false (script_score), ES 8.x: true + "similarity": "cosine"
      },
      "updatedAt":  { "type": "date" }
    }
  }
}

Kibana DevTools

1-2. 세션/주문 로그 인덱스 (session-index)

PUT session-index
{
  "settings": { "number_of_shards": 6, "number_of_replicas": 1 },
  "mappings": {
    "properties": {
      "sessionId": { "type": "keyword" },
      "userId":    { "type": "keyword" },
      "siteType":  { "type": "keyword" },
      "storeId":   { "type": "keyword" },
      "ts":        { "type": "date" },
      "itemIds":   { "type": "keyword" }   # ["A","B","C"]
    }
  }
}

 

1-3. 사전 계산된 유사 이웃 인덱스 (item-sim-index)

  • 대규모 트래픽/저지연을 위해 오프라인에서 유사 이웃을 전개해 저장.
PUT item-sim-index
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "itemId": {
        "type": "keyword"
      },
      "siteType": {
        "type": "keyword"
      },
      "storeId": {
        "type": "keyword"
      },
      "simItems": {
        "type": "nested",
        "properties": {
          "to": {
            "type": "keyword"
          },
          "score": {
            "type": "half_float"
          }
        }
      },
      "updatedAt": {
        "type": "date"
      }
    }
  }
}

 

2) 임베딩 생성 & 색인 배치 (Python)

  • 입력: items.csv(itemId, title, category, brand, price, siteType, storeId, available)
  • 출력: item-index.ndjson(bulk 색인 파일)
# pip install -U sentence-transformers pandas
import pandas as pd
from sentence_transformers import SentenceTransformer
from datetime import datetime
import json

MODEL = "snunlp/KR-SBERT-V40K-klueNLI"  # 또는 upskyy/bge-m3-korean
model = SentenceTransformer(MODEL)

df = pd.read_csv("./data/items.csv")  # cols: itemId,title,category,brand,price,siteType,storeId,available
text = (df["title"].fillna("") + " [CAT] " + df["category"].fillna("") + " [BRAND] " + df["brand"].fillna("")).tolist()
emb = model.encode(text, normalize_embeddings=True).tolist()

with open("./bulk/item-index.ndjson", "w", encoding="utf-8") as out:
    for row, vec in zip(df.to_dict("records"), emb):
        out.write(json.dumps({ "index": { "_index": "item-index", "_id": row["itemId"] }}, ensure_ascii=False) + "\n")
        doc = {
            "itemId": row["itemId"],
            "siteType": row.get("siteType", "ONLINE"),
            "storeId": row.get("storeId", "000"),
            "title": row["title"],
            "category": row["category"],
            "brand": row["brand"],
            "price": float(row["price"]),
            "available": bool(row["available"]),
            "popScore": 1.0,                 # 초기값(배치로 업데이트)
            "embedding": vec,
            "updatedAt": datetime.utcnow().isoformat()
        }
        out.write(json.dumps(doc, ensure_ascii=False) + "\n")

 

 

# 대량이면 refresh_interval=-1 → 색인 후 1s/30s로 되돌리기 권장
curl -s -H "Content-Type: application/x-ndjson" -u elastic:password \
  -XPOST "http://localhost:9200/_bulk?pipeline=_none" --data-binary @./bulk/item-index.ndjson

 

3) Java(Spring Boot) 검색/추천 쿼리

Elasticsearch 7.x High Level REST Client 또는 Elasticsearch Java API Client(8.x) 중 환경에 맞게 사용. 아래는 7.x 스타일 예시(가독성을 위해 핵심만):

3-1. 공통 유틸 – 코사인 스크립트 (ES 8.x)

  • 임베딩은 정규화하여 색인했으니 내적 = 코사인.
// cosine = dotProduct for normalized vectors
private static final String COSINE_SCRIPT =
    "double score = 0.0; " +
    "for (int i=0; i<params.q.length; ++i) { score += params.q[i] * doc['embedding'][i]; } " +
    "return score;";  // [-1,1]

 

3-2. “이 상품과 비슷한 상품” (콘텐츠 기반)

  • 입력: itemId, (siteType, storeId) 필터
  • 절차: 1) 해당 item의 embedding 조회 → 2) script_score로 Top K
 
public List<ItemDoc> similarItems(String itemId, String siteType, String storeId, int k) {
    // 1) 쿼리 아이템 벡터 조회 (mget or get)
    float[] qv = fetchEmbedding(itemId);

    Script script = new Script(ScriptType.INLINE, "painless", COSINE_SCRIPT,
        Collections.singletonMap("q", qv));

    QueryBuilder filter = QueryBuilders.boolQuery()
        .filter(QueryBuilders.termQuery("available", true))
        .filter(QueryBuilders.termQuery("siteType", siteType))
        .filter(QueryBuilders.termQuery("storeId", storeId))
        .mustNot(QueryBuilders.termQuery("itemId", itemId));

    ScriptScoreQueryBuilder ssq = QueryBuilders.scriptScoreQuery(filter, script);

    SearchSourceBuilder ssb = new SearchSourceBuilder()
        .query(ssq)
        .size(k)
        .fetchSource(new String[]{"itemId","title","category","price","popScore"}, null);

    SearchRequest req = new SearchRequest("item-index").source(ssb);
    SearchResponse resp = client.search(req, RequestOptions.DEFAULT);

    return map(resp.getHits());
}

 

 

3-3. “이걸 본/산 사람이 함께 본/산” (공동선호)

  • significant_terms로 동반 등장 아이템 후보를 추출하고, item-index에서 메타로 조인.
public List<String> coViewCandidates(String itemId, String siteType, String storeId, int size) {
    SearchSourceBuilder ssb = new SearchSourceBuilder()
        .query(QueryBuilders.boolQuery()
            .filter(QueryBuilders.termQuery("itemIds", itemId))
            .filter(QueryBuilders.termQuery("siteType", siteType))
            .filter(QueryBuilders.termQuery("storeId", storeId)))
        .size(0);

    SignificantTermsAggregationBuilder sig = AggregationBuilders.significantTerms("co_items")
        .field("itemIds")
        .size(size)
        .exclude(itemId);

    ssb.aggregation(sig);
    SearchRequest req = new SearchRequest("session-index").source(ssb);
    SearchResponse resp = client.search(req, RequestOptions.DEFAULT);

    ParsedSignificantTerms terms = resp.getAggregations().get("co_items");
    return terms.getBuckets().stream().map(b -> b.getKeyAsString()).collect(Collectors.toList());
}

 

3-4. 하이브리드 재정렬 (script_score)

  • 후보(콘텐츠/공동선호)를 terms로 제한하고, popScore/price 가중 혼합.
public List<ItemDoc> rerankHybrid(List<String> candidates, String siteType, String storeId, int size) {
    Map<String,Object> params = new HashMap<>();
    params.put("alphaText", 0.0);     // 텍스트 점수 사용 안하면 0
    params.put("alphaPop", 0.6);
    params.put("alphaPrice", 0.4);
    params.put("minP", 2000.0);       // 2만원
    params.put("maxP", 5000.0);       // 5만원

    String script =
        "double pop = doc['popScore'].size()!=0 ? saturation(doc['popScore'], 3.0) : 0.0;" +
        "double p   = doc['price'].size()!=0 ? doc['price'].value : 0.0;" +
        "double pf  = (p>=params.minP && p<=params.maxP) ? 1.0 : 0.6;" +
        "return params.alphaPop*pop + params.alphaPrice*pf;";

    BoolQueryBuilder base = QueryBuilders.boolQuery()
        .filter(QueryBuilders.termsQuery("itemId", candidates))
        .filter(QueryBuilders.termQuery("available", true))
        .filter(QueryBuilders.termQuery("siteType", siteType))
        .filter(QueryBuilders.termQuery("storeId", storeId));

    ScriptScoreQueryBuilder q = QueryBuilders.scriptScoreQuery(base,
        new Script(ScriptType.INLINE, "painless", script, params));

    SearchSourceBuilder ssb = new SearchSourceBuilder()
        .query(q)
        .size(size)
        .fetchSource(new String[]{"itemId","title","category","price","popScore"}, null);

    SearchRequest req = new SearchRequest("item-index").source(ssb);
    SearchResponse resp = client.search(req, RequestOptions.DEFAULT);
    return map(resp.getHits());
}

 

3-5. 엔드투엔드 추천 API 예시

  • 전략: 콘텐츠 유사 Top300 + 공동선호 Top200 → 합치고 중복 제거 → 재정렬 Top N
public List<ItemDoc> recommend(String itemId, String siteType, String storeId, int topN) {
    List<ItemDoc> sim = similarItems(itemId, siteType, storeId, 300);
    List<String> co  = coViewCandidates(itemId, siteType, storeId, 200);

    LinkedHashSet<String> candidates = new LinkedHashSet<>();
    sim.forEach(d -> candidates.add(d.getItemId()));
    candidates.addAll(co);

    List<ItemDoc> reranked = rerankHybrid(new ArrayList<>(candidates), siteType, storeId, topN);
    return reranked;
}

 

4) Redis 캐시 전략

  • 키: rec:{siteType}:{storeId}:{itemId}:v1
  • TTL: 5~15분 (카테고리/트래픽에 따라)
  • 재고/가격 변동이 잦은 카테고리면 TTL 짧게. 품절 변경 시 키 삭제.
String key = String.format("rec:%s:%s:%s:v1", siteType, storeId, itemId);
List<ItemDoc> cached = redis.get(key);
if (cached != null) return cached;

List<ItemDoc> result = recommend(itemId, siteType, storeId, 40);
redis.setex(key, Duration.ofMinutes(10), result);
return result;

 

5) Jenkins 파이프라인 (요약 스켈레톤)

pipeline {
  agent any
  environment {
    VENV = "${WORKSPACE}/.venv"
  }
  stages {
    stage('Checkout') {
      steps { checkout scm }
    }
    stage('Build Embeddings') {
      steps {
        sh """
          python3 -m venv ${VENV}
          . ${VENV}/bin/activate
          pip install -U sentence-transformers pandas
          python batch/build_embeddings.py   # 위 Python 스크립트
        """
      }
    }
    stage('Bulk Index to ES') {
      steps {
        sh """
          curl -s -H 'Content-Type: application/x-ndjson' -u ${ES_USER}:${ES_PASS} \\
            -XPOST '${ES_URL}/_bulk' --data-binary @bulk/item-index.ndjson
        """
      }
    }
    stage('Warmup Cache (optional)') {
      steps {
        sh "python batch/warmup_reco.py"  // 인기상품 몇 개로 캐시 예열
      }
    }
    stage('Deploy API') {
      steps {
        // ECS/EKS 배포 스크립트(이미 사용 중인 방식으로)
      }
    }
  }
}

 

6) 오프라인 “아이템→아이템” 전개 배치 (선택)

  • 공동선호(세션 로그)와 콘텐츠(임베딩 코사인)를 가중 합산해서 K개 이웃 산출 → item-sim-index에 색인.
# pseudo
# co_occ[a,b] = PMI or lift or normalized co-count
# cos[a,b]    = cosine(embedding[a], embedding[b])
# score = w1*cos + w2*co_occ
# 상위 K개만 저장

 

{ "index": { "_index": "item-sim-index", "_id": "A|ONLINE|001" } }
{
  "itemId": "A",
  "siteType": "ONLINE",
  "storeId": "001",
  "simItems": [
    { "to": "B", "score": 0.82 },
    { "to": "C", "score": 0.76 }
  ],
  "updatedAt": "2025-09-09T13:00:00Z"
}

 

실시간 조회는 GET 한 번:

GET item-sim-index/_doc/A|ONLINE|001

 

7) 다양성 & 비즈니스 룰

  • 다양성: 상위 카테고리별 상한(N개), 브랜드 중복 제한, collapse 활용
  • 품절/가격/마진/딜: bool.filter로 강제 필터, script_score로 가중 조정
  • 매장/배송가용성: siteType/storeId/available 고정 필터

8) 품질 측정 & A/B

  • 오프라인: Hit@K / MRR@K / NDCG@K (최근 7~30일 클릭/구매 로그)
  • 온라인: CTR, ATC, 전환율, 매출/세션, 다양성 지표(카테고리 분포)
  • 실험설계: 유사도 가중(w1,w2), popScore 스케일, 가격 적합 범위 등 파라미터 튜닝
  • 관측성: 추천 사유 태깅(“비슷한 상품”, “함께 본 상품”), 노출→클릭→재학습 루프

바로 적용 포인트

  • ES 7.x라면: dense_vector.index=false + script_score(위 코드처럼) 사용.
  • ES 8.x 가능하면: dense_vector.index=true, similarity:cosine + _knn_search로 후보 추출 → rescore/script_score로 블렌딩.
  • **현업 제약(매장/재고/딜)**은 filter로 강제하고, 스코어엔 인기도/가격/마진 등 소프트 룰만 반영.

 

728x90
반응형