import re
import json
import multiprocessing
from tqdm import tqdm
from pathlib import Path
from itertools import chain

_SPLIT_DATA_PATH = '/data1/datas/wudao_180g'


def cut_sent(path):
    """
    Split Chinese text into sentences, by default on ?, 。, !, and ellipses,
    with special handling for sentences wrapped in double quotes.
    Implemented by inserting a marker at each boundary and splitting on it.
    """
    path = Path(path)
    save_path = str(Path('/data1/datas/wudao_180g_split', path.name))
    print('processing file:', save_path)
    with open(save_path, 'wt', encoding='utf-8') as w:
        with open(path, 'rt', encoding='utf-8') as f:
            for para in tqdm(f):
                para = json.loads(para)
                # The trailing space lets the boundary regexes match
                # sentence-final punctuation at the very end of the text.
                para_ = para['text'] + ' '
                # Raw strings keep escapes such as \? and \! legal without
                # doubling the backslashes (flake8/pep8 complains otherwise).
                # Rule 1: a run of sentence-ending punctuation followed by
                # any character gets the split marker inserted after it.
                para_ = re.sub(r'([?。!\?\!…]+)([^”’]|[”’])',
                               r'\1#####\2', para_)
                # Rule 2: an ASCII ellipsis (three or more dots) not
                # followed by a closing quote also ends a sentence.
                para_ = re.sub(r'([\.]{3,})([^”’])', r'\1#####\2', para_)
                # Rules 3-4: \1 is ending punctuation immediately followed
                # by ” or ’, \2 is a non-ending character; this keeps a
                # quoted sentence together with its closing quote.
                para_ = re.sub(
                    r'([。!?\?\!…][”’])([^,。!?\?\!]|\s)', r'\1#####\2', para_)
                para_ = re.sub(
                    r'([\.]{3,}[”’])([^,。!?\?\!]|\s)', r'\1#####\2', para_)
                # Rule 5: if rule 1 split between the punctuation and a
                # closing quote, move the marker to after the quote.
                para_ = re.sub(
                    r'([#]{5})([”’])([^,。!?\?\!])', r'\2#####\3', para_)
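                # Worked example (illustrative text, not from the corpus):
                #   '你好!“今天天气不错。”再见。'
                # becomes, after the five rules above,
                #   '你好!#####“今天天气不错。”#####再见。#####'
                # so splitting on '#####' yields the sentences
                #   ['你好!', '“今天天气不错。”', '再见。']
                # plus an empty trailing segment (empty segments flush the
                # buffer in the loop below).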
                para_ = para_.strip()
                # Pack several sentences into one sample of up to roughly
                # 512 characters (a sample may overshoot slightly, since the
                # length check runs before the sentence is appended).
                line_ = ''
                for line in para_.split('#####'):
                    line = line.strip()
                    if len(line_) < 512 and len(line) > 0:
                        line_ += line
                    else:
                        if line_:
                            w.write(json.dumps(
                                {'text': line_}, ensure_ascii=False) + '\n')
                        line_ = line
                if line_:
                    w.write(json.dumps(
                        {'text': line_}, ensure_ascii=False) + '\n')
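

# Minimal usage sketch (the shard name below is hypothetical): split one
# corpus file directly, writing the result to the hard-coded output
# directory.
#   cut_sent('/data1/datas/wudao_180g/part-00000.jsonl')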


def chain_iter(*filenames):
    """
    Chain multiple files into a single line iterator.
    Note: the handles opened here are only closed at interpreter exit.
    """
    reader = [open(file, 'r') for file in filenames]
    return chain(*reader)
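
# Usage sketch (hypothetical file names): read two shards as one stream.
#   for line in chain_iter('part-00000.jsonl', 'part-00001.jsonl'):
#       ...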


class Config(object):
    def __init__(self, data_path=_SPLIT_DATA_PATH, num_worker=16,
                 split_numb=600000, cut_sentence=True,
                 output_file=None) -> None:
        self.data_path = Path(data_path)
        self.num_worker = num_worker
        self.split_numb = split_numb
        self.cut_sentence = cut_sentence
        self.output_file = output_file
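
# Example (hypothetical paths): override the defaults for a small test run.
#   args = Config(data_path='/tmp/wudao_sample', num_worker=4)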


def processing1():
    """
    Alternative pool-based driver: map cut_sent over every shard path with
    imap. cut_sent writes its own per-shard output file, so the returned
    values are simply drained.
    """
    args = Config()
    paths = [str(p) for p in args.data_path.glob('*')]
    out_dir = Path(args.data_path.parent, args.data_path.name + '_split')
    out_dir.mkdir(parents=True, exist_ok=True)
    pool = multiprocessing.Pool(args.num_worker)
    for _ in tqdm(pool.imap(cut_sent, paths, chunksize=1)):
        pass
    pool.close()
    pool.join()


if __name__ == '__main__':
    from time import perf_counter
    from random import shuffle
    # perf_counter rather than process_time: the work happens in child
    # processes, whose CPU time process_time would not account for.
    st = perf_counter()
    args = Config(num_worker=16)
    if not Path(args.data_path.parent, args.data_path.name + '_split').exists():
        Path(args.data_path.parent, args.data_path.name +
             '_split').mkdir(parents=True)
    p_ = [str(i) for i in args.data_path.glob('*')]
    # Simple shuffle so shards of different sizes spread across workers.
    shuffle(p_)
    pool = multiprocessing.Pool(args.num_worker)
    for item in p_:
        pool.apply_async(func=cut_sent, args=(item,))
    pool.close()
    pool.join()
    cost_time = perf_counter() - st
    print('DONE!! cost time : %.5f' % cost_time)
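
# Expected input format, one JSON object per line in each shard under
# _SPLIT_DATA_PATH (the sample text is illustrative; the field name comes
# from cut_sent):
#   {"text": "你好!“今天天气不错。”再见。"}
# Output: one JSON object per packed ~512-character sample, written to
# /data1/datas/wudao_180g_split under the same shard file name.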