import PyPDF2
import pytesseract
from pdf2image import convert_from_bytes
import arabic_reshaper
from bidi.algorithm import get_display
from transformers import pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
import gc
import io
import os
import re
import torch
from typing import List, Dict
from agents import create_judge_agent, create_advocate_agent
from crewai import Task, Crew


class PDFProcessor:
    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,  # Reduced chunk size for better memory management
            chunk_overlap=50,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        # Initialize the summarization model with better memory management
        self.summarizer = pipeline(
            "summarization",
            model="sshleifer/distilbart-cnn-6-6",  # Smaller, faster distilled model
            device="cpu",  # Use CPU for better compatibility
            torch_dtype=torch.float32,
            batch_size=1
        )
        self.progress_callback = None
        # Configure torch for memory efficiency.
        # On Apple Silicon, torch.mps.set_per_process_memory_fraction(0.7)
        # could cap MPS memory use; left disabled for portability.
        if torch.cuda.is_available():  # For CUDA devices
            torch.cuda.empty_cache()
            torch.cuda.set_per_process_memory_fraction(0.7)  # Use only 70% of GPU memory

    def set_progress_callback(self, callback):
        """Set a callback function to report progress."""
        self.progress_callback = callback

    def update_progress(self, message: str, progress: float):
        """Update progress through the callback if available."""
        if self.progress_callback:
            self.progress_callback(message, progress)
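
    # Usage sketch (hypothetical wiring, not part of the original module):
    # stream progress updates to stdout.
    #   processor = PDFProcessor()
    #   processor.set_progress_callback(lambda msg, p: print(f"[{p:.0%}] {msg}"))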

    def extract_text_from_pdf(self, pdf_bytes: bytes) -> str:
        """Extract text from a PDF, handling both searchable and scanned PDFs."""
        text = ""
        try:
            # Try to extract text directly first using PyPDF2
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
            extracted_text = []
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                # extract_text() may return None or "" for scanned pages
                if page_text and page_text.strip():
                    extracted_text.append(page_text)
            if extracted_text:
                # Direct extraction yielded results; join the pages
                text = "\n\n".join(extracted_text)
            else:
                # No embedded text: fall back to OCR at a higher DPI for quality
                images = convert_from_bytes(pdf_bytes, dpi=300)
                for image in images:
                    # Configure tesseract for better Arabic text recognition;
                    # the language is passed once via lang= to avoid duplicate -l flags
                    custom_config = r'--oem 1 --psm 3'
                    page_text = pytesseract.image_to_string(
                        image,
                        config=custom_config,
                        lang='ara+eng'
                    )
                    if page_text.strip():
                        extracted_text.append(page_text)
                text = "\n\n".join(extracted_text)
            # Clean up the text
            text = self._clean_text(text)
            # Handle Arabic text with improved reshaping
            text = self._process_arabic_text(text)
        except Exception as e:
            raise Exception(f"Error processing PDF: {str(e)}") from e
        return text

    def _clean_text(self, text: str) -> str:
        """Clean and normalize extracted text."""
        # Remove control characters
        text = "".join(char for char in text if char.isprintable() or char in "\n\r\t")
        # Normalize horizontal whitespace without destroying line breaks
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r'\n\s*\n', '\n\n', text)
        # Fix common OCR issues: missing space at case changes, missing sentence breaks
        text = re.sub(r'(?<=[a-z])(?=[A-Z])', ' ', text)
        text = re.sub(r'([.!?])\s*(?=[A-Z])', r'\1\n', text)
        # Remove empty lines and extra whitespace
        lines = [line.strip() for line in text.split('\n')]
        text = '\n'.join(line for line in lines if line)
        return text.strip()
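
    # Illustrative (hypothetical input): _clean_text("Good  morning.EveryoneKnows")
    # collapses the double space, inserts a space at the lowercase/uppercase
    # boundary, and breaks the line after the sentence end, yielding
    # "Good morning.\nEveryone Knows".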

    def _process_arabic_text(self, text: str) -> str:
        """Process Arabic text with improved handling."""
        try:
            # Reshape Arabic letters into their contextual presentation forms
            reshaped_text = arabic_reshaper.reshape(text)
            # Apply the Unicode bidirectional algorithm for display order
            text = get_display(reshaped_text)
            # Remove spurious OCR spaces between Arabic letters
            # (note: this also joins directly adjacent Arabic words)
            text = re.sub(r'([ء-ي])\s+([ء-ي])', r'\1\2', text)
            # Strip zero-width and directional Unicode control characters
            text = re.sub(r'[\u200B-\u200F\u202A-\u202E]', '', text)
            return text
        except Exception as e:
            print(f"Warning: Error in Arabic text processing: {str(e)}")
            return text  # Return original text if processing fails
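
    # Illustrative sketch: reshaping maps isolated Arabic letters to their
    # joined presentation forms, and get_display reorders RTL runs into
    # visual order for rendering, e.g.
    #   get_display(arabic_reshaper.reshape("السلام عليكم"))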

    def summarize_document(self, text: str) -> str:
        """Generate a summary of the document with improved memory management
        and handling of Arabic text."""
        try:
            # Split the text into smaller chunks
            chunks = self.text_splitter.split_text(text)
            if not chunks:
                return self._create_extractive_summary(text)
            summaries = []
            total_chunks = len(chunks)
            # Process chunks in small batches for stability
            batch_size = 2
            for i in range(0, total_chunks, batch_size):
                # Clear accelerator memory before processing a new batch
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                elif torch.backends.mps.is_available():
                    gc.collect()
                    torch.mps.empty_cache()
                batch = chunks[i:i + batch_size]
                for chunk in batch:
                    if not chunk.strip():
                        continue
                    try:
                        # Determine whether the chunk is primarily Arabic
                        is_arabic = any('\u0600' <= c <= '\u06FF' for c in chunk)
                        # Adjust summary length based on text type
                        max_length = 150 if is_arabic else 130
                        min_length = 40 if is_arabic else 30
                        # Generate the summary with lightweight decoding settings
                        # (early_stopping dropped: it has no effect with num_beams=1)
                        summary = self.summarizer(
                            chunk,
                            max_length=max_length,
                            min_length=min_length,
                            do_sample=False,
                            num_beams=1,  # Single beam for efficiency
                            truncation=True
                        )
                        summary_text = summary[0]['summary_text'].strip()
                        if summary_text:
                            summaries.append(summary_text)
                    except Exception as e:
                        print(f"Warning: Error summarizing chunk: {str(e)}")
                        # Fall back to an extractive summary for this chunk
                        chunk_summary = self._create_extractive_summary(chunk, sentences_count=2)
                        if chunk_summary:
                            summaries.append(chunk_summary)
                # Update progress ("Summarizing the document...")
                progress = min(0.3 + (i / total_chunks) * 0.4, 0.7)
                self.update_progress("جاري تلخيص المستند...", progress)
            if not summaries:
                return self._create_extractive_summary(text)
            # Combine chunk summaries and post-process
            final_summary = " ".join(summaries)
            final_summary = self._clean_text(final_summary)
            final_summary = self._process_arabic_text(final_summary)
            # Keep the final summary to a reasonable length
            if len(final_summary) > 2000:
                final_summary = self._create_extractive_summary(final_summary, sentences_count=10)
            return final_summary
        except Exception as e:
            print(f"Error in summarization: {str(e)}")
            return self._create_extractive_summary(text)

    def _create_extractive_summary(self, text: str, sentences_count: int = 5) -> str:
        """Create a simple extractive summary as a fallback method."""
        try:
            # Split text into sentences
            sentences = re.split(r'[.!?]\s+', text)
            # Drop very short sentences and trim whitespace
            sentences = [s.strip() for s in sentences if len(s.strip()) > 30]
            if not sentences:
                return text[:500] + "..."  # Return truncated text if no usable sentences
            # Score sentences based on position and length
            scored_sentences = []
            for i, sentence in enumerate(sentences):
                score = 0
                # Prefer sentences from the beginning and end of the document
                if i < len(sentences) * 0.3:  # First 30%
                    score += 2
                elif i > len(sentences) * 0.7:  # Last 30%
                    score += 1
                # Prefer medium-length sentences
                if 50 <= len(sentence) <= 200:
                    score += 1
                scored_sentences.append((score, sentence))
            # Sort by score and select the top sentences
            scored_sentences.sort(key=lambda x: x[0], reverse=True)
            selected_sentences = [s[1] for s in scored_sentences[:sentences_count]]
            # Restore the sentences' original document order
            selected_sentences.sort(key=lambda s: sentences.index(s))
            # Join sentences and clean
            summary = ". ".join(selected_sentences)
            summary = self._clean_text(summary)
            summary = self._process_arabic_text(summary)
            return summary
        except Exception as e:
            print(f"Error in extractive summary: {str(e)}")
            return text[:500] + "..."  # Return truncated text as a last resort

    def analyze_legal_issues(self, text: str) -> Dict:
        """Analyze legal issues in the document using the Judge agent."""
        judge_agent = create_judge_agent()
        # Arabic prompt: "Analyze the following document and identify potential
        # legal violations under UAE law. The analysis must include:
        # 1. potential legal violations, 2. the relevant legal articles,
        # 3. recommendations for remediation."
        task_description = f"""
        تحليل المستند التالي وتحديد المخالفات القانونية المحتملة وفقاً للقوانين الإماراتية:
        {text}
        يجب أن يتضمن التحليل:
        1. المخالفات القانونية المحتملة
        2. المواد القانونية ذات الصلة
        3. التوصيات للتصحيح
        """
        task = Task(
            description=task_description,
            agent=judge_agent,
            # Expected output: "a comprehensive legal analysis of violations and recommendations"
            expected_output="تحليل قانوني شامل للمخالفات والتوصيات"
        )
        crew = Crew(agents=[judge_agent], tasks=[task])
        result = crew.kickoff()
        return {"legal_analysis": result}

    def map_to_uae_legislation(self, text: str) -> Dict:
        """Map document content to relevant UAE laws and regulations."""
        advocate_agent = create_advocate_agent()
        # Arabic prompt: "Analyze the following document and link it to the
        # relevant UAE laws and legislation. The analysis must include:
        # 1. the relevant UAE laws, 2. the specific legal articles,
        # 3. the legal interpretation of the relationship."
        task_description = f"""
        تحليل المستند التالي وربطه بالقوانين والتشريعات الإماراتية ذات الصلة:
        {text}
        يجب أن يتضمن التحليل:
        1. القوانين الإماراتية ذات الصلة
        2. المواد القانونية المحددة
        3. التفسير القانوني للعلاقة
        """
        task = Task(
            description=task_description,
            agent=advocate_agent,
            # Expected output: "a detailed map of the relevant laws and legislation"
            expected_output="خريطة تفصيلية للقوانين والتشريعات ذات الصلة"
        )
        crew = Crew(agents=[advocate_agent], tasks=[task])
        result = crew.kickoff()
        return {"legislation_mapping": result}

    def process_document(self, pdf_bytes: bytes) -> Dict:
        """Process the document through all steps with progress tracking."""
        try:
            # Extract text from the PDF ("Extracting text from the document...")
            self.update_progress("استخراج النص من المستند...", 0.1)
            text = self.extract_text_from_pdf(pdf_bytes)
            if not text.strip():
                # "No readable text was found in the document"
                raise ValueError("لم يتم العثور على نص قابل للقراءة في المستند")
            # Generate a summary ("Creating a summary of the document...")
            self.update_progress("إنشاء ملخص للمستند...", 0.3)
            summary = self.summarize_document(text)
            # Analyze legal issues ("Analyzing legal issues...")
            self.update_progress("تحليل القضايا القانونية...", 0.5)
            legal_analysis = self.analyze_legal_issues(text)
            # Map to UAE legislation ("Linking the document to UAE legislation...")
            self.update_progress("ربط المستند بالتشريعات الإماراتية...", 0.7)
            legislation_mapping = self.map_to_uae_legislation(text)
            # "Analysis complete!"
            self.update_progress("اكتمل التحليل!", 1.0)
            return {
                "summary": summary,
                "legal_analysis": legal_analysis["legal_analysis"],
                "legislation_mapping": legislation_mapping["legislation_mapping"],
                "raw_text": text  # Include raw text for translation if needed
            }
        except Exception as e:
            # "An error occurred: ..."
            self.update_progress(f"حدث خطأ: {str(e)}", 0)
            raise
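

# Minimal usage sketch, not part of the original module's public surface.
# Assumes a local file "sample.pdf" (hypothetical path) and that the local
# `agents` module supplies working create_judge_agent/create_advocate_agent.
if __name__ == "__main__":
    processor = PDFProcessor()
    processor.set_progress_callback(lambda msg, p: print(f"[{p:.0%}] {msg}"))
    with open("sample.pdf", "rb") as f:
        results = processor.process_document(f.read())
    print(results["summary"])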