File size: 2,313 Bytes
c27e5a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f6ff669
 
c27e5a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from threading import Thread
from multiprocessing import Queue
from typing import Tuple, Dict, List
from collections import defaultdict

import logging
import sys

# Root logging setup: records at INFO and above are written to stdout using a
# "<timestamp> - <logger name> - <level> - <message>" layout.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Logger named after this module.
logger = logging.getLogger(name=__name__)


class TextFilterer(Thread):
    def __init__(
        self,
        text_queue: "Queue[Tuple[str, str]]",
        filtered_text_queue: "Queue[Tuple[str, str]]",
    ):
        super().__init__()
        self.text_queue = text_queue
        self.filtered_text_queue = filtered_text_queue
        self.daemon = True  # Thread will exit when main program exits
        self.text_buffers: Dict[str, List[str]] = defaultdict(list)
        self.max_buffer_size = 5

    def filter_text(self, text: str, session_id: str) -> str | None:
        self.text_buffers[session_id].append(text)

        if len(self.text_buffers[session_id]) < self.max_buffer_size:
            return None

        while len(self.text_buffers[session_id]) > self.max_buffer_size:
            _ = self.text_buffers[session_id].pop(0)

        candidate = self.text_buffers[session_id][-2]
        if candidate != "":
            print(f"Candidate: {candidate}")

        if (
            len(self.text_buffers[session_id][-3])
            < len(candidate)
            >= len(self.text_buffers[session_id][-1])
        ):
            for past in self.text_buffers[session_id][:-2]:
                if candidate == past:
                    return None
            return candidate

        return None

    def run(self) -> None:
        """Main processing loop."""
        while True:
            try:
                # Get text from queue, blocks until text is available
                text, session_id = self.text_queue.get()

                # Process the text into an action
                filtered_text = self.filter_text(text, session_id)

                # If we got a valid action, add it to the action queue
                if filtered_text:
                    self.filtered_text_queue.put((filtered_text, session_id))

            except Exception as e:
                logger.error(f"Error processing text: {str(e)}")
                continue