# embedding_utils.py

from sentence_transformers import SentenceTransformer
from llama_index.core import SimpleDirectoryReader
from huggingface_hub import login
from typing import List, Tuple
from docx import Document
import numpy as np
import os
import tempfile
import logging


# Optionally load environment variables from a local .env file:
# from dotenv import load_dotenv
# load_dotenv()

# Set the HF_HOME environment variable to a writable directory
os.environ["HF_HOME"] = "/tmp/huggingface_cache"

# Ensure the cache directory exists
cache_dir = os.environ["HF_HOME"]
os.makedirs(cache_dir, exist_ok=True)
    
# Load Hugging Face token from environment variable
huggingface_token = os.getenv('HUGGINGFACE_HUB_TOKEN')


if huggingface_token:
    login(token=huggingface_token, add_to_git_credential=True, write_permission=True)
else:
    raise ValueError("Hugging Face token is not set. Please set the HUGGINGFACE_HUB_TOKEN environment variable.")

# Load the embedding model and tokenizer with authentication
# (an earlier candidate was 'nvidia/NV-Embed-v1')
model_name = 'nomic-ai/nomic-embed-text-v1.5'
model = SentenceTransformer(model_name, trust_remote_code=True)
model.max_seq_length = 4096
model.tokenizer.padding_side = "right"
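# Note: nomic-embed-text-v1.5 is usually queried with task prefixes
# (e.g. "search_document: " / "search_query: "); the functions below
# embed raw text, so adding prefixes upstream is left to the caller.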


def read_document(file_content: bytes, file_id: int, file_format: str) -> str:
    """Extract text content from a document file depending on its format."""
    try:
        # Create a temporary directory to save the file
        with tempfile.TemporaryDirectory() as temp_dir:
            file_path = os.path.join(temp_dir, f"document_{file_id}.{file_format}")
            
            # Save the content to the file
            with open(file_path, 'wb') as temp_file:
                temp_file.write(file_content)

            # Handle different file formats
            if file_format.lower() == 'docx':
                text_content = extract_text_from_docx(file_path)
            elif file_format.lower() == 'pdf':
                text_content = extract_text_from_pdf(file_path)
            elif file_format.lower() in ['txt', 'md', 'csv']:
                reader = SimpleDirectoryReader(input_files=[file_path])
                documents = reader.load_data()
                # Some readers return multiple Document objects per file
                text_content = '\n'.join(doc.text for doc in documents)
            else:
                raise ValueError(f"Unsupported file format: {file_format}")

            if text_content:
                return text_content
            else:
                raise ValueError("No content extracted from the document.")
    
    except Exception as e:
        logging.error(f"Error reading document: {e}")
        raise
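
# Illustrative call of read_document (the path and file_id are hypothetical):
#
#     with open('report.docx', 'rb') as f:
#         doc_text = read_document(f.read(), file_id=1, file_format='docx')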

def extract_text_from_docx(file_path: str) -> str:
    """Extract text from a DOCX file."""
    try:
        doc = Document(file_path)
        full_text = [para.text for para in doc.paragraphs]
        return '\n'.join(full_text)
    except Exception as e:
        logging.error(f"Error extracting text from DOCX file: {e}")
        raise

def extract_text_from_pdf(file_path: str) -> str:
    """Extract text from a PDF file."""
    import pdfplumber  # imported lazily so the module loads without pdfplumber installed
    try:
        with pdfplumber.open(file_path) as pdf:
            # extract_text() can return None for pages without a text layer
            full_text = [page.extract_text() or '' for page in pdf.pages]
        return '\n'.join(full_text).strip()
    except Exception as e:
        logging.error(f"Error extracting text from PDF file: {e}")
        raise



def cumulative_semantic_chunking(text: str, max_chunk_size: int, similarity_threshold: float, embedding_model: SentenceTransformer = model) -> List[str]:
    """Cumulative semantic chunking using sentence embeddings.

    Sentences are merged into the current chunk as long as the merged chunk's
    length-weighted embedding stays similar to the current chunk's embedding
    and the chunk stays under max_chunk_size characters.
    """
    # Naive sentence split; drop empty fragments (e.g. after a trailing period)
    sentences = [s.strip() for s in text.split('.') if s.strip()]
    if not sentences:
        return []

    # Encode sentences with the provided embedding model
    sentence_embeddings = embedding_model.encode(sentences)

    chunks = []
    current_chunk = sentences[0]
    current_embedding = sentence_embeddings[0]

    for sentence, embedding in zip(sentences[1:], sentence_embeddings[1:]):
        combined_chunk = current_chunk + '. ' + sentence
        # Length-weighted average of the two embeddings (weights in words)
        current_len = len(current_chunk.split())
        sentence_len = len(sentence.split())
        combined_embedding = (current_embedding * current_len + embedding * sentence_len) / (current_len + sentence_len)

        # Cosine similarity between the current chunk and the merged candidate
        similarity = np.dot(current_embedding, combined_embedding) / (np.linalg.norm(current_embedding) * np.linalg.norm(combined_embedding))

        if similarity >= similarity_threshold and len(combined_chunk) <= max_chunk_size:
            current_chunk = combined_chunk
            current_embedding = combined_embedding
        else:
            chunks.append(current_chunk.strip())
            current_chunk = sentence
            current_embedding = embedding

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks
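
# Illustrative call (threshold and chunk-size values are assumptions,
# not tuned defaults from this module):
#
#     chunks = cumulative_semantic_chunking(doc_text, max_chunk_size=1000,
#                                           similarity_threshold=0.7)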


def embed_chunks(chunks: List[str]) -> Tuple[List[np.ndarray], int]:
    """Embed the chunks using the SentenceTransformer model and return embeddings along with the total token count."""
    total_tokens = 0
    embeddings = []

    for chunk in chunks:
        tokens = model.tokenizer.encode(chunk, add_special_tokens=False)
        total_tokens += len(tokens)
        embedding = model.encode(chunk)
        embeddings.append(embedding)

    return embeddings, total_tokens
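

if __name__ == "__main__":
    # Minimal end-to-end sketch: read a file, chunk it, embed the chunks.
    # 'sample.txt', the file_id, and the chunking parameters are illustrative
    # assumptions; the module does not define defaults for them.
    demo_path = "sample.txt"
    if os.path.exists(demo_path):
        with open(demo_path, "rb") as f:
            doc_text = read_document(f.read(), file_id=0, file_format="txt")
        chunks = cumulative_semantic_chunking(doc_text, max_chunk_size=1000, similarity_threshold=0.7)
        embeddings, total_tokens = embed_chunks(chunks)
        print(f"{len(chunks)} chunks, {total_tokens} tokens total")
    else:
        print(f"Put a {demo_path} next to this module to run the demo.")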