import json
import time
from multiprocessing import Pool

# The star import is kept last so that any `json` / `time` binding it provides
# still takes precedence, exactly as before; the explicit stdlib imports above
# only make the module's direct dependencies visible.
from .init_user_rights_base import *

# Number of worker processes used to fan out the diary records.
_WORKER_COUNT = 4


def _select_event_type(record):
    """Return the event type matching a diary record, or ``None``.

    The decision is based on ``record['is_operation']`` and
    ``record['content_level']``.  Keys and ``EventType`` members are looked up
    lazily, so only the values needed for the matching branch are touched.

    :param record: dict decoded from one line of the diary dump.
    :return: an ``EventType`` member, or ``None`` when no rule applies.
    """
    if record['is_operation'] == SERVICE_TYPE.OPERATION:
        if record['content_level'] == DIARY_CONTENT_LEVEL.EXCELLENT:
            # NOTE(review): "BEED" differs from the spelling of the sibling
            # members -- confirm against the EventType definition.
            return EventType.OPERATIONDIARYBEEDEXCELENTFROMOTHER
        if record['content_level'] == DIARY_CONTENT_LEVEL.FINE:
            return EventType.OPERATIONDIARYBEFINEFROMOTHER

    if record['is_operation'] == SERVICE_TYPE.NO_OPERATION:
        if record['content_level'] == DIARY_CONTENT_LEVEL.EXCELLENT:
            return EventType.NONOPERATIONDIARYBEEXCELENTFROMOTHER
        if record['content_level'] == DIARY_CONTENT_LEVEL.FINE:
            return EventType.NONOPERATIONDIARYBEFINEFROMOTHER

    return None


@trace_unhandled_exceptions
def process_diary_exe(line):
    """Decode one JSON line describing a diary and push the matching event.

    Records with a falsy ``author_id`` are skipped, as are records whose
    ``is_operation`` / ``content_level`` combination matches no rule.

    :param line: one JSON-encoded diary record.
    :return: ``None``.
    """
    _d = json.loads(line)

    # Drop dirty data: records without an author cannot be attributed.
    if not _d['author_id']:
        return

    event_type = _select_event_type(_d)
    if event_type is None:
        return

    push_event(
        event_type=event_type,
        trigger_time=time.time(),
        user_id=_d['author_id'],
        item_id=_d['diary_id'],
    )


class Command(BaseCommand):
    """Command that replays ``diary_infos.txt`` through ``process_diary_exe``."""

    def handle(self, *args, **options):
        """Process every diary record in parallel and print the elapsed time.

        Reads ``path_base + 'diary_infos.txt'``, hands each non-blank line to
        ``process_diary_exe`` in a pool of worker processes, and prints the
        start time, end time and duration.
        """
        # diary
        print('------start-----')
        start_time = time.time()
        print(start_time)

        with open(path_base + 'diary_infos.txt', 'r') as f:
            # Blank lines (e.g. a trailing newline) are not valid JSON, so
            # they are dropped here instead of failing inside a worker.
            lines = [line for line in f if line.strip()]

        pool = Pool(processes=_WORKER_COUNT)
        try:
            pool.map(process_diary_exe, lines)
        finally:
            # Always shut the workers down, even when map() raises, so no
            # worker processes are left behind.
            pool.close()
            pool.join()

        end_time = time.time()
        print(end_time)
        print('use {} s'.format(end_time - start_time))
        print('Done!')