| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 | 20 | 21 |
| 22 | 23 | 24 | 25 | 26 | 27 | 28 |
Tags
- java crawler
- 양자컴퓨터
- vavr
- java
- file download
- Aggregation
- dbeaver
- aqqle
- JPA
- IONQ
- api cache
- Analyzer
- mysql
- Elastic
- Cache
- request cache
- NORI
- redis
- API
- KNN
- TSLA
- elasticsearch cache
- aggs
- Selenium
- Query
- 아이온큐
- Elasticsearch
- 테슬라
- Docker
- ann
Archives
- Today
- Total
아빠는 개발자
[ES] 추천 시스템 본문
728x90
반응형
ES 8.x + Spring Boot(Java) + Python 임베딩 배치 + Jenkins + Redis
(내용: 인덱스 설계 → 임베딩 생성/색인 배치 → Java 서비스 쿼리(유사/공동선호/하이브리드) → 재정렬 로직 → 캐싱 → Jenkins 배포 파이프라인 → A/B 테스트 체크리스트)
1) 인덱스 설계
1-1. 아이템 인덱스 (item-index)
- 핵심: dense_vector(임베딩), rank_feature(인기도), 기본 속성, 매장/가용성 필터
- ES 8.x는 index:true, similarity:cosine로 kNN 사용. ES 7.x는 스크립트 점수(또는 opensearch/ES k-NN 플러그인)로 처리.
PUT item-index
{
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1,
"refresh_interval": "30s" # 배치 색인량 많으면 일시적으로 -1→마무리 후 1s/30s
},
"mappings": {
"properties": {
"itemId": { "type": "keyword" },
"siteType": { "type": "keyword" }, # e.g., ONLINE/STORE
"storeId": { "type": "keyword" }, # 매장별 가용성 제어
"title": { "type": "text", "analyzer": "nori" },
"category": { "type": "keyword" },
"brand": { "type": "keyword" },
"price": { "type": "scaled_float", "scaling_factor": 100 },
"available": { "type": "boolean" },
"popScore": { "type": "rank_feature" }, # 클릭/구매 기반 인기도
"embedding": {
"type": "dense_vector",
"dims": 768,
"index": false # ES 7.x: false (script_score), ES 8.x: true + "similarity": "cosine"
},
"updatedAt": { "type": "date" }
}
}
}
Kibana DevTools

1-2. 세션/주문 로그 인덱스 (session-index)
PUT session-index
{
"settings": { "number_of_shards": 6, "number_of_replicas": 1 },
"mappings": {
"properties": {
"sessionId": { "type": "keyword" },
"userId": { "type": "keyword" },
"siteType": { "type": "keyword" },
"storeId": { "type": "keyword" },
"ts": { "type": "date" },
"itemIds": { "type": "keyword" } # ["A","B","C"]
}
}
}
1-3. 사전 계산된 유사 이웃 인덱스 (item-sim-index)
- 대규모 트래픽/저지연을 위해 오프라인에서 유사 이웃을 전개해 저장.
PUT item-sim-index
{
"settings": {
"number_of_shards": 5,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"itemId": {
"type": "keyword"
},
"siteType": {
"type": "keyword"
},
"storeId": {
"type": "keyword"
},
"simItems": {
"type": "nested",
"properties": {
"to": {
"type": "keyword"
},
"score": {
"type": "half_float"
}
}
},
"updatedAt": {
"type": "date"
}
}
}
}
2) 임베딩 생성 & 색인 배치 (Python)
- 입력: items.csv(itemId, title, category, brand, price, siteType, storeId, available)
- 출력: item-index.ndjson(bulk 색인 파일)
# pip install -U sentence-transformers pandas
import pandas as pd
from sentence_transformers import SentenceTransformer
from datetime import datetime
import json
MODEL = "snunlp/KR-SBERT-V40K-klueNLI" # 또는 upskyy/bge-m3-korean
model = SentenceTransformer(MODEL)
df = pd.read_csv("./data/items.csv") # cols: itemId,title,category,brand,price,siteType,storeId,available
text = (df["title"].fillna("") + " [CAT] " + df["category"].fillna("") + " [BRAND] " + df["brand"].fillna("")).tolist()
emb = model.encode(text, normalize_embeddings=True).tolist()
with open("./bulk/item-index.ndjson", "w", encoding="utf-8") as out:
for row, vec in zip(df.to_dict("records"), emb):
out.write(json.dumps({ "index": { "_index": "item-index", "_id": row["itemId"] }}, ensure_ascii=False) + "\n")
doc = {
"itemId": row["itemId"],
"siteType": row.get("siteType", "ONLINE"),
"storeId": row.get("storeId", "000"),
"title": row["title"],
"category": row["category"],
"brand": row["brand"],
"price": float(row["price"]),
"available": bool(row["available"]),
"popScore": 1.0, # 초기값(배치로 업데이트)
"embedding": vec,
"updatedAt": datetime.utcnow().isoformat()
}
out.write(json.dumps(doc, ensure_ascii=False) + "\n")
# 대량이면 refresh_interval=-1 → 색인 후 1s/30s로 되돌리기 권장
curl -s -H "Content-Type: application/x-ndjson" -u elastic:password \
-XPOST "http://localhost:9200/_bulk?pipeline=_none" --data-binary @./bulk/item-index.ndjson
3) Java(Spring Boot) 검색/추천 쿼리
Elasticsearch 7.x High Level REST Client 또는 Elasticsearch Java API Client(8.x) 중 환경에 맞게 사용. 아래는 7.x 스타일 예시(가독성을 위해 핵심만):
3-1. 공통 유틸 – 코사인 스크립트 (ES 8.x)
- 임베딩은 정규화하여 색인했으니 내적 = 코사인.
// cosine = dotProduct for normalized vectors
private static final String COSINE_SCRIPT =
"double score = 0.0; " +
"for (int i=0; i<params.q.length; ++i) { score += params.q[i] * doc['embedding'][i]; } " +
"return score;"; // [-1,1]
3-2. “이 상품과 비슷한 상품” (콘텐츠 기반)
- 입력: itemId, (siteType, storeId) 필터
- 절차: 1) 해당 item의 embedding 조회 → 2) script_score로 Top K
public List<ItemDoc> similarItems(String itemId, String siteType, String storeId, int k) {
// 1) 쿼리 아이템 벡터 조회 (mget or get)
float[] qv = fetchEmbedding(itemId);
Script script = new Script(ScriptType.INLINE, "painless", COSINE_SCRIPT,
Collections.singletonMap("q", qv));
QueryBuilder filter = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("available", true))
.filter(QueryBuilders.termQuery("siteType", siteType))
.filter(QueryBuilders.termQuery("storeId", storeId))
.mustNot(QueryBuilders.termQuery("itemId", itemId));
ScriptScoreQueryBuilder ssq = QueryBuilders.scriptScoreQuery(filter, script);
SearchSourceBuilder ssb = new SearchSourceBuilder()
.query(ssq)
.size(k)
.fetchSource(new String[]{"itemId","title","category","price","popScore"}, null);
SearchRequest req = new SearchRequest("item-index").source(ssb);
SearchResponse resp = client.search(req, RequestOptions.DEFAULT);
return map(resp.getHits());
}
3-3. “이걸 본/산 사람이 함께 본/산” (공동선호)
- significant_terms로 동반 등장 아이템 후보를 추출하고, item-index에서 메타로 조인.
public List<String> coViewCandidates(String itemId, String siteType, String storeId, int size) {
SearchSourceBuilder ssb = new SearchSourceBuilder()
.query(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("itemIds", itemId))
.filter(QueryBuilders.termQuery("siteType", siteType))
.filter(QueryBuilders.termQuery("storeId", storeId)))
.size(0);
SignificantTermsAggregationBuilder sig = AggregationBuilders.significantTerms("co_items")
.field("itemIds")
.size(size)
.exclude(itemId);
ssb.aggregation(sig);
SearchRequest req = new SearchRequest("session-index").source(ssb);
SearchResponse resp = client.search(req, RequestOptions.DEFAULT);
ParsedSignificantTerms terms = resp.getAggregations().get("co_items");
return terms.getBuckets().stream().map(b -> b.getKeyAsString()).collect(Collectors.toList());
}
3-4. 하이브리드 재정렬 (script_score)
- 후보(콘텐츠/공동선호)를 terms로 제한하고, popScore/price 가중 혼합.
public List<ItemDoc> rerankHybrid(List<String> candidates, String siteType, String storeId, int size) {
Map<String,Object> params = new HashMap<>();
params.put("alphaText", 0.0); // 텍스트 점수 사용 안하면 0
params.put("alphaPop", 0.6);
params.put("alphaPrice", 0.4);
params.put("minP", 2000.0); // 2만원
params.put("maxP", 5000.0); // 5만원
String script =
"double pop = doc['popScore'].size()!=0 ? saturation(doc['popScore'], 3.0) : 0.0;" +
"double p = doc['price'].size()!=0 ? doc['price'].value : 0.0;" +
"double pf = (p>=params.minP && p<=params.maxP) ? 1.0 : 0.6;" +
"return params.alphaPop*pop + params.alphaPrice*pf;";
BoolQueryBuilder base = QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery("itemId", candidates))
.filter(QueryBuilders.termQuery("available", true))
.filter(QueryBuilders.termQuery("siteType", siteType))
.filter(QueryBuilders.termQuery("storeId", storeId));
ScriptScoreQueryBuilder q = QueryBuilders.scriptScoreQuery(base,
new Script(ScriptType.INLINE, "painless", script, params));
SearchSourceBuilder ssb = new SearchSourceBuilder()
.query(q)
.size(size)
.fetchSource(new String[]{"itemId","title","category","price","popScore"}, null);
SearchRequest req = new SearchRequest("item-index").source(ssb);
SearchResponse resp = client.search(req, RequestOptions.DEFAULT);
return map(resp.getHits());
}
3-5. 엔드투엔드 추천 API 예시
- 전략: 콘텐츠 유사 Top300 + 공동선호 Top200 → 합치고 중복 제거 → 재정렬 Top N
public List<ItemDoc> recommend(String itemId, String siteType, String storeId, int topN) {
List<ItemDoc> sim = similarItems(itemId, siteType, storeId, 300);
List<String> co = coViewCandidates(itemId, siteType, storeId, 200);
LinkedHashSet<String> candidates = new LinkedHashSet<>();
sim.forEach(d -> candidates.add(d.getItemId()));
candidates.addAll(co);
List<ItemDoc> reranked = rerankHybrid(new ArrayList<>(candidates), siteType, storeId, topN);
return reranked;
}
4) Redis 캐시 전략
- 키: rec:{siteType}:{storeId}:{itemId}:v1
- TTL: 5~15분 (카테고리/트래픽에 따라)
- 재고/가격 변동이 잦은 카테고리면 TTL 짧게. 품절 변경 시 키 삭제.
String key = String.format("rec:%s:%s:%s:v1", siteType, storeId, itemId);
List<ItemDoc> cached = redis.get(key);
if (cached != null) return cached;
List<ItemDoc> result = recommend(itemId, siteType, storeId, 40);
redis.setex(key, Duration.ofMinutes(10), result);
return result;
5) Jenkins 파이프라인 (요약 스켈레톤)
pipeline {
agent any
environment {
VENV = "${WORKSPACE}/.venv"
}
stages {
stage('Checkout') {
steps { checkout scm }
}
stage('Build Embeddings') {
steps {
sh """
python3 -m venv ${VENV}
. ${VENV}/bin/activate
pip install -U sentence-transformers pandas
python batch/build_embeddings.py # 위 Python 스크립트
"""
}
}
stage('Bulk Index to ES') {
steps {
sh """
curl -s -H 'Content-Type: application/x-ndjson' -u ${ES_USER}:${ES_PASS} \\
-XPOST '${ES_URL}/_bulk' --data-binary @bulk/item-index.ndjson
"""
}
}
stage('Warmup Cache (optional)') {
steps {
sh "python batch/warmup_reco.py" // 인기상품 몇 개로 캐시 예열
}
}
stage('Deploy API') {
steps {
// ECS/EKS 배포 스크립트(이미 사용 중인 방식으로)
}
}
}
}
6) 오프라인 “아이템→아이템” 전개 배치 (선택)
- 공동선호(세션 로그)와 콘텐츠(임베딩 코사인)를 가중 합산해서 K개 이웃 산출 → item-sim-index에 색인.
# pseudo
# co_occ[a,b] = PMI or lift or normalized co-count
# cos[a,b] = cosine(embedding[a], embedding[b])
# score = w1*cos + w2*co_occ
# 상위 K개만 저장
{ "index": { "_index": "item-sim-index", "_id": "A|ONLINE|001" } }
{
"itemId": "A",
"siteType": "ONLINE",
"storeId": "001",
"simItems": [
{ "to": "B", "score": 0.82 },
{ "to": "C", "score": 0.76 }
],
"updatedAt": "2025-09-09T13:00:00Z"
}
실시간 조회는 GET 한 번:
GET item-sim-index/_doc/A|ONLINE|001
7) 다양성 & 비즈니스 룰
- 다양성: 상위 카테고리별 상한(N개), 브랜드 중복 제한, collapse 활용
- 품절/가격/마진/딜: bool.filter로 강제 필터, script_score로 가중 조정
- 매장/배송가용성: siteType/storeId/available 고정 필터
8) 품질 측정 & A/B
- 오프라인: Hit@K / MRR@K / NDCG@K (최근 7~30일 클릭/구매 로그)
- 온라인: CTR, ATC, 전환율, 매출/세션, 다양성 지표(카테고리 분포)
- 실험설계: 유사도 가중(w1,w2), popScore 스케일, 가격 적합 범위 등 파라미터 튜닝
- 관측성: 추천 사유 태깅(“비슷한 상품”, “함께 본 상품”), 노출→클릭→재학습 루프
바로 적용 포인트
- ES 7.x라면: dense_vector.index=false + script_score(위 코드처럼) 사용.
- ES 8.x 가능하면: dense_vector.index=true, similarity:cosine + _knn_search로 후보 추출 → rescore/script_score로 블렌딩.
- **현업 제약(매장/재고/딜)**은 filter로 강제하고, 스코어엔 인기도/가격/마진 등 소프트 룰만 반영.
728x90
반응형