1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from multiprocessing import Pool
from .init_user_rights_base import *
@trace_unhandled_exceptions
def process_diary_exe(line):
_d = json.loads(line)
# 剔除脏数据
if not _d['author_id']:
return
if _d['is_operation'] == SERVICE_TYPE.OPERATION and _d['content_level'] == DIARY_CONTENT_LEVEL.EXCELLENT:
push_event(event_type=EventType.OPERATIONDIARYBEEDEXCELENTFROMOTHER, trigger_time=time.time(),
user_id=_d['author_id'], item_id=_d['diary_id'])
elif _d['is_operation'] == SERVICE_TYPE.OPERATION and _d['content_level'] == DIARY_CONTENT_LEVEL.FINE:
push_event(event_type=EventType.OPERATIONDIARYBEFINEFROMOTHER,
trigger_time=time.time(),
user_id=_d['author_id'], item_id=_d['diary_id'])
elif _d['is_operation'] == SERVICE_TYPE.NO_OPERATION and _d['content_level'] == DIARY_CONTENT_LEVEL.EXCELLENT:
push_event(event_type=EventType.NONOPERATIONDIARYBEEXCELENTFROMOTHER,
trigger_time=time.time(),
user_id=_d['author_id'], item_id=_d['diary_id'])
elif _d['is_operation'] == SERVICE_TYPE.NO_OPERATION and _d['content_level'] == DIARY_CONTENT_LEVEL.FINE:
push_event(event_type=EventType.NONOPERATIONDIARYBEFINEFROMOTHER,
trigger_time=time.time(),
user_id=_d['author_id'], item_id=_d['diary_id'])
class Command(BaseCommand):
def handle(self, *args, **options):
# diary
print('------start-----')
start_time = time.time()
print(start_time)
with open(path_base + 'diary_infos.txt', 'r') as f:
lines = f.readlines()
pool = Pool(processes=4)
pool.map(process_diary_exe, lines)
pool.close()
pool.join()
end_time = time.time()
print(end_time)
print('use {} s'.format(end_time - start_time))
print('Done!')