はじめに
「新しいPDFをアップしたのに、答えが前のPDFの内容っぽい…」 初めてRAGを作ると、ここでつまずきやすいです。原因はシンプル。“どのPDFのベクトルなのか”を見分けるためのIDが無いので、ベクトルDB(Qdrant)が過去のPDFも一緒に検索してしまうんです。
今日は、図でイメージをつかんでから、直し方をやさしく実装していきます。
まずはイメージで理解しよう
問題のある状態(IDなし)
┌──────────────── ベクトルDB(my_pdfs) ────────────────┐
│ [chunk] 「PDF_A の p1 の文」 ← どのPDFか分からない │
│ [chunk] 「PDF_A の p2 の文」 ← どのPDFか分からない │
│ [chunk] 「PDF_B の p1 の文」 ← どのPDFか分からない │
│ [chunk] 「PDF_B の p3 の文」 ← どのPDFか分からない │
│ … │
└──────────────────────────────────────────────────────┘
あなたの質問 →(Embedding)→ ベクトル検索 → 近いもの全部(AもBも)取ってくる
↑
ここで“前のPDFの断片”が混ざる
IDが無いので、同じコレクションに溜まった全PDFのチャンクが、仲良くごちゃ混ぜでヒットします。 その結果、新しいPDFを読んだつもりが、古いPDFの内容で答えてしまうことが起きます。
解決したい状態(doc_idで絞り込み)
┌──────────────── ベクトルDB(my_pdfs) ────────────────┐
│ [chunk] 「PDF_A の p1」 doc_id=A │
│ [chunk] 「PDF_A の p2」 doc_id=A │
│ [chunk] 「PDF_B の p1」 doc_id=B │
│ [chunk] 「PDF_B の p3」 doc_id=B │
│ … │
└──────────────────────────────────────────────────────┘
あなたの質問 →(Embedding)→ ベクトル検索(doc_id = A でフィルタ)
↓
A のチャンクだけが候補になる(Bは除外)
ポイントはひとつ。 アップロードしたそのPDFだけに付くID(doc_id)を作って、
- 保存するときに
payload
にdoc_id
を入れる - 検索するときに
doc_id
でフィルタしてそのPDFだけを対象にする
これで過去のPDFは混ざらなくなります。
どうやってIDを作るの?
難しくありません。アップロードされたPDFのバイト列からハッシュ値を作るのが簡単で堅実です。 同じファイルなら同じID、別のファイルなら違うIDになります。
import hashlib, io
from PyPDF2 import PdfReader
# アップロード直後に
pdf_bytes = uploaded_file.getvalue()
doc_id = hashlib.sha1(pdf_bytes).hexdigest() # ← 一意なIDを生成
st.session_state["doc_id"] = doc_id # セッションに保持
st.session_state["doc_name"] = uploaded_file.name
# BytesIOに包んで安全に読む(PyPDF2の安定対策)
reader = PdfReader(io.BytesIO(pdf_bytes))
セッションに入れておくと、この後の保存処理や検索処理で同じ
doc_id
を使い回せます。
保存時に doc_id を一緒に入れよう
チャンクをQdrantへ保存するとき、payloadに doc_id
を入れるだけです。
from qdrant_client.models import PointStruct
points = []
for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
points.append(PointStruct(
id=str(uuid.uuid4()),
vector=emb,
payload={
"text": chunk,
"chunk_index": i,
"doc_id": st.session_state["doc_id"], # ← ここが大事!
"doc_name": st.session_state["doc_name"], # (おまけ:表示用)
}
))
client.upsert(collection_name=COLLECTION_NAME, points=points)
検索時は doc_id で絞り込み
Qdrantのフィルタ機能を使って、いまのPDFの doc_id
だけを検索対象にします。
from qdrant_client.models import Filter, FieldCondition, MatchValue
doc_id = st.session_state.get("doc_id")
qfilter = Filter(must=[FieldCondition(key="doc_id", match=MatchValue(value=doc_id))])
results = client.search(
collection_name=COLLECTION_NAME,
query_vector=query_embedding[0],
query_filter=qfilter, # ← doc_id で絞る!
limit=top_k,
)
この1行(query_filter=qfilter
)が効きます。
「前に入れたPDFの断片」を機械的に除外できるようになります。
ここまでの流れをもう一度、図で
[1] PDFをアップロード
└─ bytes → sha1 → doc_id を作る(例: "a1b2c3..." )
└─ st.session_state に doc_id / doc_name を保存
[2] 文章をチャンク化 → Embedding 作成
└─ Qdrantに保存(payload に doc_id を入れる)
[3] 質問が来た
└─ 質問も Embedding 化
└─ Qdrant 検索(query_filter: doc_id == "a1b2c3...")
└─ このPDFのチャンクだけが返る
一度仕組みができてしまえば、毎回同じ手順で安心して使えます。
実際に試すときのチェックポイント
- 新しいPDFをアップした直後に、画面のどこかで ファイル名が出ているか確認
(
doc_name
をpayloadに入れておくと、デバッグ時に分かりやすいです) - 質問して出てくる参照チャンク(根拠)が、さっきのPDFの内容になっているか
もし違うなら、フィルタが効いていない可能性が高いです(
query_filter
を再確認) - 古いデータを全消ししたくなることもあります。開発中は潔く消してもOK
client.delete_collection(COLLECTION_NAME) # 作り直す派の最終手段
よくある疑問にひとこと
- 「同じPDFをもう一回アップしたら?」
中身が同じなら
doc_id
も同じになります。重複を避けたいときは、アップロード前に同じdoc_id
が存在するかを見てスキップする、という工夫もできます。 - 「PDFをちょっとだけ直したら?」
バイト列が変わるので
doc_id
も変わります。別バージョンとして扱われます。 - 「doc_id じゃなくて、コレクションを分けるのは?」 それも手ですが、コレクションが増えすぎると管理が大変になります。まずはdoc_idフィルタをおすすめします。
まとめ
“前のPDFの内容が混ざる”問題は、IDがないことが原因でした。 doc_idを付けて保存し、検索時にdoc_idで絞り込むだけで、いま読んだPDFだけを根拠に答えられるようになります。
難しいことはしていません。正体(doc_id)を名札のように付ける。 それだけで、RAGはグッと頼れる子になります。
差分
最後に差分を載せておきます!
--- a/app.py
+++ b/app.py
@@
-import os
+import os
+import io
+import hashlib
import uuid
import streamlit as st
from PyPDF2 import PdfReader
from openai import OpenAI
from qdrant_client import QdrantClient
-from qdrant_client.models import Distance, VectorParams, PointStruct
+from qdrant_client.models import (
+ Distance, VectorParams, PointStruct,
+ Filter, FieldCondition, MatchValue
+)
import tiktoken
@@
-def get_pdf_text():
- """PDFをアップロードして全ページのテキストを結合して返す"""
+def get_pdf_text():
+ """PDFをアップロード → doc_id算出 → 全ページのテキストを結合して返す"""
uploaded_file = st.file_uploader(
label="PDFファイルを選んでください",
type="pdf",
help="テキストが含まれるPDFファイルをアップロードしてください",
)
if not uploaded_file:
return None
try:
- reader = PdfReader(uploaded_file)
+ # バイト列から doc_id を生成してセッションに保持
+ pdf_bytes = uploaded_file.getvalue()
+ doc_id = hashlib.sha1(pdf_bytes).hexdigest()
+ st.session_state["doc_id"] = doc_id
+ st.session_state["doc_name"] = uploaded_file.name
+
+ # BytesIO 経由で安全に読み込み
+ reader = PdfReader(io.BytesIO(pdf_bytes))
text = ""
total_pages = len(reader.pages)
bar = st.progress(0)
for i, page in enumerate(reader.pages):
- page_text = page.extract_text()
+ page_text = page.extract_text() or ""
text += f"\n[ページ {i+1}]\n{page_text}\n"
bar.progress((i + 1) / total_pages)
bar.empty()
@@
-def store_embeddings(chunks, embeddings):
- """作成したEmbeddingをQdrantに保存"""
+def store_embeddings(chunks, embeddings):
+ """作成したEmbeddingをQdrantに保存(doc_id 付き)"""
client = init_qdrant()
if not client:
return False
- try:
+ # 現在のPDFの doc_id / doc_name を取得
+ doc_id = st.session_state.get("doc_id")
+ doc_name = st.session_state.get("doc_name", "unknown.pdf")
+ if not doc_id:
+ st.error("doc_id が見つかりません。PDFをアップロードし直してください。")
+ return False
+ try:
points = []
for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
points.append(
PointStruct(
id=str(uuid.uuid4()),
vector=emb,
- payload={"text": chunk, "chunk_index": i},
+ payload={
+ "text": chunk,
+ "chunk_index": i,
+ "doc_id": doc_id, # ← 追加:どのPDFかを識別
+ "doc_name": doc_name, # ← 追加:任意(表示用)
+ },
)
)
client.upsert(collection_name=COLLECTION_NAME, points=points)
return True
except Exception as e:
st.error(f"保存エラー: {e}")
return False
@@
-def search_similar_chunks(query, top_k=3):
- """クエリをEmbedding化し、類似チャンクを検索(R)"""
+def search_similar_chunks(query, top_k=3):
+ """クエリをEmbedding化し、現在のPDF(doc_id)だけから類似チャンクを検索(R)"""
client = init_qdrant()
if not client:
return []
+ # 現在のPDFの doc_id を確認(なければ検索しない)
+ doc_id = st.session_state.get("doc_id")
+ if not doc_id:
+ st.warning("doc_id が未設定です。PDFをアップロードしてから検索してください。")
+ return []
+
query_embedding = create_embeddings([query])
if not query_embedding:
return []
try:
- results = client.search(
+ # doc_id で絞り込むフィルタを追加
+ qfilter = Filter(must=[FieldCondition(key="doc_id", match=MatchValue(value=doc_id))])
+ results = client.search(
collection_name=COLLECTION_NAME,
query_vector=query_embedding[0],
+ query_filter=qfilter, # ← 追加:過去PDFを除外
limit=top_k,
)
out = []
for r in results:
out.append({"text": r.payload["text"], "score": r.score})
return out
except Exception as e:
st.error(f"検索エラー: {e}")
return []