1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
__title__ = ''
__author__ = 'xierong@gmei.com'
__mtime__ = '17/12/28'
'''
import math
from celery import shared_task
from django.conf import settings
from six.moves import xrange
from talos.cache.base import tag_map_tag3_record
from talos.cache.viewrecord import ViewRecord
from talos.cache.base import diary_pv_cache
from talos.logger import info_logger
from talos.tools.tag3_tool import TagMapTool, CONTENT_TYPE, HISTORY_RECORD
def view_increase_amount(model_name, id_value, count, reverse=False):
    """Apply ``count`` consecutive view-counter updates to one record.

    Delegates every step to :func:`view_increase`, so ``reverse=True``
    subtracts ``count`` from the stored value instead of adding it.

    :param model_name: cache namespace identifying the model.
    :param id_value: record identifier within that namespace.
    :param count: number of single-step updates to apply.
    :param reverse: when True each step decrements instead of increments.
    """
    remaining = count
    while remaining > 0:
        view_increase(model_name, id_value, reverse)
        remaining -= 1
@shared_task
def view_increase(model_name, id_value, reverse=False):
    """Shift the cached view counter for one record by a single step.

    :param model_name: cache namespace identifying the model.
    :param id_value: record identifier within that namespace.
    :param reverse: when True, decrement instead of increment.
    :return: the counter value stored after the update.
    """
    record = ViewRecord(model_name)
    current = record[id_value]
    if reverse:
        # A missing entry is treated as '1' so the first decrement lands on 0.
        record[id_value] = int(current or '1') - 1
    else:
        record[id_value] = int(current or '0') + 1
    return record[id_value]
@shared_task
def diary_increase_view(ids):
    """Increment the page-view counter by one for every diary id in ``ids``."""
    for diary_id in ids:
        diary_pv_cache.incrby(str(diary_id), 1)
@shared_task
def sync_tag_map_by_type(content_type):
    """Sync the tag -> tag3 mapping for one content type, in batches.

    Resumes from the id stored under ``HISTORY_RECORD`` in the
    ``tag_map_tag3_record`` cache, walks forward in batches of
    ``TagMapTool.BATCH_SIZE``, and bulk-creates any (content_id, tag_id)
    pairs that are not yet present in the mapping table.

    :param content_type: one of the supported content types
        ('topic', 'diary', 'tractate', 'question', 'answer').
    :return: None
    """
    if not content_type:
        return
    cache_key = HISTORY_RECORD.format(content_type=content_type)
    # Id from which syncing resumes; 0/absent means the sync was never
    # initialised for this content type, so we bail out.
    current_record_max_id = int(tag_map_tag3_record.get(cache_key) or 0)
    model = TagMapTool.current_model(content_type)
    if not current_record_max_id:
        return
    max_id = TagMapTool.get_max_id(content_type)
    info_logger.info('content_type:{}, max_id:{}'.format(content_type, max_id))
    if current_record_max_id > max_id:
        # Nothing new since the last run.
        return
    count = model.objects.using(settings.SLAVE_DB_NAME).filter(
        id__gte=current_record_max_id, is_online=True
    ).count()
    tags = []
    # float() guards against Python 2 integer division: without it
    # ``count / BATCH_SIZE`` truncates BEFORE math.ceil, silently dropping
    # the final partial batch (e.g. count=105, batch=100 -> 1 batch, not 2).
    batch_total = int(math.ceil(count / float(TagMapTool.BATCH_SIZE)))
    for _ in range(batch_total):
        content_start_id = int(tag_map_tag3_record.get(cache_key) or 0)
        content_ids = TagMapTool.get_content_ids(content_type, content_start_id)
        if not content_ids:
            continue
        map_result = TagMapTool.get_tag_map_result(content_ids, content_type)
        for res in map_result:
            tag_ids = res.get('tags_info', {}).get('project_tags', [])
            content_id = res.get('content_id')
            if not all([tag_ids, content_id]):
                continue
            for _id in tag_ids:
                tags.append([content_id, _id])
        # Persist progress so the next batch (or next task run) resumes
        # after the highest id processed in this batch.
        current_max_id = max(content_ids)
        tag_map_tag3_record.set(cache_key, current_max_id)
    tag_model = TagMapTool.current_tag_model(content_type)
    # content_type -> name of the FK field on its tag-mapping model.
    content_field = {
        'topic': 'problem_id',
        'diary': 'diary_id',
        'tractate': 'tractate_id',
        'question': 'question_id',
        'answer': 'answer_id',
    }.get(content_type)
    create_info = []
    for content_id, tag_id in tags:
        # Skip pairs already present in the database. Unknown content types
        # have no field mapping and (as before) bypass the existence check.
        if content_field is not None:
            lookup = {'tag_v3_id': tag_id, content_field: content_id}
            if tag_model.objects.filter(**lookup).exists():
                continue
        create_info.append([content_id, tag_id])
    content_objects = TagMapTool.create_info(create_info, content_type)
    tag_model.objects.bulk_create(content_objects)
@shared_task
def sync_tag_map():
    """Fan out one async tag-map sync task per supported content type."""
    for ctype in CONTENT_TYPE:
        sync_tag_map_by_type.apply_async(args=(ctype,))