PDFに質問しよう7 ~ 完成版コード | SkillhubAI(スキルハブエーアイ)

PDFに質問しよう7 ~ 完成版コード

すべてを組み合わせた完成版コード

さあ、いよいよすべてのパーツを組み合わせて、完成版を作りましょう!今まで作ってきた機能を全部合体させます。

"""
PDFに質問しよう - 前編(2025年版)
LangChainを使わないシンプルな実装

必要なライブラリ:
pip install streamlit PyPDF2 openai qdrant-client tiktoken
"""

import streamlit as st
import os
from PyPDF2 import PdfReader
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import tiktoken
import uuid

# ========== 設定 ==========
QDRANT_PATH = "./local_qdrant"
COLLECTION_NAME = "my_pdfs"
EMBEDDING_MODEL = "text-embedding-3-small"
CHUNK_SIZE = 500  # トークン数
CHUNK_OVERLAP = 50  # オーバーラップ

# ========== ページ設定 ==========
st.set_page_config(
    page_title="PDFに質問しよう",
    page_icon="📄",
    layout="wide"
)

# ========== ヘルパー関数 ==========

def get_pdf_text():
    """PDFをアップロードしてテキストを取得"""
    uploaded_file = st.file_uploader(
        label='PDFファイルを選んでください😇',
        type='pdf',
        help='テキストが含まれるPDFファイルをアップロードしてください'
    )

    if uploaded_file:
        try:
            # PDFを読み込み
            pdf_reader = PdfReader(uploaded_file)

            # 全ページのテキストを結合
            text = ''
            total_pages = len(pdf_reader.pages)

            # プログレスバーを表示
            progress_bar = st.progress(0)
            for i, page in enumerate(pdf_reader.pages):
                page_text = page.extract_text()
                text += f'\n[ページ {i+1}]\n{page_text}\n'
                progress_bar.progress((i + 1) / total_pages)

            progress_bar.empty()  # プログレスバーを消す

            # 情報を表示
            st.success(f"✅ PDFを読み込みました!")
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("📄 ページ数", f"{total_pages}ページ")
            with col2:
                st.metric("📝 文字数", f"{len(text):,}文字")
            with col3:
                st.metric("📦 ファイルサイズ", f"{uploaded_file.size / 1024:.1f}KB")

            # プレビュー
            with st.expander("📖 テキストのプレビュー(最初の1000文字)"):
                st.text(text[:1000] + "..." if len(text) > 1000 else text)

            return text

        except Exception as e:
            st.error(f"PDFの読み込みに失敗しました: {e}")
            return None

    return None

def split_text_by_tokens(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """テキストをトークン数ベースで分割"""
    if not text:
        return []

    # GPT-4のトークナイザーを使用
    encoding = tiktoken.encoding_for_model("gpt-4")
    tokens = encoding.encode(text)

    chunks = []
    start = 0

    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        chunk_text = encoding.decode(chunk_tokens)

        # 空のチャンクは除外
        if chunk_text.strip():
            chunks.append(chunk_text)

        # オーバーラップを考慮して次の開始位置を設定
        start = end - chunk_overlap if end < len(tokens) else end

    return chunks

def create_embeddings(texts, api_key):
    """テキストをEmbeddingに変換"""
    if not texts or not api_key:
        return None

    try:
        client = OpenAI(api_key=api_key)

        # バッチ処理(一度に最大20個まで)
        all_embeddings = []
        batch_size = 20

        progress_bar = st.progress(0)
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]

            response = client.embeddings.create(
                model=EMBEDDING_MODEL,
                input=batch
            )

            batch_embeddings = [item.embedding for item in response.data]
            all_embeddings.extend(batch_embeddings)

            # プログレスバー更新
            progress = min((i + batch_size) / len(texts), 1.0)
            progress_bar.progress(progress)

        progress_bar.empty()
        return all_embeddings

    except Exception as e:
        st.error(f"Embedding作成エラー: {e}")
        return None

def init_qdrant():
    """Qdrantクライアントを初期化"""
    try:
        client = QdrantClient(path=QDRANT_PATH)

        collections = client.get_collections().collections
        collection_names = [c.name for c in collections]

        if COLLECTION_NAME not in collection_names:
            client.create_collection(
                collection_name=COLLECTION_NAME,
                vectors_config=VectorParams(
                    size=1536,
                    distance=Distance.COSINE
                )
            )
            st.info("📦 新しいコレクションを作成しました")

        return client

    except Exception as e:
        st.error(f"Qdrant初期化エラー: {e}")
        return None

def store_embeddings(chunks, embeddings):
    """EmbeddingをベクトルDBに保存"""
    client = init_qdrant()
    if not client:
        return False

    try:
        points = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point = PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={
                    "text": chunk,
                    "chunk_index": i
                }
            )
            points.append(point)

        # バッチで保存
        client.upsert(
            collection_name=COLLECTION_NAME,
            points=points
        )

        return True

    except Exception as e:
        st.error(f"保存エラー: {e}")
        return False

def search_similar_chunks(query, api_key, top_k=3):
    """類似チャンクを検索(次章で使用)"""
    client = init_qdrant()
    if not client:
        return []

    # クエリをEmbeddingに変換
    query_embedding = create_embeddings([query], api_key)
    if not query_embedding:
        return []

    try:
        results = client.search(
            collection_name=COLLECTION_NAME,
            query_vector=query_embedding[0],
            limit=top_k
        )

        similar_chunks = []
        for result in results:
            similar_chunks.append({
                'text': result.payload['text'],
                'score': result.score
            })

        return similar_chunks

    except Exception as e:
        st.error(f"検索エラー: {e}")
        return []

# ========== ページ実装 ==========

def page_pdf_upload():
    """PDFアップロードページ"""
    st.title("📄 PDF Upload")
    st.markdown("""
    ### 🎯 このページでできること
    PDFファイルをアップロードして、AIが理解できる形に変換します。
    変換が完了したら、「Ask My PDF(s)」ページで質問できるようになります!
    """)

    # APIキーの入力
    with st.sidebar:
        st.header("⚙️ 設定")
        api_key = st.text_input(
            "OpenAI APIキー",
            type="password",
            placeholder="sk-...",
            help="https://platform.openai.com で取得できます"
        )

        if api_key:
            os.environ["OPENAI_API_KEY"] = api_key
            st.success("✅ APIキー設定完了")
        else:
            st.warning("⚠️ APIキーを入力してください")

        st.markdown("---")
        st.markdown("""
        ### 📖 処理の流れ
        1. PDFをアップロード
        2. テキストを抽出
        3. チャンクに分割
        4. Embeddingを作成
        5. ベクトルDBに保存
        """)

    # メインコンテンツ
    st.markdown("---")

    # PDFアップロード
    pdf_text = get_pdf_text()

    if pdf_text and api_key:
        st.markdown("---")

        # 処理開始ボタン
        col1, col2, col3 = st.columns([1, 2, 1])
        with col2:
            if st.button("🚀 処理を開始", type="primary", use_container_width=True):

                # ステップ1: チャンク分割
                with st.status("✂️ テキストを分割中...", expanded=True) as status:
                    chunks = split_text_by_tokens(pdf_text)
                    st.write(f"📊 {len(chunks)}個のチャンクに分割しました")

                    # サンプル表示
                    with st.expander("チャンクのサンプル(最初の3個)"):
                        for i, chunk in enumerate(chunks[:3], 1):
                            st.text_area(
                                f"チャンク {i}",
                                chunk[:300] + "..." if len(chunk) > 300 else chunk,
                                height=100,
                                disabled=True
                            )

                    status.update(label="✅ 分割完了", state="complete")

                # ステップ2: Embedding作成
                with st.status("🔢 Embeddingを作成中...", expanded=True) as status:
                    embeddings = create_embeddings(chunks, api_key)

                    if embeddings:
                        st.write(f"📊 {len(embeddings)}個のEmbeddingを作成しました")
                        st.write(f"📐 各Embeddingは{len(embeddings[0])}次元のベクトルです")
                        status.update(label="✅ Embedding作成完了", state="complete")
                    else:
                        status.update(label="❌ Embedding作成失敗", state="error")
                        st.stop()

                # ステップ3: DB保存
                with st.status("💾 データベースに保存中...", expanded=True) as status:
                    success = store_embeddings(chunks, embeddings)

                    if success:
                        status.update(label="✅ 保存完了!", state="complete")
                        st.balloons()

                        st.success("""
                        🎉 **処理が完了しました!**

                        PDFの内容がデータベースに保存されました。
                        「Ask My PDF(s)」ページで質問してみましょう!
                        """)
                    else:
                        status.update(label="❌ 保存失敗", state="error")

    elif pdf_text and not api_key:
        st.warning("⚠️ サイドバーでAPIキーを入力してください")

def page_ask_my_pdf():
    """質問ページ(簡易版)"""
    st.title("🤔 Ask My PDF(s)")
    st.markdown("""
    ### 🎯 このページでできること
    アップロードしたPDFの内容について質問できます!
    (完全版は次の章で実装します)
    """)

    # APIキーの確認
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        st.warning("⚠️ まずPDF Uploadページでセットアップを完了してください")
        return

    st.markdown("---")

    # 検索テスト(デモ)
    st.subheader("🔍 類似検索テスト")
    st.write("PDFの内容から関連する部分を検索します")

    query = st.text_input(
        "検索したい内容を入力してください",
        placeholder="例:売上について教えて"
    )

    if query:
        if st.button("🔍 検索", type="primary"):
            with st.spinner("検索中..."):
                results = search_similar_chunks(query, api_key, top_k=3)

                if results:
                    st.success(f"✅ {len(results)}件の関連チャンクが見つかりました")

                    for i, result in enumerate(results, 1):
                        with st.expander(f"📄 結果 {i} (類似度: {result['score']:.3f})"):
                            st.write(result['text'])
                else:
                    st.warning("関連するチャンクが見つかりませんでした")
                    st.info("PDFをアップロードしてから検索してください")

# ========== メイン処理 ==========

def main():
    """メインアプリケーション"""
    # ページ選択
    selection = st.sidebar.radio(
        "ページを選択",
        ["PDF Upload", "Ask My PDF(s)"],
        help="まずPDF Uploadから始めてください"
    )

    # フッター
    st.sidebar.markdown("---")
    st.sidebar.markdown("""
    <div style='text-align: center; color: gray; font-size: 12px;'>
        Made with ❤️ using Streamlit<br>
        2025年版 - LangChain不使用
    </div>
    """, unsafe_allow_html=True)

    # ページ表示
    if selection == "PDF Upload":
        page_pdf_upload()
    elif selection == "Ask My PDF(s)":
        page_ask_my_pdf()

if __name__ == "__main__":
    main()