Spaces:
Running
on
A10G
Running
on
A10G
File size: 2,313 Bytes
c27e5a4 f6ff669 c27e5a4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
from threading import Thread
from multiprocessing import Queue
from typing import Tuple, Dict, List
from collections import defaultdict
import logging
import sys
# Set up root logging: INFO level, timestamped records, written to stdout.
# (basicConfig leaves the root logger alone if it already has handlers.)
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    handlers=[_stdout_handler],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the classes in this file.
logger = logging.getLogger(__name__)
class TextFilterer(Thread):
def __init__(
self,
text_queue: "Queue[Tuple[str, str]]",
filtered_text_queue: "Queue[Tuple[str, str]]",
):
super().__init__()
self.text_queue = text_queue
self.filtered_text_queue = filtered_text_queue
self.daemon = True # Thread will exit when main program exits
self.text_buffers: Dict[str, List[str]] = defaultdict(list)
self.max_buffer_size = 5
def filter_text(self, text: str, session_id: str) -> str | None:
self.text_buffers[session_id].append(text)
if len(self.text_buffers[session_id]) < self.max_buffer_size:
return None
while len(self.text_buffers[session_id]) > self.max_buffer_size:
_ = self.text_buffers[session_id].pop(0)
candidate = self.text_buffers[session_id][-2]
if candidate != "":
print(f"Candidate: {candidate}")
if (
len(self.text_buffers[session_id][-3])
< len(candidate)
>= len(self.text_buffers[session_id][-1])
):
for past in self.text_buffers[session_id][:-2]:
if candidate == past:
return None
return candidate
return None
def run(self) -> None:
"""Main processing loop."""
while True:
try:
# Get text from queue, blocks until text is available
text, session_id = self.text_queue.get()
# Process the text into an action
filtered_text = self.filter_text(text, session_id)
# If we got a valid action, add it to the action queue
if filtered_text:
self.filtered_text_queue.put((filtered_text, session_id))
except Exception as e:
logger.error(f"Error processing text: {str(e)}")
continue
|