luanpoppe
refactor: pequenas refatorações
7fa7a9c
raw
history blame
6.57 kB
import os
from _utils.gerar_relatorio_modelo_usuario.prompts import (
prompt_auxiliar_do_contextual_prompt,
create_prompt_auxiliar_do_contextual_prompt,
)
from _utils.bubble_integrations.obter_arquivo import get_pdf_from_bubble
from _utils.chains.Chain_class import Chain
from _utils.prompts.Prompt_class import Prompt
from _utils.splitters.Splitter_class import Splitter
from setup.easy_imports import PyPDFLoader
from langchain_openai import ChatOpenAI
from typing import List, Dict, Tuple, Optional
from anthropic import Anthropic, AsyncAnthropic
import logging
from langchain.schema import Document
import asyncio
from langchain.prompts import PromptTemplate
from typing import List
from multiprocessing import Process, Barrier, Queue
from dataclasses import dataclass
from langchain_core.messages import HumanMessage
from asgiref.sync import sync_to_async
from setup.easy_imports import ChatPromptTemplate, ChatOpenAI
from _utils.gerar_relatorio_modelo_usuario.llm_calls import aclaude_answer, agpt_answer
from _utils.gerar_relatorio_modelo_usuario.prompts import contextual_prompt
from _utils.models.gerar_relatorio import (
ContextualizedChunk,
DocumentChunk,
RetrievalConfig,
)
from _utils.prompts.Prompt_class import prompt as prompt_obj
# Module-level debug counter: ContextualRetriever.create_contextualized_chunk
# appends one entry per call and prints the running length.
lista_contador = []
class ContextualRetriever:
    """Enrich document chunks with LLM-generated context.

    For a batch of chunks, an auxiliary summary of the whole document is
    requested once (via ``aclaude_answer``) and then every chunk gets its own
    context description (via ``agpt_answer``), with the per-chunk requests
    running concurrently.
    """

    def __init__(
        self, config: RetrievalConfig, claude_api_key: str, claude_context_model: str
    ):
        """Store the configuration and create the async Anthropic client.

        Args:
            config: Retrieval settings; stored unchanged on the instance.
            claude_api_key: API key handed to ``AsyncAnthropic``.
            claude_context_model: Model name forwarded to ``aclaude_answer``
                when the auxiliary summary is requested.
        """
        self.config = config
        # Async client (not the blocking ``Anthropic`` one): it is passed to
        # ``aclaude_answer``, which is awaited.
        self.claude_client = AsyncAnthropic(api_key=claude_api_key)
        self.logger = logging.getLogger(__name__)
        self.bm25 = None
        self.claude_context_model = claude_context_model

    async def llm_generate_context(
        self, page_text: str, chunk: DocumentChunk, resumo_auxiliar
    ) -> str:
        """Ask the LLM for a contextual description of a single chunk.

        The prompt is built by ``contextual_prompt`` from the page text, the
        auxiliary summary and the chunk content, then sent with
        ``agpt_answer``.

        Args:
            page_text: Text of the page the chunk belongs to.
            chunk: Chunk to describe; ``content`` and ``chunk_id`` are read.
            resumo_auxiliar: Auxiliary summary of the whole document.

        Returns:
            The response of ``agpt_answer``, or ``""`` if building the prompt
            or the request raised. The failure is logged instead of being
            propagated, so one failing chunk does not fail the whole batch.
        """
        try:
            print("COMEÇOU A REQUISIÇÃO")
            prompt = contextual_prompt(page_text, resumo_auxiliar, chunk.content)
            # Alternative backend, kept for reference:
            # response = await aclaude_answer(
            #     self.claude_client, self.claude_context_model, prompt
            # )
            return await agpt_answer(prompt)
        except Exception as e:
            self.logger.error(
                f"Context generation failed for chunk {chunk.chunk_id}: {str(e)}"
            )
            return ""

    async def create_contextualized_chunk(
        self, chunk, single_page_text, response_auxiliar_summary
    ):
        """Build a ``ContextualizedChunk`` for ``chunk``.

        Args:
            chunk: Source chunk; its fields are copied into the result.
            single_page_text: Indexable collection of pages, each exposing
                ``page_content``.
            response_auxiliar_summary: Auxiliary summary forwarded to
                ``llm_generate_context``.

        Returns:
            A ``ContextualizedChunk`` carrying the original chunk fields plus
            the generated ``context``.
        """
        # Debug progress output backed by the module-level counter.
        lista_contador.append(0)
        print("contador: ", len(lista_contador))

        # Only the chunk's own page is used as context. An earlier variant
        # concatenated the neighbouring pages (page_number - 1 up to
        # page_number + 1) instead.
        # NOTE(review): assumes chunk.page_number is 1-based; a value of 0
        # would index -1 and silently pick the last page -- confirm.
        page_content = single_page_text[chunk.page_number - 1].page_content

        context = await self.llm_generate_context(
            page_content, chunk, response_auxiliar_summary
        )
        return ContextualizedChunk(
            content=chunk.content,
            page_number=chunk.page_number,
            chunk_id=chunk.chunk_id,
            start_char=chunk.start_char,
            end_char=chunk.end_char,
            context=context,
        )

    async def contextualize_all_chunks(
        self, full_text_as_array: List[Document], chunks: List[DocumentChunk]
    ) -> List[ContextualizedChunk]:
        """Add context to all chunks.

        Args:
            full_text_as_array: Pages of the document; their ``page_content``
                is concatenated to build the auxiliary-summary prompt, and the
                list is also used for the per-chunk page lookup.
            chunks: Chunks to contextualize.

        Returns:
            One ``ContextualizedChunk`` per input chunk, in input order.

        Raises:
            Exception: Whatever ``aclaude_answer`` raises for the summary
                request is propagated.
            ExceptionGroup: Raised by ``asyncio.TaskGroup`` if any per-chunk
                task fails (for example a page lookup out of range).
        """
        # Reset the module-level debug counter in place. A plain
        # ``lista_contador = []`` here would only bind an unused local and
        # leave the shared counter growing across batches.
        lista_contador.clear()

        full_text = "".join(page.page_content for page in full_text_as_array)

        prompt_auxiliar_summary = create_prompt_auxiliar_do_contextual_prompt(full_text)
        print("\n\n\nprompt_auxiliar_summary: ", prompt_auxiliar_summary)

        response_auxiliar_summary = await aclaude_answer(
            self.claude_client, self.claude_context_model, prompt_auxiliar_summary
        )
        print("\n\n\n\nresponse_auxiliar_summary: ", response_auxiliar_summary)

        # All per-chunk requests run concurrently; leaving the ``async with``
        # block waits for every task to finish.
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(
                    self.create_contextualized_chunk(
                        chunk, full_text_as_array, response_auxiliar_summary
                    )
                )
                for chunk in chunks
            ]

        return [task.result() for task in tasks]
def get_full_text_and_all_PDFs_chunks(contexto, listaPDFs, splitterObject: Splitter):
    """Return the full text, the chunks and the loaded pages for the input.

    Args:
        contexto: Raw text. When truthy it is used as the full text and
            ``listaPDFs`` is ignored.
        listaPDFs: Iterable of PDF references, each passed to
            ``splitterObject.load_and_split_document`` and
            ``get_pdf_from_bubble``. Only used when ``contexto`` is falsy.
        splitterObject: Splitter providing ``load_and_split_text`` and
            ``load_and_split_document``.

    Returns:
        A ``(full_text, all_PDFs_chunks, pages)`` tuple. ``pages`` is an
        empty list when no PDF was loaded (text input or empty ``listaPDFs``).
    """
    # Bound up front: previously ``pages`` was only assigned inside the PDF
    # loop, so the text branch (and an empty PDF list) raised
    # UnboundLocalError at the return statement.
    pages = []

    if contexto:
        return contexto, splitterObject.load_and_split_text(contexto), pages

    full_text = ""
    all_PDFs_chunks = []
    for pdf_path in listaPDFs:
        all_PDFs_chunks.extend(splitterObject.load_and_split_document(pdf_path))

        # Full text and pages used later for contextualization.
        # NOTE(review): chunks accumulate across every PDF, but ``pages`` and
        # ``full_text`` are overwritten on each iteration, so only the last
        # PDF's values are returned -- confirm this is intended for
        # multi-PDF input.
        pages = get_pdf_from_bubble(pdf_path).load()
        full_text = " ".join(page.page_content for page in pages)

    return full_text, all_PDFs_chunks, pages
async def contextualize_chunk_based_on_serializer(
    serializer, contextual_retriever: ContextualRetriever, pages, all_PDFs_chunks
):
    """Contextualize the chunks only when the serializer asks for it.

    Args:
        serializer: Mapping read at key ``"should_have_contextual_chunks"``.
        contextual_retriever: Object whose ``contextualize_all_chunks``
            coroutine produces the contextualized chunks.
        pages: Pages forwarded to ``contextualize_all_chunks``.
        all_PDFs_chunks: Plain chunks; returned untouched when no
            contextualization is requested.

    Returns:
        A ``(chunks, is_contextualized)`` tuple: the contextualized chunks and
        ``True`` when the flag is truthy, otherwise ``all_PDFs_chunks`` and
        ``False``.
    """
    # Guard clause: nothing to do when contextualization is switched off.
    if not serializer["should_have_contextual_chunks"]:
        return all_PDFs_chunks, False

    enriched_chunks = await contextual_retriever.contextualize_all_chunks(
        pages, all_PDFs_chunks
    )
    return enriched_chunks, True