File size: 2,872 Bytes
76a154f
 
 
 
 
 
6c3bb74
 
 
76a154f
6c3bb74
 
 
 
 
 
 
 
 
 
 
 
 
76a154f
 
6c3bb74
 
 
 
 
 
 
 
 
 
 
 
76a154f
 
6c3bb74
 
 
 
98df5b4
 
6c3bb74
 
 
 
 
 
 
 
 
76a154f
6c3bb74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76a154f
6c3bb74
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from threading import Thread
from typing import Iterator

import torch
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

class LlamaModel:
    def __init__(self, model_id: str):
        self.model_id = model_id

        if torch.cuda.is_available():
            config = AutoConfig.from_pretrained(model_id)
            config.pretraining_tp = 1
            self.model = AutoModelForCausalLM.from_pretrained(
                model_id,
                config=config,
                torch_dtype=torch.float16,
                load_in_4bit=False,
                device_map='auto'
            )
        else:
            self.model = None
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)


    def get_prompt(self, message: str, chat_history: list[tuple[str, str]],
                system_prompt: str) -> str:
        texts = [f'<s>[INST] <<SYS>>\n{system_prompt}\n<</SYS>>\n\n']
        # The first user input is _not_ stripped
        do_strip = False
        for user_input, response in chat_history:
            user_input = user_input.strip() if do_strip else user_input
            do_strip = True
            texts.append(f'{user_input} [/INST] {response.strip()} </s><s>[INST] ')
        message = message.strip() if do_strip else message
        texts.append(f'{message} [/INST]')
        return ''.join(texts)


    def get_input_token_length(self, message: str, chat_history: list[tuple[str, str]], system_prompt: str) -> int:
        prompt = self.get_prompt(message, chat_history, system_prompt)
        input_ids = self.tokenizer([prompt], return_tensors='np', add_special_tokens=False)['input_ids']
        return input_ids.shape[-1]


    def run(self, message: str,
            chat_history: list[tuple[str, str]],
            system_prompt: str,
            max_new_tokens: int = 1024,
            temperature: float = 0.8,
            top_p: float = 0.95,
            top_k: int = 50) -> Iterator[str]:
        prompt = self.get_prompt(message, chat_history, system_prompt)
        inputs = self.tokenizer([prompt], return_tensors='pt', add_special_tokens=False).to('cuda')

        streamer = TextIteratorStreamer(self.tokenizer,
                                        timeout=10.,
                                        skip_prompt=True,
                                        skip_special_tokens=True)
        generate_kwargs = dict(
            inputs,
            streamer=streamer,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            top_p=top_p,
            top_k=top_k,
            temperature=temperature,
            num_beams=1,
        )
        t = Thread(target=self.model.generate, kwargs=generate_kwargs)
        t.start()

        outputs = []
        for text in streamer:
            outputs.append(text)
            yield ''.join(outputs)