RAG
목표
AIOS에서 제공하는 AI모델을 활용해 GIT 로그, PR 설명, 리뷰 코멘트 등을 벡터화하고, 이를 기반으로 RAG 기반의 PR리뷰 보조 챗봇을 구현합니다.
RAG(Retrieval-Augmented Generation, 검색 증강 생성)는 대규모 언어 모델(LLM)이 응답을 생성하기 전에 외부의 신뢰할 수 있는 지식 베이스나 데이터베이스에서 관련 정보를 검색(Retrieval)하고, 그 검색된 정보를 바탕으로 답변을 생성(Generation)하는 자연어 처리 기술입니다. 기존 LLM은 훈련된 데이터에만 의존하기 때문에 최신 정보나 특정 도메인에 특화된 지식을 반영하는 데 한계가 있습니다. RAG는 이 한계를 보완하여, 사용자의 질문에 대해 먼저 관련 문서나 데이터를 벡터 검색 등의 방법으로 찾아내고, 그 정보를 활용해 더 정확하고 맥락에 맞는 답변을 생성합니다.
환경
이 튜토리얼을 진행하려면 아래와 같은 환경이 준비되어 있어야 합니다.
시스템 환경
- Python 3.10 +
- pip
설치 필요 패키지
pip install streamlit
pip install opensearch-pypip install streamlit
pip install opensearch-py사전 준비 사항
- 사용자 지식 베이스나 데이터베이스
- 이 튜토리얼에서는 VM 내부에 OpenSearch를 구성하여 벡터 데이터베이스로 활용하였습니다.
- 사용자의 기존 저장소를 사용하거나, SCP의 Search Engine 상품을 활용 할 수 있습니다.
시스템 아키텍처
GitHub PR 데이터를 수집하여 RAG 기반 QA 시스템을 구성하고, AIOS 모델을 활용해 임베딩 및 응답 생성을 수행하는 전체 흐름을 보여줍니다.
RAG Flow
- Git 저장소에서 PR 데이터를 수집하여 pr_dataset.jsonl 생성
- RAG 입력에 적합하도록 텍스트 정제 → rag_ready.jsonl
- AIOS Embedding 모델을 통해 벡터 생성 후 rag_embedded.jsonl 파일로 저장
- 해당 벡터 파일을 OpenSearch에 업로드하여 검색 가능한 형태로 구성
RAG QA Application Flow
- 사용자의 질의(예: “이 PR을 분석해줘.")를 임베딩하여 검색 질의로 변환
- OpenSearch에서 KNN 검색 또는 AIOS Embedding 모델(score API) 호출을 통해 연관 문서 추출
- 추출된 문서 기반으로 프롬프트를 구성하고 AIOS Chat 모델로 전송
- 응답을 생성하여 최종 결과 출력
구현
- 이 튜토리얼에서는 kubeflow 프로젝트 github 을 활용하였습니다.
- 벡터 데이터베이스 데이터는 일회성으로 구성하였으며, 실제 서비스 시에는 실시간 연동 등으로 커스터마이징하여 사용하실 수 있습니다.
프로젝트 구조
rag-tutorial
├── app.py # streamlit 메인 웹 앱 파일
├── generate_pr_dateset_from_branch.py # 1. Github PR 데이터 수집
├── generate_rag_data_from_pr_dataset.py # 2. RAG 입력용 텍스트 구성 (RAG 입력에 적합하도록 요약하여 텍스트 정제)
├── embed_prs.py # 3. RAG 입력용 텍스트 구성 (AIOS Embedding 모델을 통해 벡터 생성)
└── upload_rag_documnets.py # 4. OpenSearch에 업로드
Github PR 데이터 수집
Git 저장소에서 PR 데이터를 수집하여 pr_dataset.jsonl 생성합니다.
- 아래 코드는 git 디렉토리 내에서 실행합니다.
- 추가 PR 병합 기록이 없거나, PR 병합이 rebase 방식 또는 squash-merge 방식으로 이루어져 정규 merge 커밋이 생성되지 않으면 데이터 수집이 되지 않습니다.
- 데이터 수집 시 각 커밋의 diff 항목은 최대 3000자로 제한하였습니다. 실제 시스템을 구성할 때는 효율적인 검색과 응답 생성을 위해, 내용의 길이나 구조에 따라 적절한 청킹(chunking) 작업이 추가적으로 필요합니다.
$ git branch
* (HEAD detached at v1.9.1)
master
$ python3 generate_pr_dateset_from_branch.py
🔍 Searching for merged PRs...
✅ Generated pr_dataset.jsonl with 43 merged PRs.
$ head -n 1 pr_dataset.jsonl | jq
{
"merge_sha": "167e162ef7dffc033ddc82e55b0a108db27fc340",
"author": "Ricardo Martinelli de Oliveira",
"date": "Tue Mar 5 11:46:36 2024 -0300",
"title": "Merge pull request #7461 from rimolive/kf-1.9",
"pr_id": null,
"commits": [
{
"sha": "68e4d10bbf976bb89810b4e16e8b765a2a0e68b7",
"author": "Ricardo Martinelli de Oliveira",
"message": "Update ROADMAP.md",
"date": "Mon Feb 19 18:51:40 2024 -0300",
"files": [
"ROADMAP.md"
],
"diff": "commit 68e4d10bbf976bb89810b4e16e8b765a2a0e68b7\nAuthor: Ricardo Martinelli de Oliveira <rmartine@redhat.com>\nDate: Mon Feb 19 18:51:40 2024 -0300\n\n Update ROADMAP.md\n \n Co-authored-by: Tommy Li <Tommy.chaoping.li@ibm.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex 35021954..cfd39558 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -8,7 +8,7 @@ The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [t\n * CNCF Transition\n * LLM APIs\n * New component: Model Registry\n-* Kubeflow Pipelines and kfp-tekton merged in a single GitHub repository\n+* Kubeflow Pipelines and kfp-tekton V2 merged in a single GitHub repository\n \n ### Detailed features, bug fixes and enhancements are identified in the Working Group Roadmaps and Tracking Issues:\n * [Training Operators](https://github.com/kubeflow/training-operator/issues/1994)"
},
{
"sha": "5c3404782fa2700f8547b37132ff7ab2d1ed99fe",
"author": "Ricardo M. Oliveira",
"message": "Add Kubeflow 1.9 release roadmap",
"date": "Mon Feb 5 14:43:45 2024 -0300",
"files": [
"ROADMAP.md"
],
"diff": "commit 5c3404782fa2700f8547b37132ff7ab2d1ed99fe\nAuthor: Ricardo M. Oliveira <rmartine@redhat.com>\nDate: Mon Feb 5 14:43:45 2024 -0300\n\n Add Kubeflow 1.9 release roadmap\n \n Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex de3c8951..35021954 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -1,6 +1,26 @@\n # Kubeflow Roadmap\n \n-## Kubeflow 1.8 Release, Planned for release: Oct 2023\n+## Kubeflow 1.9 Release, Planned for release: Jul 2024\n+The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [timeline](https://github.com/kubeflow/community/blob/master/releases/release-1.9/README.md#timeline). The high level deliverables are tracked in the [v1.9 Release](https://github.com/orgs/kubeflow/projects/61) Github project board. The v1.9 release process will be managed by the v1.9 [release team](https://github.com/kubeflow/community/blob/master/releases/release-1.9/release-team.md) using the best practices in the [Release Handbook](https://github.com/kubeflow/community/blob/master/releases/handbook.md).\n+\n+### Themes\n+* Kubernetes 1.29 support\n+* CNCF Transition\n+* LLM APIs\n+* New component: Model Registry\n+* Kubeflow Pipelines and kfp-tekton merged in a single GitHub repository\n+\n+### Detailed features, bug fixes and enhancements are identified in the Working Group Roadmaps and Tracking Issues:\n+* [Training Operators](https://github.com/kubeflow/training-operator/issues/1994)\n+* [KServe](https://github.com/orgs/kserve/projects/12)\n+* [Katib](https://github.com/kubeflow/katib/issues/2255)\n+* [Kubeflow Pipelines](https://github.com/kubeflow/pipelines/issues/10402)\n+* [Notebooks](https://github.com/kubeflow/kubeflow/issues/7459)\n+* [Manifests](https://github.com/kubeflow/manifests/issues/2592)\n+* [Security](https://github.com/kubeflow/manifests/issues/2598)\n+* [Model Registry](https://github.com/kubeflow/model-registry/issues/3)\n+\n+## Kubeflow 1.8 Release, Delivered: Nov 2023\n The Kubeflow Community plans to deliver its v1.8 release in Oct 2023 per this [timeline](https://github.com/kubeflow/community/tree/master/releases/release-1.8#timeline). The high level deliverables are tracked in the [v1.8 Release](https://github.com/orgs/kubeflow/projects/58/) Github project board. The v1.8 release process will be managed by the v1.8 [release team](https://github.com/kubeflow/community/blob/a956b3f6f15c49f928e37eaafec40d7f73ee1d5b/releases/release-team.md) using the best practices in the [Release Handbook](https://github.com/kubeflow/community/blob/master/releases/handbook.md).\n \n ### Themes"
}
]
}
generate_pr_dateset_from_branch.py
import subprocess
import json
def run(cmd):
return subprocess.check_output(cmd, shell=True, text=True).strip()
def extract_pr_commits(merge_sha):
try:
parent1 = run(f"git rev-parse {merge_sha}^1")
parent2 = run(f"git rev-parse {merge_sha}^2")
except subprocess.CalledProcessError:
return []
try:
lines = run(f"git log {parent1}..{parent2} --pretty=format:'%H|%an|%s|%ad'").splitlines()
except subprocess.CalledProcessError:
return []
commits = []
for line in lines:
try:
sha, author, msg, date = line.split("|", 3)
files = run(f"git show --pretty=format:'' --name-only {sha}").splitlines()
diff = run(f"git show {sha}")
commits.append({
"sha": sha,
"author": author,
"message": msg,
"date": date,
"files": files,
"diff": diff[:3000] # diff가 너무 길면 자름
})
except:
continue
return commits
def extract_pr_id(title):
if "# " in title:
try:
return title.split("#")[1].split()[0]
except:
return None
return None
output = []
print("🔍 Searching for merged PRs...")
log_lines = run("git log --merges --pretty=format:'%H|%an|%ad|%s'").splitlines()
for line in log_lines:
try:
merge_sha, author, date, title = line.split("|", 3)
except ValueError:
continue
commits = extract_pr_commits(merge_sha)
if not commits:
continue
pr_doc = {
"merge_sha": merge_sha,
"author": author,
"date": date,
"title": title,
"pr_id": extract_pr_id(title),
"commits": commits
}
output.append(pr_doc)
with open("pr_dataset.jsonl", "w") as f:
for item in output:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"✅ Generated pr_dataset.jsonl with {len(output)} merged PRs.")import subprocess
import json
def run(cmd):
return subprocess.check_output(cmd, shell=True, text=True).strip()
def extract_pr_commits(merge_sha):
try:
parent1 = run(f"git rev-parse {merge_sha}^1")
parent2 = run(f"git rev-parse {merge_sha}^2")
except subprocess.CalledProcessError:
return []
try:
lines = run(f"git log {parent1}..{parent2} --pretty=format:'%H|%an|%s|%ad'").splitlines()
except subprocess.CalledProcessError:
return []
commits = []
for line in lines:
try:
sha, author, msg, date = line.split("|", 3)
files = run(f"git show --pretty=format:'' --name-only {sha}").splitlines()
diff = run(f"git show {sha}")
commits.append({
"sha": sha,
"author": author,
"message": msg,
"date": date,
"files": files,
"diff": diff[:3000] # diff가 너무 길면 자름
})
except:
continue
return commits
def extract_pr_id(title):
if "# " in title:
try:
return title.split("#")[1].split()[0]
except:
return None
return None
output = []
print("🔍 Searching for merged PRs...")
log_lines = run("git log --merges --pretty=format:'%H|%an|%ad|%s'").splitlines()
for line in log_lines:
try:
merge_sha, author, date, title = line.split("|", 3)
except ValueError:
continue
commits = extract_pr_commits(merge_sha)
if not commits:
continue
pr_doc = {
"merge_sha": merge_sha,
"author": author,
"date": date,
"title": title,
"pr_id": extract_pr_id(title),
"commits": commits
}
output.append(pr_doc)
with open("pr_dataset.jsonl", "w") as f:
for item in output:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"✅ Generated pr_dataset.jsonl with {len(output)} merged PRs.")RAG 입력용 텍스트 구성
RAG 입력에 적합하도록 요약하여 텍스트 정제후, AIOS Embedding 모델을 통해 벡터를 생성합니다.
$ python3 generate_rag_data_from_pr_dataset.py
✅ RAG용 텍스트 생성 완료 → rag_ready.jsonl
$ head -n 1 rag_ready.jsonl | jq
{
"pr_id": null,
"title": "Merge pull request #7461 from rimolive/kf-1.9",
"text": "PR 제목: Merge pull request #7461 from rimolive/kf-1.9\n병합자: Ricardo Martinelli de Oliveira / 날짜: Tue Mar 5 11:46:36 2024 -0300\n커밋 요약:\n- Ricardo Martinelli de Oliveira (Mon Feb 19 18:51:40 2024 -0300): Update ROADMAP.md\n 변경 파일: ROADMAP.md\n 변경사항:\ncommit 68e4d10bbf976bb89810b4e16e8b765a2a0e68b7\nAuthor: Ricardo Martinelli de Oliveira <rmartine@redhat.com>\nDate: Mon Feb 19 18:51:40 2024 -0300\n\n Update ROADMAP.md\n \n Co-authored-by: Tommy Li <Tommy.chaoping.li@ibm.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex 35021954..cfd39558 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -8,7 +8,7 @@ The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [t\n * CNCF Transition\n * LLM APIs\n * New component: Model Registry\n-* Kubeflow Pipelines and kfp-tekton merged in a single GitHub repository\n+* Kubeflow Pipelines and kfp-tekton V2 merged in a single GitHub repository\n \n ### Detailed features, bug fixes and enhancements are identified in the Working Group Roadmaps and Tracking Issues:\n * [Training Operators](https://github.com/kubeflow/training-operator/issues/1994)\n- Ricardo M. Oliveira (Mon Feb 5 14:43:45 2024 -0300): Add Kubeflow 1.9 release roadmap\n 변경 파일: ROADMAP.md\n 변경사항:\ncommit 5c3404782fa2700f8547b37132ff7ab2d1ed99fe\nAuthor: Ricardo M. Oliveira <rmartine@redhat.com>\nDate: Mon Feb 5 14:43:45 2024 -0300\n\n Add Kubeflow 1.9 release roadmap\n \n Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex de3c8951..35021954 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -1,6 +1,26 @@\n # Kubeflow Roadmap\n \n-## Kubeflow 1.8 Release, Planned for release: Oct 2023\n+## Kubeflow 1.9 Release, Planned for release: Jul 2024\n+The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [timeline](https://github.com/kubeflow/community/blob/master/releases/release-1.9/README.md#timeline). The high level deliverables are tracked in the [v1.9 Release](https://github.com/orgs/kubeflow/projects/61) Github project board. The v1.9 release process will be managed by the v1.9 [release team](https://github.com/kubeflow/community/blob/master/releases/release-1.9/release-team.md) using the best practices in the [Rele"
}
$ python3 embed_prs.py
✅ Line 1: embedded
✅ Line 2: embedded
✅ Line 3: embedded
✅ Line 4: embedded
✅ Line 5: embedded
✅ Line 6: embedded
✅ Line 7: embedded
✅ Line 8: embedded
✅ Line 9: embedded
✅ Line 10: embedded
... (중략) ...
generate_rag_data_from_pr_dataset.py
import json
def build_text(pr):
lines = []
lines.append(f"PR 제목: {pr['title']}")
lines.append(f"병합자: {pr['author']} / 날짜: {pr['date']}")
lines.append("커밋 요약:")
for c in pr["commits"]:
lines.append(f"- {c['author']} ({c['date']}): {c['message']}")
if c["files"]:
lines.append(f" 변경 파일: {', '.join(c['files'])}")
lines.append(" 변경사항:")
lines.append(c["diff"][:1000]) # 너무 길면 자름
return "\n".join(lines)
with open("pr_dataset.jsonl") as fin, open("rag_ready.jsonl", "w") as fout:
for line in fin:
pr = json.loads(line)
text = build_text(pr)
out = {
"pr_id": pr.get("pr_id"),
"title": pr.get("title"),
"text": text
}
fout.write(json.dumps(out, ensure_ascii=False) + "\n")
print("✅ RAG용 텍스트 생성 완료 → rag_ready.jsonl")import json
def build_text(pr):
lines = []
lines.append(f"PR 제목: {pr['title']}")
lines.append(f"병합자: {pr['author']} / 날짜: {pr['date']}")
lines.append("커밋 요약:")
for c in pr["commits"]:
lines.append(f"- {c['author']} ({c['date']}): {c['message']}")
if c["files"]:
lines.append(f" 변경 파일: {', '.join(c['files'])}")
lines.append(" 변경사항:")
lines.append(c["diff"][:1000]) # 너무 길면 자름
return "\n".join(lines)
with open("pr_dataset.jsonl") as fin, open("rag_ready.jsonl", "w") as fout:
for line in fin:
pr = json.loads(line)
text = build_text(pr)
out = {
"pr_id": pr.get("pr_id"),
"title": pr.get("title"),
"text": text
}
fout.write(json.dumps(out, ensure_ascii=False) + "\n")
print("✅ RAG용 텍스트 생성 완료 → rag_ready.jsonl")embed_prs.py
- 코드 내 EMBEDDING_API_URL인 AIOS_LLM_Private_Endpoint과 model의 MODEL_ID는 LLM 이용 가이드를 참고해주세요. 아래의 예시처럼 입력할 수 있습니다.
- EMBEDDING_API_URL = “{AIOS LLM 프라이빗 엔드포인트}/{API}”
- “model”: “{모델ID}”
import json
import requests
import time
EMBEDDING_API_URL = "AIOS_LLM_Private_Endpoint"
HEADERS = {"Content-Type": "application/json"}
def get_embedding(text):
payload = {
"model": "MODEL_ID",
"input": text,
"stream": False
}
try:
response = requests.post(EMBEDDING_API_URL, headers=HEADERS, json=payload)
if response.status_code == 200:
result = response.json()
return result["data"][0]["embedding"]
else:
print(f"❌ Failed with status {response.status_code}: {response.text}")
return None
except Exception as e:
print(f"⚠️ Error calling embedding API: {e}")
return None
def main():
with open("rag_ready.jsonl", "r", encoding="utf-8") as fin, \
open("rag_embedded.jsonl", "w", encoding="utf-8") as fout:
for i, line in enumerate(fin, start=1):
try:
item = json.loads(line)
text = item.get("text", "").strip()
if not text:
print(f"⚠️ Line {i}: empty text, skipping")
continue
embedding = get_embedding(text)
if embedding is None:
print(f"⚠️ Line {i}: embedding failed, skipping")
continue
item["embedding"] = embedding
fout.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"✅ Line {i}: embedded")
time.sleep(0.2) # optional: rate limiting
except Exception as e:
print(f"❌ Line {i}: error - {e}")
continue
if __name__ == "__main__":
main()import json
import requests
import time
EMBEDDING_API_URL = "AIOS_LLM_Private_Endpoint"
HEADERS = {"Content-Type": "application/json"}
def get_embedding(text):
payload = {
"model": "MODEL_ID",
"input": text,
"stream": False
}
try:
response = requests.post(EMBEDDING_API_URL, headers=HEADERS, json=payload)
if response.status_code == 200:
result = response.json()
return result["data"][0]["embedding"]
else:
print(f"❌ Failed with status {response.status_code}: {response.text}")
return None
except Exception as e:
print(f"⚠️ Error calling embedding API: {e}")
return None
def main():
with open("rag_ready.jsonl", "r", encoding="utf-8") as fin, \
open("rag_embedded.jsonl", "w", encoding="utf-8") as fout:
for i, line in enumerate(fin, start=1):
try:
item = json.loads(line)
text = item.get("text", "").strip()
if not text:
print(f"⚠️ Line {i}: empty text, skipping")
continue
embedding = get_embedding(text)
if embedding is None:
print(f"⚠️ Line {i}: embedding failed, skipping")
continue
item["embedding"] = embedding
fout.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"✅ Line {i}: embedded")
time.sleep(0.2) # optional: rate limiting
except Exception as e:
print(f"❌ Line {i}: error - {e}")
continue
if __name__ == "__main__":
main()OpenSearch에 업로드
벡터 파일을 OpenSearch에 업로드하여 검색 가능한 형태로 구성합니다.
- 이 튜토리얼에서는 VM 내부에 OpenSearch를 구성하고, http://localhost:9200 주소로 호출합니다. 사용자 벡터 데이터베이스를 사용하는 경우에는 URL을 알맞게 변경해 주세요.
# OpenSearch에 "kubeflow-pr-rag-index"이름의 인덱스 생성
$ curl -X PUT "http://localhost:9200/kubeflow-pr-rag-index" \
-H "Content-Type: application/json" \
-d '{
"settings": {
"index": {
"knn": true
}
},
"mappings": {
"properties": {
"title": { "type": "text" },
"text": { "type": "text" },
"embedding": {
"type": "knn_vector",
"dimension": 1024,
"method": {
"name": "hnsw",
"space_type": "cosinesimil",
"engine": "nmslib"
}
}
}
}
}'
{"acknowledged":true,"shards_acknowledged":true,"index":"kubeflow-pr-rag-index"}
$ python3 upload_rag_documnets.py
✅ Uploaded document pr-1
✅ Uploaded document pr-2
✅ Uploaded document pr-3
✅ Uploaded document pr-4
✅ Uploaded document pr-5
✅ Uploaded document pr-6
✅ Uploaded document pr-7
✅ Uploaded document pr-8
✅ Uploaded document pr-9
✅ Uploaded document pr-10
... (중략) ...
upload_rag_documnets.py
import json
from opensearchpy import OpenSearch
# OpenSearch 연결 설정
client = OpenSearch(
hosts=[{"host": "localhost", "port": 9200}],
use_ssl=False,
verify_certs=False
)
index_name = "kubeflow-pr-rag-index"
with open("rag_embedded.jsonl", "r", encoding="utf-8") as f:
for i, line in enumerate(f, 1):
try:
doc = json.loads(line)
title = doc.get("title", "")
text = doc.get("text", "")
embedding = doc.get("embedding", [])
if not embedding or len(embedding) != 1024:
print(f"⚠️ Line {i}: Invalid embedding length, skipping.")
continue
body = {
"title": title,
"text": text,
"embedding": embedding
}
doc_id = f"pr-{i}"
client.index(index=index_name, id=doc_id, body=body)
print(f"✅ Uploaded document {doc_id}")
except Exception as e:
print(f"❌ Line {i}: Failed to upload due to {e}")import json
from opensearchpy import OpenSearch
# OpenSearch 연결 설정
client = OpenSearch(
hosts=[{"host": "localhost", "port": 9200}],
use_ssl=False,
verify_certs=False
)
index_name = "kubeflow-pr-rag-index"
with open("rag_embedded.jsonl", "r", encoding="utf-8") as f:
for i, line in enumerate(f, 1):
try:
doc = json.loads(line)
title = doc.get("title", "")
text = doc.get("text", "")
embedding = doc.get("embedding", [])
if not embedding or len(embedding) != 1024:
print(f"⚠️ Line {i}: Invalid embedding length, skipping.")
continue
body = {
"title": title,
"text": text,
"embedding": embedding
}
doc_id = f"pr-{i}"
client.index(index=index_name, id=doc_id, body=body)
print(f"✅ Uploaded document {doc_id}")
except Exception as e:
print(f"❌ Line {i}: Failed to upload due to {e}")OpenSearch Dashboards에서 확인
아래 그림과 같이 OpenSearch Dashboard에서 kubeflow-pr-rag-index 에 해당하는 데이터를 확인할 수 있습니다. 데이터는 title, text, embedding으로 구성되어 있습니다.
왼쪽 메뉴 → Dashboards Management → Index patterns → Create index pattern 클릭
RAG QA Application 구성
사용자의 질의를 임베딩하여 검색 질의로 변환한 뒤, RAG를 활용해 연관 문서를 추출하고, AIOS Chat 모델을 통해 최종 결과를 제공합니다.
- 이 코드에서는 유사도 검색 방식으로 OpenSearch의 KNN(K-Nearest Neightbors) 검색과 AIOS에서 제공하는 Embedding 모델의 Score API를 호출하여 입력 벡터와 가장 유사한 문서를 계산하는 방식을 지원합니다. 사용자는 두 방식 중 하나를 선택하여 사용할 수 있으며, 이 튜토리얼에서는 AIOS Score API 기반의 유사도 검색 방식을 사용합니다.
- OpenSearch의 KNN 호출 :
docs = search_similar_docs(query_vec, K) - AIOS Embedding 모델 호출 :
docs = search_similar_docs_with_score(question, K)
- OpenSearch의 KNN 호출 :
- 코드 내 EMBEDDING_API_URL, LLM_API_URL, SCORE_API_URL, MODEL_EMBEDDING, MODEL_CHAT은 LLM 이용 가이드를 참고하여 사용할 API와 Model로 입력해주세요. 아래의 예시처럼 입력할 수 있습니다.
- EMBEDDING_API_URL = “{AIOS LLM 프라이빗 엔드포인트}/{API}”
- MODEL_EMBEDDING = “{모델ID}”
app.py
import streamlit as st
import requests
from opensearchpy import OpenSearch
# 설정
def get_opensearch_client():
return OpenSearch(
hosts=[{"host": "localhost", "port": 9200}],
use_ssl=False,
verify_certs=False
)
EMBEDDING_API_URL = "YOUR_EMBEDDING_API_URL"
LLM_API_URL = "YOUR_LLM_API_URL"
SCORE_API_URL = "YOUR_SCORE_API_URL"
MODEL_EMBEDDING = "YOUR_MODEL_EMBEDDING"
MODEL_CHAT = "YOUR_MODEL_CHAT"
INDEX_NAME = "kubeflow-pr-rag-index"
VECTOR_DIM = 1024
K = 3
# 임베딩 생성 함수
def embed_text(text):
res = requests.post(
EMBEDDING_API_URL,
headers={"Content-Type": "application/json"},
json={"model": MODEL_EMBEDDING, "input": text, "stream": False}
)
return res.json()["data"][0]["embedding"]
# 모든 문서 불러오기 (OpenSearch)
def fetch_all_docs():
client = get_opensearch_client()
res = client.search(
index=INDEX_NAME,
body={
"size": 1000, # 필요한 만큼 설정 (작을 경우 스크롤 API 활용 가능)
"query": {"match_all": {}}
}
)
return [doc["_source"] for doc in res["hits"]["hits"]]
# 두 문장 리스트를 받아 유사도 점수 계산
def score_text_pairs(text_1, text_2):
payload = {
"model": MODEL_EMBEDDING,
"encoding_format": "float",
"text_1": text_1,
"text_2": text_2
}
headers = {
"accept": "application/json",
"Content-Type": "application/json"
}
response = requests.post(SCORE_API_URL, headers=headers, json=payload)
response.raise_for_status()
# 유사도 score만 추출
scores = [item["score"] for item in response.json()["data"]]
return scores
# 유사 문서 선택 (점수 기반 Top-K)
def search_similar_docs_with_score(query, k):
all_docs = fetch_all_docs()
doc_texts = [doc["text"] for doc in all_docs]
queries = [query] * len(doc_texts)
scores = score_text_pairs(queries, doc_texts)
# 점수 높은 순으로 정렬
scored_docs = sorted(zip(all_docs, scores), key=lambda x: x[1], reverse=True)
top_docs = [doc for doc, score in scored_docs[:k]]
return top_docs
# KNN 검색 함수
def search_similar_docs(query_vector, k):
client = get_opensearch_client()
res = client.search(
index=INDEX_NAME,
body={
"size": k,
"query": {
"knn": {
"embedding": {
"vector": query_vector,
"k": k
}
}
}
}
)
return [doc["_source"] for doc in res["hits"]["hits"]]
# 프롬프트 구성
def build_prompt(docs, question):
context_blocks = []
for i, doc in enumerate(docs):
context_blocks.append(f"[문서 {i+1}]\n{doc['text']}")
context = "\n\n".join(context_blocks)
return f"""다음은 Kubeflow 프로젝트에서 유사한 PR 문서들입니다:
{context}
사용자 질문: {question}
위 내용을 참고하여 질문에 대해 자연어로 답변해 주세요. 가능한 문서 번호를 인용해서 설명해주세요."""
# LLM 호출 함수
def call_llm(prompt):
res = requests.post(
LLM_API_URL,
headers={"Content-Type": "application/json"},
json={
"model": MODEL_CHAT,
"messages": [{"role": "user", "content": prompt}],
"stream": False
}
)
return res.json()["choices"][0]["message"]["content"]
# Streamlit UI 시작
st.set_page_config(page_title="RAG QA", layout="wide")
st.title("📘 RAG-based PR Summary Chatbot")
question = st.text_input("Enter your question:", "Please summarize the PR the Add Kubeflow 1.9 release roadmap.")
if st.button("Searching and generating response"):
with st.spinner("Generating embeddings..."):
query_vec = embed_text(question)
with st.spinner("Searching for similar documents in OpenSearch..."):
#docs = search_similar_docs(query_vec, K)
docs = search_similar_docs_with_score(question, K)
with st.spinner("Constructing prompt and invoking LLM..."):
prompt = build_prompt(docs, question)
answer = call_llm(prompt)
st.markdown("### 🤖 LLM response")
st.write(answer)
st.markdown("---")
st.markdown("### 🔍 Highlighted PR document")
for i, doc in enumerate(docs):
with st.expander(f"문서 {i+1}: {doc['title']}"):
# 간단한 질문 키워드 하이라이트
highlighted = doc['text'].replace(question.split()[0], f"**{question.split()[0]}**")
st.markdown(highlighted)import streamlit as st
import requests
from opensearchpy import OpenSearch
# 설정
def get_opensearch_client():
return OpenSearch(
hosts=[{"host": "localhost", "port": 9200}],
use_ssl=False,
verify_certs=False
)
EMBEDDING_API_URL = "YOUR_EMBEDDING_API_URL"
LLM_API_URL = "YOUR_LLM_API_URL"
SCORE_API_URL = "YOUR_SCORE_API_URL"
MODEL_EMBEDDING = "YOUR_MODEL_EMBEDDING"
MODEL_CHAT = "YOUR_MODEL_CHAT"
INDEX_NAME = "kubeflow-pr-rag-index"
VECTOR_DIM = 1024
K = 3
# 임베딩 생성 함수
def embed_text(text):
res = requests.post(
EMBEDDING_API_URL,
headers={"Content-Type": "application/json"},
json={"model": MODEL_EMBEDDING, "input": text, "stream": False}
)
return res.json()["data"][0]["embedding"]
# 모든 문서 불러오기 (OpenSearch)
def fetch_all_docs():
client = get_opensearch_client()
res = client.search(
index=INDEX_NAME,
body={
"size": 1000, # 필요한 만큼 설정 (작을 경우 스크롤 API 활용 가능)
"query": {"match_all": {}}
}
)
return [doc["_source"] for doc in res["hits"]["hits"]]
# 두 문장 리스트를 받아 유사도 점수 계산
def score_text_pairs(text_1, text_2):
payload = {
"model": MODEL_EMBEDDING,
"encoding_format": "float",
"text_1": text_1,
"text_2": text_2
}
headers = {
"accept": "application/json",
"Content-Type": "application/json"
}
response = requests.post(SCORE_API_URL, headers=headers, json=payload)
response.raise_for_status()
# 유사도 score만 추출
scores = [item["score"] for item in response.json()["data"]]
return scores
# 유사 문서 선택 (점수 기반 Top-K)
def search_similar_docs_with_score(query, k):
all_docs = fetch_all_docs()
doc_texts = [doc["text"] for doc in all_docs]
queries = [query] * len(doc_texts)
scores = score_text_pairs(queries, doc_texts)
# 점수 높은 순으로 정렬
scored_docs = sorted(zip(all_docs, scores), key=lambda x: x[1], reverse=True)
top_docs = [doc for doc, score in scored_docs[:k]]
return top_docs
# KNN 검색 함수
def search_similar_docs(query_vector, k):
client = get_opensearch_client()
res = client.search(
index=INDEX_NAME,
body={
"size": k,
"query": {
"knn": {
"embedding": {
"vector": query_vector,
"k": k
}
}
}
}
)
return [doc["_source"] for doc in res["hits"]["hits"]]
# 프롬프트 구성
def build_prompt(docs, question):
context_blocks = []
for i, doc in enumerate(docs):
context_blocks.append(f"[문서 {i+1}]\n{doc['text']}")
context = "\n\n".join(context_blocks)
return f"""다음은 Kubeflow 프로젝트에서 유사한 PR 문서들입니다:
{context}
사용자 질문: {question}
위 내용을 참고하여 질문에 대해 자연어로 답변해 주세요. 가능한 문서 번호를 인용해서 설명해주세요."""
# LLM 호출 함수
def call_llm(prompt):
res = requests.post(
LLM_API_URL,
headers={"Content-Type": "application/json"},
json={
"model": MODEL_CHAT,
"messages": [{"role": "user", "content": prompt}],
"stream": False
}
)
return res.json()["choices"][0]["message"]["content"]
# Streamlit UI 시작
st.set_page_config(page_title="RAG QA", layout="wide")
st.title("📘 RAG-based PR Summary Chatbot")
question = st.text_input("Enter your question:", "Please summarize the PR the Add Kubeflow 1.9 release roadmap.")
if st.button("Searching and generating response"):
with st.spinner("Generating embeddings..."):
query_vec = embed_text(question)
with st.spinner("Searching for similar documents in OpenSearch..."):
#docs = search_similar_docs(query_vec, K)
docs = search_similar_docs_with_score(question, K)
with st.spinner("Constructing prompt and invoking LLM..."):
prompt = build_prompt(docs, question)
answer = call_llm(prompt)
st.markdown("### 🤖 LLM response")
st.write(answer)
st.markdown("---")
st.markdown("### 🔍 Highlighted PR document")
for i, doc in enumerate(docs):
with st.expander(f"문서 {i+1}: {doc['title']}"):
# 간단한 질문 키워드 하이라이트
highlighted = doc['text'].replace(question.split()[0], f"**{question.split()[0]}**")
st.markdown(highlighted)RAG QA Chatbot UI 사용 방법
호출 코드 실행
- VM에서 Streamlit 실행배경색 변경
streamlit run app.py --server.port 8501 --server.address 0.0.0.0streamlit run app.py --server.port 8501 --server.address 0.0.0.0코드 블럭. Streamlit 실행
You can now view your Streamlit app in your browser.
URL: http://0.0.0.0:8501
브라우저에서 http://{your_server_ip}:8501 또는 서버 SSH 터널링 설정 후 http://0.0.0.0:8501 로 접속합니다. SSH 터널링은 아래를 참고하세요.
2. 로컬PC에서 터널링으로 VM접속 (http://0.0.0.0:8501 로 접속하는 경우)
ssh -i {your_pemkey.pem} -L 8501:localhost:8501 ubuntu@{your_server_ip}ssh -i {your_pemkey.pem} -L 8501:localhost:8501 ubuntu@{your_server_ip}사용 예시
Kubeflow 프로젝트 Git에서 Add Kubeflow 1.9 release roadmap PR 에 대한 요약을 질문합니다.
Kubeflow 프로젝트의 해당 PR에 대한 정보입니다.
마무리
이번 튜토리얼에서는 AIOS에서 제공하는 AI 모델을 활용하여 GIT PR 관련 데이터를 벡터화하고, OpenSearch 기반의 벡터 검색 및 LLM 응답을 조합하여 PR 리뷰 보조 챗봇을 구현해 보았습니다.이를 통해 과거 PR 히스토리에 기반한 질의응답이 가능해져, 개발자의 코드 리뷰 효율성과 품질을 향상시킬 수 있습니다. 본 시스템은 다음과 같은 방식으로 사용자 환경에 맞게 확장 및 커스터마이징할 수 있습니다.
- 벡터 데이터베이스 교체 : OpenSearch 외에 SCP Search Engine 상품 활용, 사용자 벡터 데이터베이스를 연동할 수 있습니다.
- 실시간 데이터 수집 연동 : Github Webhook 또는 Gitlab API 연동을 통해 실시간 PR 생성/업데이트 정보를 수집하고 자동 인덱싱 가능합니다.
- 대화형 UI 고도화: Streamlit 외에도 Slack Bot, 사내 메신저 등 다양한 인터페이스로 확장 가능합니다.
이번 튜토리얼을 기반으로 실제 서비스 목적에 따라 적합한 AIOS 기반 협업 도우미를 직접 구축해 보시길 바랍니다.



