File size: 9,402 Bytes
50f0fbb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import torch
from torch.utils.data._utils.collate import default_collate
from dataclasses import dataclass
from typing import Dict, List
from .base import (
    _CONFIG_MODEL_TYPE,
    _CONFIG_TOKENIZER_TYPE)
from fengshen.models.roformer import RoFormerForSequenceClassification
from fengshen.models.longformer import LongformerForSequenceClassification
from fengshen.models.zen1 import ZenForSequenceClassification
from transformers import (
    BertConfig,
    AutoModelForSequenceClassification,
    AutoTokenizer,
)
from transformers.models.auto.tokenization_auto import get_tokenizer_config
from transformers.pipelines.base import PipelineException, GenericTensor
from transformers import TextClassificationPipeline as HuggingfacePipe
import pytorch_lightning as pl
from fengshen.data.universal_datamodule import UniversalDataModule
from fengshen.utils.universal_checkpoint import UniversalCheckpoint
from fengshen.models.model_utils import add_module_args
import torchmetrics

_model_dict = {
    'fengshen-roformer': RoFormerForSequenceClassification,
    # 'fengshen-megatron_t5': T5EncoderModel,  TODO 实现T5EncoderForSequenceClassification
    'fengshen-longformer': LongformerForSequenceClassification,
    'fengshen-zen1': ZenForSequenceClassification,
    'huggingface-auto': AutoModelForSequenceClassification,
}

_tokenizer_dict = {}

_ATTR_PREPARE_INPUT = '_prepare_inputs_for_sequence_classification'


class _taskModel(pl.LightningModule):
    @staticmethod
    def add_model_specific_args(parent_args):
        _ = parent_args.add_argument_group('text classification task model')
        return parent_args

    def __init__(self, args, model):
        super().__init__()
        self.model = model
        self.acc_metrics = torchmetrics.Accuracy()
        self.save_hyperparameters(args)

    def setup(self, stage) -> None:
        if stage == 'fit':
            train_loader = self.trainer._data_connector._train_dataloader_source.dataloader()
            # Calculate total steps
            if self.trainer.max_epochs > 0:
                world_size = self.trainer.world_size
                tb_size = self.hparams.train_batchsize * max(1, world_size)
                ab_size = self.trainer.accumulate_grad_batches
                self.total_steps = (len(train_loader.dataset) *
                                    self.trainer.max_epochs // tb_size) // ab_size
            else:
                self.total_steps = self.trainer.max_steps // self.trainer.accumulate_grad_batches

            print('Total steps: {}' .format(self.total_steps))

    def training_step(self, batch, batch_idx):
        outputs = self.model(**batch)
        loss, _ = outputs[0], outputs[1]
        self.log('train_loss', loss)
        return loss

    def comput_metrix(self, logits, labels):
        y_pred = torch.argmax(logits, dim=-1)
        y_pred = y_pred.view(size=(-1,))
        y_true = labels.view(size=(-1,)).long()
        acc = self.acc_metrics(y_pred.long(), y_true.long())
        return acc

    def validation_step(self, batch, batch_idx):
        outputs = self.model(**batch)
        loss, logits = outputs[0], outputs[1]
        acc = self.comput_metrix(logits, batch['labels'])
        self.log('val_loss', loss)
        self.log('val_acc', acc)

    def predict_step(self, batch, batch_idx):
        output = self.model(**batch)
        return output.logits

    def configure_optimizers(self):
        from fengshen.models.model_utils import configure_optimizers
        return configure_optimizers(self)


@dataclass
class _Collator:
    tokenizer = None
    texta_name = 'sentence'
    textb_name = 'sentence2'
    label_name = 'label'
    max_length = 512
    model_type = 'huggingface-auto'

    def __call__(self, samples):
        sample_list = []
        for item in samples:
            if self.textb_name in item and item[self.textb_name] != '':
                if self.model_type != 'fengshen-roformer':
                    encode_dict = self.tokenizer.encode_plus(
                        [item[self.texta_name], item[self.textb_name]],
                        max_length=self.max_length,
                        padding='max_length',
                        truncation='longest_first')
                else:
                    encode_dict = self.tokenizer.encode_plus(
                        [item[self.texta_name]+'[SEP]'+item[self.textb_name]],
                        max_length=self.max_length,
                        padding='max_length',
                        truncation='longest_first')
            else:
                encode_dict = self.tokenizer.encode_plus(
                    item[self.texta_name],
                    max_length=self.max_length,
                    padding='max_length',
                    truncation='longest_first')
            sample = {}
            for k, v in encode_dict.items():
                sample[k] = torch.tensor(v)
            if self.label_name in item:
                sample['labels'] = torch.tensor(item[self.label_name]).long()
            sample_list.append(sample)
        return default_collate(sample_list)


class TextClassificationPipeline(HuggingfacePipe):
    @staticmethod
    def add_pipeline_specific_args(parent_args):
        parser = parent_args.add_argument_group('SequenceClassificationPipeline')
        parser.add_argument('--texta_name', default='sentence', type=str)
        parser.add_argument('--textb_name', default='sentence2', type=str)
        parser.add_argument('--label_name', default='label', type=str)
        parser.add_argument('--max_length', default=512, type=int)
        parser.add_argument('--device', default=-1, type=int)
        parser = _taskModel.add_model_specific_args(parent_args)
        parser = UniversalDataModule.add_data_specific_args(parent_args)
        parser = UniversalCheckpoint.add_argparse_args(parent_args)
        parser = pl.Trainer.add_argparse_args(parent_args)
        parser = add_module_args(parent_args)
        return parent_args

    def __init__(self,
                 model: str = None,
                 args=None,
                 **kwargs):
        self.args = args
        self.model_name = model
        self.model_type = 'huggingface-auto'
        # 用BertConfig做兼容,我只需要读里面的fengshen_model_type,所以这里用啥Config都可以
        config = BertConfig.from_pretrained(model)
        if hasattr(config, _CONFIG_MODEL_TYPE):
            self.model_type = config.fengshen_model_type
        if self.model_type not in _model_dict:
            raise PipelineException(self.model_name, ' not in model type dict')
        # 加载模型,并且使用模型的config
        self.model = _model_dict[self.model_type].from_pretrained(model)
        self.config = self.model.config
        # 加载分词
        tokenizer_config = get_tokenizer_config(model, **kwargs)
        self.tokenizer = None
        if hasattr(tokenizer_config, _CONFIG_TOKENIZER_TYPE):
            if tokenizer_config._CONFIG_TOKENIZER_TYPE in _tokenizer_dict:
                self.tokenizer = _tokenizer_dict[tokenizer_config._CONFIG_TOKENIZER_TYPE].from_pretrained(
                    model)
        if self.tokenizer is None:
            self.tokenizer = AutoTokenizer.from_pretrained(model)
        # 加载数据处理模块
        c = _Collator()
        c.tokenizer = self.tokenizer
        c.model_type = self.model_type
        if args is not None:
            c.texta_name = self.args.texta_name
            c.textb_name = self.args.textb_name
            c.label_name = self.args.label_name
            c.max_length = self.args.max_length
        self.collator = c
        device = -1 if args is None else args.device
        super().__init__(model=self.model,
                         tokenizer=self.tokenizer,
                         framework='pt',
                         device=device,
                         **kwargs)

    def train(self,
              datasets: Dict):
        """
        Args:
            datasets is a dict like
            {
                test: Dataset()
                validation: Dataset()
                train: Dataset()
            }
        """
        checkpoint_callback = UniversalCheckpoint(self.args)
        trainer = pl.Trainer.from_argparse_args(self.args,
                                                callbacks=[checkpoint_callback]
                                                )

        data_model = UniversalDataModule(
            datasets=datasets,
            tokenizer=self.tokenizer,
            collate_fn=self.collator,
            args=self.args)
        model = _taskModel(self.args, self.model)

        trainer.fit(model, data_model)
        return

    def preprocess(self, inputs, **tokenizer_kwargs) -> Dict[str, GenericTensor]:
        # 如果模型有自定义的接口,用模型的口
        if hasattr(self.model, _ATTR_PREPARE_INPUT):
            return getattr(self.model, _ATTR_PREPARE_INPUT)(inputs, self.tokenizer, **tokenizer_kwargs)
        samples = []
        if isinstance(inputs, str):
            samples.append({self.collator.texta_name: inputs})
        else:
            # 在__call__里面已经保证了input的类型,所以这里直接else就行
            for i in inputs:
                samples.append({self.collator.texta_name})
        return self.collator(samples)


Pipeline = TextClassificationPipeline