1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from collections import namedtuple
from datetime import datetime
from celery import shared_task
from utils.rpc import (
rpc_client,
logging_exception,
)
from gm_types.push import PUSH_INFO_TYPE, AUTOMATED_PUSH
from talos.services import UserService
from talos.cache.gaia import push_cache
from utils.push import push_task_to_user_multi
from utils.protocol import gm_protocol
from qa.libs import _get_content_text
from qa.models import (
Answer,
QualityQuestionRead,
QualityUserQuestion,
QualityAnswerRead,
QualityQuestion,
QualityAuthorAnswer,
)
from communal.tasks import intelligent_push_task
@shared_task
def write_unread_data(quality_question_id, answer_id=0, _type=None):
    """
    Write unread records when a user asks a question or a reply is created.

    For ``'ask_question'`` a single unread ``QualityQuestionRead`` row is
    created for the ``user_id`` of the answer identified by ``answer_id``.
    For ``'create_answer'`` one unread ``QualityAnswerRead`` row is created for
    every online ``QualityUserQuestion`` user of the quality question.

    :param quality_question_id: id of the quality question the records refer to
    :param answer_id: id of the answer; only looked up for ``'ask_question'``
    :param _type: ``'ask_question'`` or ``'create_answer'``; any other value
        (including ``None``) makes the task a no-op
    :return: None
    """
    kinds = namedtuple('_TYPE', ['ASK_QUESTION', 'CREATE_ANSWER'])(
        ASK_QUESTION='ask_question',
        CREATE_ANSWER='create_answer',
    )

    # Unknown or missing type: nothing to record.
    if not _type or _type not in kinds:
        return

    if _type == kinds.ASK_QUESTION:
        try:
            target_answer = Answer.objects.get(id=answer_id)
        except Answer.DoesNotExist:
            return

        # The unread marker belongs to the user who owns the answer.
        QualityQuestionRead.objects.create(
            quality_question_id=quality_question_id,
            user_id=target_answer.user_id,
            is_read=False,
        )
        return

    if _type == kinds.CREATE_ANSWER:
        asker_ids = list(
            QualityUserQuestion.objects.filter(
                quality_question_id=quality_question_id,
                is_online=True,
            ).values_list('user_id', flat=True)
        )
        if not asker_ids:
            return

        # One unread marker per asking user, written in a single bulk insert.
        QualityAnswerRead.objects.bulk_create([
            QualityAnswerRead(
                quality_question_id=quality_question_id,
                user_id=asker_id,
                is_read=False,
            )
            for asker_id in asker_ids
        ])
        return
@shared_task
def questioning_push(user_id, answer_id, quality_question_id, question_title):
    """Push a follow-up question notification to the author of the answer.

    :param user_id: user_id of the person asking the follow-up question
    :param answer_id: id of the answer the follow-up question is about
    :param quality_question_id: id of the follow-up question
    :param question_title: text of the follow-up question
    :return: None
    """
    try:
        target_answer = Answer.objects.get(id=answer_id)
    except Answer.DoesNotExist:
        return

    # For each answer, the author receives at most one push per day.
    author_id = target_answer.user_id
    dedupe_key = "question:{date}:{user_id}:{answer_id}".format(
        date=str(datetime.now().date()),
        user_id=author_id,
        answer_id=answer_id,
    )
    if push_cache.get(dedupe_key):
        return

    # Once the author has replied to this follow-up question it no longer
    # triggers a push (other follow-up questions still can), with no time limit.
    if QualityAuthorAnswer.objects.filter(quality_question_id=quality_question_id).exists():
        return

    asker = UserService.get_user_by_user_id(user_id=user_id)

    # The alert shows at most the first 30 characters of the question.
    alert_text = question_title
    if len(question_title) > 30:
        alert_text = question_title[:30] + "..."

    target_url = gm_protocol.get_user_answer_list()
    task_args = (
        answer_id,
        [author_id],
        AUTOMATED_PUSH.QUALITY_QUESTION,
        {
            'type': PUSH_INFO_TYPE.GM_PROTOCOL,
            'pushUrl': target_url,
            'push_url': target_url,
        },
    )
    task_kwargs = {
        "platform": None,
        "alert": alert_text,
        "others": {
            "title": "@{} 希望向你请教".format(asker.nickname),
            "alert": alert_text,
        },
        "labels": {
            'event_type': 'push',
            'event': 'quality_question_received_questioning'
        },
    }
    intelligent_push_task.apply_async(args=task_args, kwargs=task_kwargs)

    # Mark the push as sent; the marker's TTL is the number of seconds left
    # until 23:59:59 today.
    push_cache.set(dedupe_key, 1)
    current = datetime.today()
    day_end = current.replace(hour=23, minute=59, second=59)
    push_cache.expire(dedupe_key, (day_end - current).seconds)
@shared_task
def answer_push(user_id, quality_question_id):
    """Push a notification to the users who asked a follow-up question after it is replied to.

    Users already recorded in today's per-answer cache hash are skipped, so
    each user is pushed at most once per answer per day.

    :param user_id: user_id of the replying user; their nickname is shown in
        the push text
    :param quality_question_id: id of the follow-up question that was replied to
    :return: None
    """
    try:
        quality_question = QualityQuestion.objects.get(id=quality_question_id, is_online=True)
    except QualityQuestion.DoesNotExist:
        return

    try:
        answer = Answer.objects.get(id=quality_question.answer_id, is_online=True)
    except Answer.DoesNotExist:
        return

    asker_ids = list(
        QualityUserQuestion.objects.filter(
            quality_question_id=quality_question_id, is_online=True
        ).values_list("user_id", flat=True)
    )
    if not asker_ids:
        return

    # Check which askers were already pushed: the cache holds one hash per
    # answer per day with one field per user id.
    cache_key_tpl = "answer:{date}:{answer_id}"
    cache_key = cache_key_tpl.format(date=str(datetime.now().date()), answer_id=answer.id)

    # The loop variable is deliberately NOT named `user_id`: rebinding the
    # parameter here would make the nickname lookup below fetch the last asker
    # instead of the replying user.
    pushed_flags = push_cache.hmget(cache_key, asker_ids)
    push_user_ids = [
        asker_id
        for asker_id, has_push in zip(asker_ids, pushed_flags)
        if not has_push
    ]
    if not push_user_ids:
        return

    # Nickname of the replying user (the task's `user_id` argument).
    user = UserService.get_user_by_user_id(user_id=user_id)

    # The push text shows at most the first 30 characters of the answer text.
    content = _get_content_text(answer.content)
    content = content if len(content) <= 30 else content[:30] + "..."
    push_msg = "「{user_name}」回答了你在「{content}」下的提问,来看看她是怎么说的~".format(user_name=user.nickname, content=content)

    push_type = AUTOMATED_PUSH.QUALITY_QUESTION
    push_url = gm_protocol.get_user_answer_list()
    extra = {
        'type': PUSH_INFO_TYPE.GM_PROTOCOL,
        'msgType': 4,
        'pushUrl': push_url,
        'push_url': push_url,
    }
    push_task_to_user_multi(
        user_ids=push_user_ids,
        push_type=push_type,
        extra=extra,
        labels={'event_type': 'push', 'event': 'quality_question_received_answer'},
        alert=push_msg,
    )

    # Record the users that were just pushed; the hash's TTL is the number of
    # seconds left until 23:59:59 today.
    push_cache.hmset(cache_key, {pushed_id: 1 for pushed_id in push_user_ids})
    today = datetime.today()
    push_cache.expire(cache_key, (today.replace(hour=23, minute=59, second=59) - today).seconds)