すべてを組み合わせた完成版コード
さあ、いよいよすべてのパーツを組み合わせて、完成版を作りましょう!今まで作ってきた機能を全部合体させます。
"""
PDFに質問しよう - 前編(2025年版)
LangChainを使わないシンプルな実装
必要なライブラリ:
pip install streamlit PyPDF2 openai qdrant-client tiktoken
"""
import streamlit as st
import os
from PyPDF2 import PdfReader
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import tiktoken
import uuid
# ========== 設定 ==========
QDRANT_PATH = "./local_qdrant"
COLLECTION_NAME = "my_pdfs"
EMBEDDING_MODEL = "text-embedding-3-small"
CHUNK_SIZE = 500 # トークン数
CHUNK_OVERLAP = 50 # オーバーラップ
# ========== ページ設定 ==========
st.set_page_config(
page_title="PDFに質問しよう",
page_icon="📄",
layout="wide"
)
# ========== ヘルパー関数 ==========
def get_pdf_text():
"""PDFをアップロードしてテキストを取得"""
uploaded_file = st.file_uploader(
label='PDFファイルを選んでください😇',
type='pdf',
help='テキストが含まれるPDFファイルをアップロードしてください'
)
if uploaded_file:
try:
# PDFを読み込み
pdf_reader = PdfReader(uploaded_file)
# 全ページのテキストを結合
text = ''
total_pages = len(pdf_reader.pages)
# プログレスバーを表示
progress_bar = st.progress(0)
for i, page in enumerate(pdf_reader.pages):
page_text = page.extract_text()
text += f'\n[ページ {i+1}]\n{page_text}\n'
progress_bar.progress((i + 1) / total_pages)
progress_bar.empty() # プログレスバーを消す
# 情報を表示
st.success(f"✅ PDFを読み込みました!")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("📄 ページ数", f"{total_pages}ページ")
with col2:
st.metric("📝 文字数", f"{len(text):,}文字")
with col3:
st.metric("📦 ファイルサイズ", f"{uploaded_file.size / 1024:.1f}KB")
# プレビュー
with st.expander("📖 テキストのプレビュー(最初の1000文字)"):
st.text(text[:1000] + "..." if len(text) > 1000 else text)
return text
except Exception as e:
st.error(f"PDFの読み込みに失敗しました: {e}")
return None
return None
def split_text_by_tokens(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
"""テキストをトークン数ベースで分割"""
if not text:
return []
# GPT-4のトークナイザーを使用
encoding = tiktoken.encoding_for_model("gpt-4")
tokens = encoding.encode(text)
chunks = []
start = 0
while start < len(tokens):
end = min(start + chunk_size, len(tokens))
chunk_tokens = tokens[start:end]
chunk_text = encoding.decode(chunk_tokens)
# 空のチャンクは除外
if chunk_text.strip():
chunks.append(chunk_text)
# オーバーラップを考慮して次の開始位置を設定
start = end - chunk_overlap if end < len(tokens) else end
return chunks
def create_embeddings(texts, api_key):
"""テキストをEmbeddingに変換"""
if not texts or not api_key:
return None
try:
client = OpenAI(api_key=api_key)
# バッチ処理(一度に最大20個まで)
all_embeddings = []
batch_size = 20
progress_bar = st.progress(0)
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
response = client.embeddings.create(
model=EMBEDDING_MODEL,
input=batch
)
batch_embeddings = [item.embedding for item in response.data]
all_embeddings.extend(batch_embeddings)
# プログレスバー更新
progress = min((i + batch_size) / len(texts), 1.0)
progress_bar.progress(progress)
progress_bar.empty()
return all_embeddings
except Exception as e:
st.error(f"Embedding作成エラー: {e}")
return None
def init_qdrant():
"""Qdrantクライアントを初期化"""
try:
client = QdrantClient(path=QDRANT_PATH)
collections = client.get_collections().collections
collection_names = [c.name for c in collections]
if COLLECTION_NAME not in collection_names:
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config=VectorParams(
size=1536,
distance=Distance.COSINE
)
)
st.info("📦 新しいコレクションを作成しました")
return client
except Exception as e:
st.error(f"Qdrant初期化エラー: {e}")
return None
def store_embeddings(chunks, embeddings):
"""EmbeddingをベクトルDBに保存"""
client = init_qdrant()
if not client:
return False
try:
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
point = PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={
"text": chunk,
"chunk_index": i
}
)
points.append(point)
# バッチで保存
client.upsert(
collection_name=COLLECTION_NAME,
points=points
)
return True
except Exception as e:
st.error(f"保存エラー: {e}")
return False
def search_similar_chunks(query, api_key, top_k=3):
"""類似チャンクを検索(次章で使用)"""
client = init_qdrant()
if not client:
return []
# クエリをEmbeddingに変換
query_embedding = create_embeddings([query], api_key)
if not query_embedding:
return []
try:
results = client.search(
collection_name=COLLECTION_NAME,
query_vector=query_embedding[0],
limit=top_k
)
similar_chunks = []
for result in results:
similar_chunks.append({
'text': result.payload['text'],
'score': result.score
})
return similar_chunks
except Exception as e:
st.error(f"検索エラー: {e}")
return []
# ========== ページ実装 ==========
def page_pdf_upload():
"""PDFアップロードページ"""
st.title("📄 PDF Upload")
st.markdown("""
### 🎯 このページでできること
PDFファイルをアップロードして、AIが理解できる形に変換します。
変換が完了したら、「Ask My PDF(s)」ページで質問できるようになります!
""")
# APIキーの入力
with st.sidebar:
st.header("⚙️ 設定")
api_key = st.text_input(
"OpenAI APIキー",
type="password",
placeholder="sk-...",
help="https://platform.openai.com で取得できます"
)
if api_key:
os.environ["OPENAI_API_KEY"] = api_key
st.success("✅ APIキー設定完了")
else:
st.warning("⚠️ APIキーを入力してください")
st.markdown("---")
st.markdown("""
### 📖 処理の流れ
1. PDFをアップロード
2. テキストを抽出
3. チャンクに分割
4. Embeddingを作成
5. ベクトルDBに保存
""")
# メインコンテンツ
st.markdown("---")
# PDFアップロード
pdf_text = get_pdf_text()
if pdf_text and api_key:
st.markdown("---")
# 処理開始ボタン
col1, col2, col3 = st.columns([1, 2, 1])
with col2:
if st.button("🚀 処理を開始", type="primary", use_container_width=True):
# ステップ1: チャンク分割
with st.status("✂️ テキストを分割中...", expanded=True) as status:
chunks = split_text_by_tokens(pdf_text)
st.write(f"📊 {len(chunks)}個のチャンクに分割しました")
# サンプル表示
with st.expander("チャンクのサンプル(最初の3個)"):
for i, chunk in enumerate(chunks[:3], 1):
st.text_area(
f"チャンク {i}",
chunk[:300] + "..." if len(chunk) > 300 else chunk,
height=100,
disabled=True
)
status.update(label="✅ 分割完了", state="complete")
# ステップ2: Embedding作成
with st.status("🔢 Embeddingを作成中...", expanded=True) as status:
embeddings = create_embeddings(chunks, api_key)
if embeddings:
st.write(f"📊 {len(embeddings)}個のEmbeddingを作成しました")
st.write(f"📐 各Embeddingは{len(embeddings[0])}次元のベクトルです")
status.update(label="✅ Embedding作成完了", state="complete")
else:
status.update(label="❌ Embedding作成失敗", state="error")
st.stop()
# ステップ3: DB保存
with st.status("💾 データベースに保存中...", expanded=True) as status:
success = store_embeddings(chunks, embeddings)
if success:
status.update(label="✅ 保存完了!", state="complete")
st.balloons()
st.success("""
🎉 **処理が完了しました!**
PDFの内容がデータベースに保存されました。
「Ask My PDF(s)」ページで質問してみましょう!
""")
else:
status.update(label="❌ 保存失敗", state="error")
elif pdf_text and not api_key:
st.warning("⚠️ サイドバーでAPIキーを入力してください")
def page_ask_my_pdf():
"""質問ページ(簡易版)"""
st.title("🤔 Ask My PDF(s)")
st.markdown("""
### 🎯 このページでできること
アップロードしたPDFの内容について質問できます!
(完全版は次の章で実装します)
""")
# APIキーの確認
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
st.warning("⚠️ まずPDF Uploadページでセットアップを完了してください")
return
st.markdown("---")
# 検索テスト(デモ)
st.subheader("🔍 類似検索テスト")
st.write("PDFの内容から関連する部分を検索します")
query = st.text_input(
"検索したい内容を入力してください",
placeholder="例:売上について教えて"
)
if query:
if st.button("🔍 検索", type="primary"):
with st.spinner("検索中..."):
results = search_similar_chunks(query, api_key, top_k=3)
if results:
st.success(f"✅ {len(results)}件の関連チャンクが見つかりました")
for i, result in enumerate(results, 1):
with st.expander(f"📄 結果 {i} (類似度: {result['score']:.3f})"):
st.write(result['text'])
else:
st.warning("関連するチャンクが見つかりませんでした")
st.info("PDFをアップロードしてから検索してください")
# ========== メイン処理 ==========
def main():
"""メインアプリケーション"""
# ページ選択
selection = st.sidebar.radio(
"ページを選択",
["PDF Upload", "Ask My PDF(s)"],
help="まずPDF Uploadから始めてください"
)
# フッター
st.sidebar.markdown("---")
st.sidebar.markdown("""
<div style='text-align: center; color: gray; font-size: 12px;'>
Made with ❤️ using Streamlit<br>
2025年版 - LangChain不使用
</div>
""", unsafe_allow_html=True)
# ページ表示
if selection == "PDF Upload":
page_pdf_upload()
elif selection == "Ask My PDF(s)":
page_ask_my_pdf()
if __name__ == "__main__":
main()