RAG

목표

AIOS에서 제공하는 AI모델을 활용해 GIT 로그, PR 설명, 리뷰 코멘트 등을 벡터화하고, 이를 기반으로 RAG 기반의 PR리뷰 보조 챗봇을 구현합니다.

참고
RAG
RAG(Retrieval-Augmented Generation, 검색 증강 생성)는 대규모 언어 모델(LLM)이 응답을 생성하기 전에 외부의 신뢰할 수 있는 지식 베이스나 데이터베이스에서 관련 정보를 검색(Retrieval)하고, 그 검색된 정보를 바탕으로 답변을 생성(Generation)하는 자연어 처리 기술입니다. 기존 LLM은 훈련된 데이터에만 의존하기 때문에 최신 정보나 특정 도메인에 특화된 지식을 반영하는 데 한계가 있습니다. RAG는 이 한계를 보완하여, 사용자의 질문에 대해 먼저 관련 문서나 데이터를 벡터 검색 등의 방법으로 찾아내고, 그 정보를 활용해 더 정확하고 맥락에 맞는 답변을 생성합니다.

환경

이 튜토리얼을 진행하려면 아래와 같은 환경이 준비되어 있어야 합니다.

시스템 환경

  • Python 3.10 +
  • pip

설치 필요 패키지

배경색 변경
pip install streamlit
pip install opensearch-py
pip install streamlit
pip install opensearch-py
코드 블럭. streamlit, opensearch 패키지 설치

사전 준비 사항

  • 사용자 지식 베이스나 데이터베이스
참고
  • 이 튜토리얼에서는 VM 내부에 OpenSearch를 구성하여 벡터 데이터베이스로 활용하였습니다.
  • 사용자의 기존 저장소를 사용하거나, SCP의 Search Engine 상품을 활용 할 수 있습니다.

시스템 아키텍처

GitHub PR 데이터를 수집하여 RAG 기반 QA 시스템을 구성하고, AIOS 모델을 활용해 임베딩 및 응답 생성을 수행하는 전체 흐름을 보여줍니다.

그림1

RAG Flow

  1. Git 저장소에서 PR 데이터를 수집하여 pr_dataset.jsonl 생성
  2. RAG 입력에 적합하도록 텍스트 정제 → rag_ready.jsonl
  3. AIOS Embedding 모델을 통해 벡터 생성 후 rag_embedded.jsonl 파일로 저장
  4. 해당 벡터 파일을 OpenSearch에 업로드하여 검색 가능한 형태로 구성

RAG QA Application Flow

  1. 사용자의 질의(예: “이 PR을 분석해줘.")를 임베딩하여 검색 질의로 변환
  2. OpenSearch에서 KNN 검색 또는 AIOS Embedding 모델(score API) 호출을 통해 연관 문서 추출
  3. 추출된 문서 기반으로 프롬프트를 구성하고 AIOS Chat 모델로 전송
  4. 응답을 생성하여 최종 결과 출력

구현

참고
  • 이 튜토리얼에서는 kubeflow 프로젝트 github 을 활용하였습니다.
  • 벡터 데이터베이스 데이터는 일회성으로 구성하였으며, 실제 서비스 시에는 실시간 연동 등으로 커스터마이징하여 사용하실 수 있습니다.

프로젝트 구조

rag-tutorial
├── app.py                                  # streamlit 메인 웹 앱 파일
├── generate_pr_dateset_from_branch.py      # 1. Github PR 데이터 수집
├── generate_rag_data_from_pr_dataset.py    # 2. RAG 입력용 텍스트 구성 (RAG 입력에 적합하도록 요약하여 텍스트 정제)
├── embed_prs.py                            # 3. RAG 입력용 텍스트 구성 (AIOS Embedding 모델을 통해 벡터 생성)
└── upload_rag_documnets.py                 # 4. OpenSearch에 업로드

Github PR 데이터 수집

Git 저장소에서 PR 데이터를 수집하여 pr_dataset.jsonl 생성합니다.

참고
  • 아래 코드는 git 디렉토리 내에서 실행합니다.
  • 추가 PR 병합 기록이 없거나, PR 병합이 rebase 방식 또는 squash-merge 방식으로 이루어져 정규 merge 커밋이 생성되지 않으면 데이터 수집이 되지 않습니다.
  • 데이터 수집 시 각 커밋의 diff 항목은 최대 3000자로 제한하였습니다. 실제 시스템을 구성할 때는 효율적인 검색과 응답 생성을 위해, 내용의 길이나 구조에 따라 적절한 청킹(chunking) 작업이 추가적으로 필요합니다.
$ git branch
* (HEAD detached at v1.9.1)
  master

$ python3 generate_pr_dateset_from_branch.py
🔍 Searching for merged PRs...
✅ Generated pr_dataset.jsonl with 43 merged PRs.

$ head -n 1 pr_dataset.jsonl | jq
{
  "merge_sha": "167e162ef7dffc033ddc82e55b0a108db27fc340",
  "author": "Ricardo Martinelli de Oliveira",
  "date": "Tue Mar 5 11:46:36 2024 -0300",
  "title": "Merge pull request #7461 from rimolive/kf-1.9",
  "pr_id": null,
  "commits": [
    {
      "sha": "68e4d10bbf976bb89810b4e16e8b765a2a0e68b7",
      "author": "Ricardo Martinelli de Oliveira",
      "message": "Update ROADMAP.md",
      "date": "Mon Feb 19 18:51:40 2024 -0300",
      "files": [
        "ROADMAP.md"
      ],
      "diff": "commit 68e4d10bbf976bb89810b4e16e8b765a2a0e68b7\nAuthor: Ricardo Martinelli de Oliveira <rmartine@redhat.com>\nDate:   Mon Feb 19 18:51:40 2024 -0300\n\n    Update ROADMAP.md\n    \n    Co-authored-by: Tommy Li <Tommy.chaoping.li@ibm.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex 35021954..cfd39558 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -8,7 +8,7 @@ The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [t\n * CNCF Transition\n * LLM APIs\n * New component: Model Registry\n-* Kubeflow Pipelines and kfp-tekton merged in a single GitHub repository\n+* Kubeflow Pipelines and kfp-tekton V2 merged in a single GitHub repository\n \n ### Detailed features, bug fixes and enhancements are identified in the Working Group Roadmaps and Tracking Issues:\n * [Training Operators](https://github.com/kubeflow/training-operator/issues/1994)"
    },
    {
      "sha": "5c3404782fa2700f8547b37132ff7ab2d1ed99fe",
      "author": "Ricardo M. Oliveira",
      "message": "Add Kubeflow 1.9 release roadmap",
      "date": "Mon Feb 5 14:43:45 2024 -0300",
      "files": [
        "ROADMAP.md"
      ],
      "diff": "commit 5c3404782fa2700f8547b37132ff7ab2d1ed99fe\nAuthor: Ricardo M. Oliveira <rmartine@redhat.com>\nDate:   Mon Feb 5 14:43:45 2024 -0300\n\n    Add Kubeflow 1.9 release roadmap\n    \n    Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex de3c8951..35021954 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -1,6 +1,26 @@\n # Kubeflow Roadmap\n \n-## Kubeflow 1.8 Release, Planned for release: Oct 2023\n+## Kubeflow 1.9 Release, Planned for release: Jul 2024\n+The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [timeline](https://github.com/kubeflow/community/blob/master/releases/release-1.9/README.md#timeline). The high level deliverables are tracked in the [v1.9 Release](https://github.com/orgs/kubeflow/projects/61) Github project board. The v1.9 release process will be managed by the v1.9 [release team](https://github.com/kubeflow/community/blob/master/releases/release-1.9/release-team.md) using the best practices in the [Release Handbook](https://github.com/kubeflow/community/blob/master/releases/handbook.md).\n+\n+### Themes\n+* Kubernetes 1.29 support\n+* CNCF Transition\n+* LLM APIs\n+* New component: Model Registry\n+* Kubeflow Pipelines and kfp-tekton merged in a single GitHub repository\n+\n+### Detailed features, bug fixes and enhancements are identified in the Working Group Roadmaps and Tracking Issues:\n+* [Training Operators](https://github.com/kubeflow/training-operator/issues/1994)\n+* [KServe](https://github.com/orgs/kserve/projects/12)\n+* [Katib](https://github.com/kubeflow/katib/issues/2255)\n+* [Kubeflow Pipelines](https://github.com/kubeflow/pipelines/issues/10402)\n+* [Notebooks](https://github.com/kubeflow/kubeflow/issues/7459)\n+* [Manifests](https://github.com/kubeflow/manifests/issues/2592)\n+* [Security](https://github.com/kubeflow/manifests/issues/2598)\n+* [Model Registry](https://github.com/kubeflow/model-registry/issues/3)\n+\n+## Kubeflow 1.8 Release, Delivered: Nov 2023\n The Kubeflow Community plans to deliver its v1.8 release in Oct 2023 per this [timeline](https://github.com/kubeflow/community/tree/master/releases/release-1.8#timeline). The high level deliverables are tracked in the [v1.8 Release](https://github.com/orgs/kubeflow/projects/58/) Github project board. The v1.8 release process will be managed by the v1.8 [release team](https://github.com/kubeflow/community/blob/a956b3f6f15c49f928e37eaafec40d7f73ee1d5b/releases/release-team.md) using the best practices in the [Release Handbook](https://github.com/kubeflow/community/blob/master/releases/handbook.md).\n \n ### Themes"
    }
  ]
}

generate_pr_dateset_from_branch.py

배경색 변경
import subprocess
import json

def run(cmd):
    return subprocess.check_output(cmd, shell=True, text=True).strip()

def extract_pr_commits(merge_sha):
    try:
        parent1 = run(f"git rev-parse {merge_sha}^1")
        parent2 = run(f"git rev-parse {merge_sha}^2")
    except subprocess.CalledProcessError:
        return []

    try:
        lines = run(f"git log {parent1}..{parent2} --pretty=format:'%H|%an|%s|%ad'").splitlines()
    except subprocess.CalledProcessError:
        return []

    commits = []
    for line in lines:
        try:
            sha, author, msg, date = line.split("|", 3)
            files = run(f"git show --pretty=format:'' --name-only {sha}").splitlines()
            diff = run(f"git show {sha}")
            commits.append({
                "sha": sha,
                "author": author,
                "message": msg,
                "date": date,
                "files": files,
                "diff": diff[:3000]  # diff가 너무 길면 자름
            })
        except:
            continue
    return commits

def extract_pr_id(title):
    if "# " in title:
        try:
            return title.split("#")[1].split()[0]
        except:
            return None
    return None

output = []

print("🔍 Searching for merged PRs...")
log_lines = run("git log --merges --pretty=format:'%H|%an|%ad|%s'").splitlines()

for line in log_lines:
    try:
        merge_sha, author, date, title = line.split("|", 3)
    except ValueError:
        continue

    commits = extract_pr_commits(merge_sha)
    if not commits:
        continue

    pr_doc = {
        "merge_sha": merge_sha,
        "author": author,
        "date": date,
        "title": title,
        "pr_id": extract_pr_id(title),
        "commits": commits
    }

    output.append(pr_doc)

with open("pr_dataset.jsonl", "w") as f:
    for item in output:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")

print(f"✅ Generated pr_dataset.jsonl with {len(output)} merged PRs.")
import subprocess
import json

def run(cmd):
    return subprocess.check_output(cmd, shell=True, text=True).strip()

def extract_pr_commits(merge_sha):
    try:
        parent1 = run(f"git rev-parse {merge_sha}^1")
        parent2 = run(f"git rev-parse {merge_sha}^2")
    except subprocess.CalledProcessError:
        return []

    try:
        lines = run(f"git log {parent1}..{parent2} --pretty=format:'%H|%an|%s|%ad'").splitlines()
    except subprocess.CalledProcessError:
        return []

    commits = []
    for line in lines:
        try:
            sha, author, msg, date = line.split("|", 3)
            files = run(f"git show --pretty=format:'' --name-only {sha}").splitlines()
            diff = run(f"git show {sha}")
            commits.append({
                "sha": sha,
                "author": author,
                "message": msg,
                "date": date,
                "files": files,
                "diff": diff[:3000]  # diff가 너무 길면 자름
            })
        except:
            continue
    return commits

def extract_pr_id(title):
    if "# " in title:
        try:
            return title.split("#")[1].split()[0]
        except:
            return None
    return None

output = []

print("🔍 Searching for merged PRs...")
log_lines = run("git log --merges --pretty=format:'%H|%an|%ad|%s'").splitlines()

for line in log_lines:
    try:
        merge_sha, author, date, title = line.split("|", 3)
    except ValueError:
        continue

    commits = extract_pr_commits(merge_sha)
    if not commits:
        continue

    pr_doc = {
        "merge_sha": merge_sha,
        "author": author,
        "date": date,
        "title": title,
        "pr_id": extract_pr_id(title),
        "commits": commits
    }

    output.append(pr_doc)

with open("pr_dataset.jsonl", "w") as f:
    for item in output:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")

print(f"✅ Generated pr_dataset.jsonl with {len(output)} merged PRs.")
코드 블럭. generate_pr_dateset_from_branch.py

RAG 입력용 텍스트 구성

RAG 입력에 적합하도록 요약하여 텍스트 정제후, AIOS Embedding 모델을 통해 벡터를 생성합니다.

$ python3 generate_rag_data_from_pr_dataset.py
✅ RAG용 텍스트 생성 완료 → rag_ready.jsonl
$ head -n 1 rag_ready.jsonl | jq
{
  "pr_id": null,
  "title": "Merge pull request #7461 from rimolive/kf-1.9",
  "text": "PR 제목: Merge pull request #7461 from rimolive/kf-1.9\n병합자: Ricardo Martinelli de Oliveira / 날짜: Tue Mar 5 11:46:36 2024 -0300\n커밋 요약:\n- Ricardo Martinelli de Oliveira (Mon Feb 19 18:51:40 2024 -0300): Update ROADMAP.md\n  변경 파일: ROADMAP.md\n  변경사항:\ncommit 68e4d10bbf976bb89810b4e16e8b765a2a0e68b7\nAuthor: Ricardo Martinelli de Oliveira <rmartine@redhat.com>\nDate:   Mon Feb 19 18:51:40 2024 -0300\n\n    Update ROADMAP.md\n    \n    Co-authored-by: Tommy Li <Tommy.chaoping.li@ibm.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex 35021954..cfd39558 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -8,7 +8,7 @@ The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [t\n * CNCF Transition\n * LLM APIs\n * New component: Model Registry\n-* Kubeflow Pipelines and kfp-tekton merged in a single GitHub repository\n+* Kubeflow Pipelines and kfp-tekton V2 merged in a single GitHub repository\n \n ### Detailed features, bug fixes and enhancements are identified in the Working Group Roadmaps and Tracking Issues:\n * [Training Operators](https://github.com/kubeflow/training-operator/issues/1994)\n- Ricardo M. Oliveira (Mon Feb 5 14:43:45 2024 -0300): Add Kubeflow 1.9 release roadmap\n  변경 파일: ROADMAP.md\n  변경사항:\ncommit 5c3404782fa2700f8547b37132ff7ab2d1ed99fe\nAuthor: Ricardo M. Oliveira <rmartine@redhat.com>\nDate:   Mon Feb 5 14:43:45 2024 -0300\n\n    Add Kubeflow 1.9 release roadmap\n    \n    Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>\n\ndiff --git a/ROADMAP.md b/ROADMAP.md\nindex de3c8951..35021954 100644\n--- a/ROADMAP.md\n+++ b/ROADMAP.md\n@@ -1,6 +1,26 @@\n # Kubeflow Roadmap\n \n-## Kubeflow 1.8 Release, Planned for release: Oct 2023\n+## Kubeflow 1.9 Release, Planned for release: Jul 2024\n+The Kubeflow Community plans to deliver its v1.9 release in Jul 2024 per this [timeline](https://github.com/kubeflow/community/blob/master/releases/release-1.9/README.md#timeline). The high level deliverables are tracked in the [v1.9 Release](https://github.com/orgs/kubeflow/projects/61) Github project board. The v1.9 release process will be managed by the v1.9 [release team](https://github.com/kubeflow/community/blob/master/releases/release-1.9/release-team.md) using the best practices in the [Rele"
}

$ python3 embed_prs.py
✅ Line 1: embedded
✅ Line 2: embedded
✅ Line 3: embedded
✅ Line 4: embedded
✅ Line 5: embedded
✅ Line 6: embedded
✅ Line 7: embedded
✅ Line 8: embedded
✅ Line 9: embedded
✅ Line 10: embedded
... (중략) ...

generate_rag_data_from_pr_dataset.py

배경색 변경
import json

def build_text(pr):
    lines = []
    lines.append(f"PR 제목: {pr['title']}")
    lines.append(f"병합자: {pr['author']} / 날짜: {pr['date']}")
    lines.append("커밋 요약:")
    for c in pr["commits"]:
        lines.append(f"- {c['author']} ({c['date']}): {c['message']}")
        if c["files"]:
            lines.append(f"  변경 파일: {', '.join(c['files'])}")
        lines.append("  변경사항:")
        lines.append(c["diff"][:1000])  # 너무 길면 자름
    return "\n".join(lines)

with open("pr_dataset.jsonl") as fin, open("rag_ready.jsonl", "w") as fout:
    for line in fin:
        pr = json.loads(line)
        text = build_text(pr)
        out = {
            "pr_id": pr.get("pr_id"),
            "title": pr.get("title"),
            "text": text
        }
        fout.write(json.dumps(out, ensure_ascii=False) + "\n")

print("✅ RAG용 텍스트 생성 완료 → rag_ready.jsonl")
import json

def build_text(pr):
    lines = []
    lines.append(f"PR 제목: {pr['title']}")
    lines.append(f"병합자: {pr['author']} / 날짜: {pr['date']}")
    lines.append("커밋 요약:")
    for c in pr["commits"]:
        lines.append(f"- {c['author']} ({c['date']}): {c['message']}")
        if c["files"]:
            lines.append(f"  변경 파일: {', '.join(c['files'])}")
        lines.append("  변경사항:")
        lines.append(c["diff"][:1000])  # 너무 길면 자름
    return "\n".join(lines)

with open("pr_dataset.jsonl") as fin, open("rag_ready.jsonl", "w") as fout:
    for line in fin:
        pr = json.loads(line)
        text = build_text(pr)
        out = {
            "pr_id": pr.get("pr_id"),
            "title": pr.get("title"),
            "text": text
        }
        fout.write(json.dumps(out, ensure_ascii=False) + "\n")

print("✅ RAG용 텍스트 생성 완료 → rag_ready.jsonl")
코드 블럭. generate_rag_data_from_pr_dataset.py

embed_prs.py

참고
  • 코드 내 EMBEDDING_API_URL인 AIOS_LLM_Private_Endpoint과 model의 MODEL_IDLLM 이용 가이드를 참고해주세요. 아래의 예시처럼 입력할 수 있습니다.
    • EMBEDDING_API_URL = “{AIOS LLM 프라이빗 엔드포인트}/{API}”
    • “model”: “{모델ID}”
배경색 변경
import json
import requests
import time

EMBEDDING_API_URL = "AIOS_LLM_Private_Endpoint"
HEADERS = {"Content-Type": "application/json"}

def get_embedding(text):
    payload = {
        "model": "MODEL_ID",
        "input": text,
        "stream": False
    }

    try:
        response = requests.post(EMBEDDING_API_URL, headers=HEADERS, json=payload)
        if response.status_code == 200:
            result = response.json()
            return result["data"][0]["embedding"]
        else:
            print(f"❌ Failed with status {response.status_code}: {response.text}")
            return None
    except Exception as e:
        print(f"⚠️ Error calling embedding API: {e}")
        return None

def main():
    with open("rag_ready.jsonl", "r", encoding="utf-8") as fin, \
         open("rag_embedded.jsonl", "w", encoding="utf-8") as fout:

        for i, line in enumerate(fin, start=1):
            try:
                item = json.loads(line)
                text = item.get("text", "").strip()
                if not text:
                    print(f"⚠️ Line {i}: empty text, skipping")
                    continue

                embedding = get_embedding(text)
                if embedding is None:
                    print(f"⚠️ Line {i}: embedding failed, skipping")
                    continue

                item["embedding"] = embedding
                fout.write(json.dumps(item, ensure_ascii=False) + "\n")
                print(f"✅ Line {i}: embedded")

                time.sleep(0.2)  # optional: rate limiting
            except Exception as e:
                print(f"❌ Line {i}: error - {e}")
                continue

if __name__ == "__main__":
    main()
import json
import requests
import time

EMBEDDING_API_URL = "AIOS_LLM_Private_Endpoint"
HEADERS = {"Content-Type": "application/json"}

def get_embedding(text):
    payload = {
        "model": "MODEL_ID",
        "input": text,
        "stream": False
    }

    try:
        response = requests.post(EMBEDDING_API_URL, headers=HEADERS, json=payload)
        if response.status_code == 200:
            result = response.json()
            return result["data"][0]["embedding"]
        else:
            print(f"❌ Failed with status {response.status_code}: {response.text}")
            return None
    except Exception as e:
        print(f"⚠️ Error calling embedding API: {e}")
        return None

def main():
    with open("rag_ready.jsonl", "r", encoding="utf-8") as fin, \
         open("rag_embedded.jsonl", "w", encoding="utf-8") as fout:

        for i, line in enumerate(fin, start=1):
            try:
                item = json.loads(line)
                text = item.get("text", "").strip()
                if not text:
                    print(f"⚠️ Line {i}: empty text, skipping")
                    continue

                embedding = get_embedding(text)
                if embedding is None:
                    print(f"⚠️ Line {i}: embedding failed, skipping")
                    continue

                item["embedding"] = embedding
                fout.write(json.dumps(item, ensure_ascii=False) + "\n")
                print(f"✅ Line {i}: embedded")

                time.sleep(0.2)  # optional: rate limiting
            except Exception as e:
                print(f"❌ Line {i}: error - {e}")
                continue

if __name__ == "__main__":
    main()
코드 블럭. embed_prs.py

OpenSearch에 업로드

벡터 파일을 OpenSearch에 업로드하여 검색 가능한 형태로 구성합니다.

참고
  • 이 튜토리얼에서는 VM 내부에 OpenSearch를 구성하고, http://localhost:9200 주소로 호출합니다. 사용자 벡터 데이터베이스를 사용하는 경우에는 URL을 알맞게 변경해 주세요.
# OpenSearch에 "kubeflow-pr-rag-index"이름의 인덱스 생성
$ curl -X PUT "http://localhost:9200/kubeflow-pr-rag-index" \
  -H "Content-Type: application/json" \
  -d '{
    "settings": {
      "index": {
        "knn": true
      }
    },
    "mappings": {
      "properties": {
        "title": { "type": "text" },
        "text":  { "type": "text" },
        "embedding": {
          "type": "knn_vector",
          "dimension": 1024,
          "method": {
            "name": "hnsw",
            "space_type": "cosinesimil",
            "engine": "nmslib"
          }
        }
      }
    }
  }'
{"acknowledged":true,"shards_acknowledged":true,"index":"kubeflow-pr-rag-index"}

$ python3 upload_rag_documnets.py
✅ Uploaded document pr-1
✅ Uploaded document pr-2
✅ Uploaded document pr-3
✅ Uploaded document pr-4
✅ Uploaded document pr-5
✅ Uploaded document pr-6
✅ Uploaded document pr-7
✅ Uploaded document pr-8
✅ Uploaded document pr-9
✅ Uploaded document pr-10
... (중략) ...

upload_rag_documnets.py

배경색 변경
import json
from opensearchpy import OpenSearch

# OpenSearch 연결 설정
client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    use_ssl=False,
    verify_certs=False
)

index_name = "kubeflow-pr-rag-index"

with open("rag_embedded.jsonl", "r", encoding="utf-8") as f:
    for i, line in enumerate(f, 1):
        try:
            doc = json.loads(line)
            title = doc.get("title", "")
            text = doc.get("text", "")
            embedding = doc.get("embedding", [])

            if not embedding or len(embedding) != 1024:
                print(f"⚠️  Line {i}: Invalid embedding length, skipping.")
                continue

            body = {
                "title": title,
                "text": text,
                "embedding": embedding
            }

            doc_id = f"pr-{i}"
            client.index(index=index_name, id=doc_id, body=body)
            print(f"✅ Uploaded document {doc_id}")
        except Exception as e:
            print(f"❌ Line {i}: Failed to upload due to {e}")
import json
from opensearchpy import OpenSearch

# OpenSearch 연결 설정
client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    use_ssl=False,
    verify_certs=False
)

index_name = "kubeflow-pr-rag-index"

with open("rag_embedded.jsonl", "r", encoding="utf-8") as f:
    for i, line in enumerate(f, 1):
        try:
            doc = json.loads(line)
            title = doc.get("title", "")
            text = doc.get("text", "")
            embedding = doc.get("embedding", [])

            if not embedding or len(embedding) != 1024:
                print(f"⚠️  Line {i}: Invalid embedding length, skipping.")
                continue

            body = {
                "title": title,
                "text": text,
                "embedding": embedding
            }

            doc_id = f"pr-{i}"
            client.index(index=index_name, id=doc_id, body=body)
            print(f"✅ Uploaded document {doc_id}")
        except Exception as e:
            print(f"❌ Line {i}: Failed to upload due to {e}")
코드 블럭. upload_rag_documnets.py

OpenSearch Dashboards에서 확인

아래 그림과 같이 OpenSearch Dashboard에서 kubeflow-pr-rag-index 에 해당하는 데이터를 확인할 수 있습니다. 데이터는 title, text, embedding으로 구성되어 있습니다.

참고
OpenSearch Dashboard에서 Index Patterns 등록
왼쪽 메뉴 → Dashboards Management → Index patterns → Create index pattern 클릭

그림2

RAG QA Application 구성

사용자의 질의를 임베딩하여 검색 질의로 변환한 뒤, RAG를 활용해 연관 문서를 추출하고, AIOS Chat 모델을 통해 최종 결과를 제공합니다.

참고
  • 이 코드에서는 유사도 검색 방식으로 OpenSearch의 KNN(K-Nearest Neightbors) 검색과 AIOS에서 제공하는 Embedding 모델의 Score API를 호출하여 입력 벡터와 가장 유사한 문서를 계산하는 방식을 지원합니다. 사용자는 두 방식 중 하나를 선택하여 사용할 수 있으며, 이 튜토리얼에서는 AIOS Score API 기반의 유사도 검색 방식을 사용합니다.
    • OpenSearch의 KNN 호출 : docs = search_similar_docs(query_vec, K)
    • AIOS Embedding 모델 호출 : docs = search_similar_docs_with_score(question, K)
  • 코드 내 EMBEDDING_API_URL, LLM_API_URL, SCORE_API_URL, MODEL_EMBEDDING, MODEL_CHAT은 LLM 이용 가이드를 참고하여 사용할 API와 Model로 입력해주세요. 아래의 예시처럼 입력할 수 있습니다.
    • EMBEDDING_API_URL = “{AIOS LLM 프라이빗 엔드포인트}/{API}”
    • MODEL_EMBEDDING = “{모델ID}”

app.py

배경색 변경
import streamlit as st
import requests
from opensearchpy import OpenSearch

# 설정
def get_opensearch_client():
    return OpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        use_ssl=False,
        verify_certs=False
    )

EMBEDDING_API_URL = "YOUR_EMBEDDING_API_URL"
LLM_API_URL = "YOUR_LLM_API_URL"
SCORE_API_URL = "YOUR_SCORE_API_URL"
MODEL_EMBEDDING = "YOUR_MODEL_EMBEDDING"
MODEL_CHAT = "YOUR_MODEL_CHAT"
INDEX_NAME = "kubeflow-pr-rag-index"
VECTOR_DIM = 1024
K = 3

# 임베딩 생성 함수
def embed_text(text):
    res = requests.post(
        EMBEDDING_API_URL,
        headers={"Content-Type": "application/json"},
        json={"model": MODEL_EMBEDDING, "input": text, "stream": False}
    )
    return res.json()["data"][0]["embedding"]

# 모든 문서 불러오기 (OpenSearch)
def fetch_all_docs():
    client = get_opensearch_client()
    res = client.search(
        index=INDEX_NAME,
        body={
            "size": 1000,  # 필요한 만큼 설정 (작을 경우 스크롤 API 활용 가능)
            "query": {"match_all": {}}
        }
    )
    return [doc["_source"] for doc in res["hits"]["hits"]]

# 두 문장 리스트를 받아 유사도 점수 계산
def score_text_pairs(text_1, text_2):
    payload = {
        "model": MODEL_EMBEDDING,
        "encoding_format": "float",
        "text_1": text_1,
        "text_2": text_2
    }
    headers = {
        "accept": "application/json",
        "Content-Type": "application/json"
    }

    response = requests.post(SCORE_API_URL, headers=headers, json=payload)
    response.raise_for_status()

    # 유사도 score만 추출
    scores = [item["score"] for item in response.json()["data"]]
    return scores

# 유사 문서 선택 (점수 기반 Top-K)
def search_similar_docs_with_score(query, k):
    all_docs = fetch_all_docs()
    doc_texts = [doc["text"] for doc in all_docs]
    queries = [query] * len(doc_texts)
    scores = score_text_pairs(queries, doc_texts)

    # 점수 높은 순으로 정렬
    scored_docs = sorted(zip(all_docs, scores), key=lambda x: x[1], reverse=True)
    top_docs = [doc for doc, score in scored_docs[:k]]
    return top_docs

# KNN 검색 함수
def search_similar_docs(query_vector, k):
    client = get_opensearch_client()
    res = client.search(
        index=INDEX_NAME,
        body={
            "size": k,
            "query": {
                "knn": {
                    "embedding": {
                        "vector": query_vector,
                        "k": k
                    }
                }
            }
        }
    )
    return [doc["_source"] for doc in res["hits"]["hits"]]

# 프롬프트 구성
def build_prompt(docs, question):
    context_blocks = []
    for i, doc in enumerate(docs):
        context_blocks.append(f"[문서 {i+1}]\n{doc['text']}")
    context = "\n\n".join(context_blocks)
    return f"""다음은 Kubeflow 프로젝트에서 유사한 PR 문서들입니다:

{context}

사용자 질문: {question}

위 내용을 참고하여 질문에 대해 자연어로 답변해 주세요. 가능한 문서 번호를 인용해서 설명해주세요."""

# LLM 호출 함수
def call_llm(prompt):
    res = requests.post(
        LLM_API_URL,
        headers={"Content-Type": "application/json"},
        json={
            "model": MODEL_CHAT,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False
        }
    )
    return res.json()["choices"][0]["message"]["content"]

# Streamlit UI 시작
st.set_page_config(page_title="RAG QA", layout="wide")
st.title("📘 RAG-based PR Summary Chatbot")

question = st.text_input("Enter your question:", "Please summarize the PR the Add Kubeflow 1.9 release roadmap.")

if st.button("Searching and generating response"):
    with st.spinner("Generating embeddings..."):
        query_vec = embed_text(question)

    with st.spinner("Searching for similar documents in OpenSearch..."):
        #docs = search_similar_docs(query_vec, K)
        docs = search_similar_docs_with_score(question, K)

    with st.spinner("Constructing prompt and invoking LLM..."):
        prompt = build_prompt(docs, question)
        answer = call_llm(prompt)

    st.markdown("### 🤖 LLM response")
    st.write(answer)

    st.markdown("---")
    st.markdown("### 🔍 Highlighted PR document")
    for i, doc in enumerate(docs):
        with st.expander(f"문서 {i+1}: {doc['title']}"):
            # 간단한 질문 키워드 하이라이트 
            highlighted = doc['text'].replace(question.split()[0], f"**{question.split()[0]}**")
            st.markdown(highlighted)
import streamlit as st
import requests
from opensearchpy import OpenSearch

# 설정
def get_opensearch_client():
    return OpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        use_ssl=False,
        verify_certs=False
    )

EMBEDDING_API_URL = "YOUR_EMBEDDING_API_URL"
LLM_API_URL = "YOUR_LLM_API_URL"
SCORE_API_URL = "YOUR_SCORE_API_URL"
MODEL_EMBEDDING = "YOUR_MODEL_EMBEDDING"
MODEL_CHAT = "YOUR_MODEL_CHAT"
INDEX_NAME = "kubeflow-pr-rag-index"
VECTOR_DIM = 1024
K = 3

# 임베딩 생성 함수
def embed_text(text):
    res = requests.post(
        EMBEDDING_API_URL,
        headers={"Content-Type": "application/json"},
        json={"model": MODEL_EMBEDDING, "input": text, "stream": False}
    )
    return res.json()["data"][0]["embedding"]

# 모든 문서 불러오기 (OpenSearch)
def fetch_all_docs():
    client = get_opensearch_client()
    res = client.search(
        index=INDEX_NAME,
        body={
            "size": 1000,  # 필요한 만큼 설정 (작을 경우 스크롤 API 활용 가능)
            "query": {"match_all": {}}
        }
    )
    return [doc["_source"] for doc in res["hits"]["hits"]]

# 두 문장 리스트를 받아 유사도 점수 계산
def score_text_pairs(text_1, text_2):
    payload = {
        "model": MODEL_EMBEDDING,
        "encoding_format": "float",
        "text_1": text_1,
        "text_2": text_2
    }
    headers = {
        "accept": "application/json",
        "Content-Type": "application/json"
    }

    response = requests.post(SCORE_API_URL, headers=headers, json=payload)
    response.raise_for_status()

    # 유사도 score만 추출
    scores = [item["score"] for item in response.json()["data"]]
    return scores

# 유사 문서 선택 (점수 기반 Top-K)
def search_similar_docs_with_score(query, k):
    all_docs = fetch_all_docs()
    doc_texts = [doc["text"] for doc in all_docs]
    queries = [query] * len(doc_texts)
    scores = score_text_pairs(queries, doc_texts)

    # 점수 높은 순으로 정렬
    scored_docs = sorted(zip(all_docs, scores), key=lambda x: x[1], reverse=True)
    top_docs = [doc for doc, score in scored_docs[:k]]
    return top_docs

# KNN 검색 함수
def search_similar_docs(query_vector, k):
    client = get_opensearch_client()
    res = client.search(
        index=INDEX_NAME,
        body={
            "size": k,
            "query": {
                "knn": {
                    "embedding": {
                        "vector": query_vector,
                        "k": k
                    }
                }
            }
        }
    )
    return [doc["_source"] for doc in res["hits"]["hits"]]

# 프롬프트 구성
def build_prompt(docs, question):
    context_blocks = []
    for i, doc in enumerate(docs):
        context_blocks.append(f"[문서 {i+1}]\n{doc['text']}")
    context = "\n\n".join(context_blocks)
    return f"""다음은 Kubeflow 프로젝트에서 유사한 PR 문서들입니다:

{context}

사용자 질문: {question}

위 내용을 참고하여 질문에 대해 자연어로 답변해 주세요. 가능한 문서 번호를 인용해서 설명해주세요."""

# LLM 호출 함수
def call_llm(prompt):
    res = requests.post(
        LLM_API_URL,
        headers={"Content-Type": "application/json"},
        json={
            "model": MODEL_CHAT,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False
        }
    )
    return res.json()["choices"][0]["message"]["content"]

# Streamlit UI 시작
st.set_page_config(page_title="RAG QA", layout="wide")
st.title("📘 RAG-based PR Summary Chatbot")

question = st.text_input("Enter your question:", "Please summarize the PR the Add Kubeflow 1.9 release roadmap.")

if st.button("Searching and generating response"):
    with st.spinner("Generating embeddings..."):
        query_vec = embed_text(question)

    with st.spinner("Searching for similar documents in OpenSearch..."):
        #docs = search_similar_docs(query_vec, K)
        docs = search_similar_docs_with_score(question, K)

    with st.spinner("Constructing prompt and invoking LLM..."):
        prompt = build_prompt(docs, question)
        answer = call_llm(prompt)

    st.markdown("### 🤖 LLM response")
    st.write(answer)

    st.markdown("---")
    st.markdown("### 🔍 Highlighted PR document")
    for i, doc in enumerate(docs):
        with st.expander(f"문서 {i+1}: {doc['title']}"):
            # 간단한 질문 키워드 하이라이트 
            highlighted = doc['text'].replace(question.split()[0], f"**{question.split()[0]}**")
            st.markdown(highlighted)
코드 블럭. app.py

RAG QA Chatbot UI 사용 방법

호출 코드 실행

  1. VM에서 Streamlit 실행
    배경색 변경
    streamlit run app.py --server.port 8501 --server.address 0.0.0.0
    streamlit run app.py --server.port 8501 --server.address 0.0.0.0
    코드 블럭. Streamlit 실행
You can now view your Streamlit app in your browser.
 
URL: http://0.0.0.0:8501

브라우저에서 http://{your_server_ip}:8501 또는 서버 SSH 터널링 설정 후 http://0.0.0.0:8501 로 접속합니다. SSH 터널링은 아래를 참고하세요.

2. 로컬PC에서 터널링으로 VM접속 (http://0.0.0.0:8501 로 접속하는 경우)

배경색 변경
ssh -i {your_pemkey.pem} -L 8501:localhost:8501 ubuntu@{your_server_ip}
ssh -i {your_pemkey.pem} -L 8501:localhost:8501 ubuntu@{your_server_ip}
코드 블럭. 로컬PC에서 터널링

사용 예시

Kubeflow 프로젝트 Git에서 Add Kubeflow 1.9 release roadmap PR 에 대한 요약을 질문합니다.

그림3

Kubeflow 프로젝트의 해당 PR에 대한 정보입니다.

그림4

마무리

이번 튜토리얼에서는 AIOS에서 제공하는 AI 모델을 활용하여 GIT PR 관련 데이터를 벡터화하고, OpenSearch 기반의 벡터 검색 및 LLM 응답을 조합하여 PR 리뷰 보조 챗봇을 구현해 보았습니다.이를 통해 과거 PR 히스토리에 기반한 질의응답이 가능해져, 개발자의 코드 리뷰 효율성과 품질을 향상시킬 수 있습니다. 본 시스템은 다음과 같은 방식으로 사용자 환경에 맞게 확장 및 커스터마이징할 수 있습니다.

  • 벡터 데이터베이스 교체 : OpenSearch 외에 SCP Search Engine 상품 활용, 사용자 벡터 데이터베이스를 연동할 수 있습니다.
  • 실시간 데이터 수집 연동 : Github Webhook 또는 Gitlab API 연동을 통해 실시간 PR 생성/업데이트 정보를 수집하고 자동 인덱싱 가능합니다.
  • 대화형 UI 고도화: Streamlit 외에도 Slack Bot, 사내 메신저 등 다양한 인터페이스로 확장 가능합니다.

이번 튜토리얼을 기반으로 실제 서비스 목적에 따라 적합한 AIOS 기반 협업 도우미를 직접 구축해 보시길 바랍니다.

참고 링크

https://opensearch.org/
https://github.com/kubeflow/kubeflow

Chat Playground
Autogen