1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
__title__ = ''
__author__ = 'xierong@gmei.com'
__mtime__ = '17/10/27'
'''
import datetime
from distutils.version import LooseVersion
import pytz
from celery import shared_task
from django.conf import settings
from gm_types.push import PUSH_URGENCY, PERSONAL_PUSH_TYPE, PUSH_INFO_TYPE, AUTOMATED_PUSH
from talos.cache.gaia import push_cache
from talos.services.other import get_user_lastest_device_app_version_by_user_id
from utils.protocol import gm_protocol
from utils.rpc import get_rpc_invoker, rpc_client
from utils.rpc import logging_exception
from talos.logger import info_logger
def tzlc(dt, truncate_to_sec=True):
    """Localize a datetime to the project's configured time zone.

    :param dt: datetime to localize (pytz's ``localize`` expects a naive
        value); ``None`` is passed straight through.
    :param truncate_to_sec: when truthy, the microsecond field is zeroed
        before localizing.
    :return: ``dt`` localized to ``settings.TIME_ZONE``, or ``None``.
    """
    if dt is None:
        return None
    naive = dt.replace(microsecond=0) if truncate_to_sec else dt
    local_zone = pytz.timezone(settings.TIME_ZONE)
    return local_zone.localize(naive)
def eta_2_push_time(eta):
    """Convert an ETA string into whole seconds since the Unix epoch.

    :param eta: time formatted as ``'%Y-%m-%d %H:%M:%S'``; it is localized
        with ``tzlc`` before the conversion. Any falsy value means
        "no scheduled time".
    :return: integer epoch seconds, or ``None`` when ``eta`` is falsy.
    """
    if not eta:
        return None
    parsed = datetime.datetime.strptime(eta, '%Y-%m-%d %H:%M:%S')
    localized = tzlc(parsed)
    epoch = datetime.datetime.fromtimestamp(0, pytz.timezone("UTC"))
    return int((localized - epoch).total_seconds())
def special_push_limit(user_id, push_type, extra=None):
    """Decide whether a "special" push may be sent to a user right now.

    A push is allowed only when all of the following hold:

    * the local hour is within 09:00-21:59;
    * the user's counter for today is below ``settings.SPECIAL_PUSH_LIMIT``;
    * no marker exists yet for this ``push_type``/``user_id``/``extra``
      combination.

    When allowed, the daily counter is bumped (or created with a 86400 s
    TTL) and the marker is stored for ``settings.SPECTAL_SECONDS_LIMIT``
    seconds.

    :param user_id: id of the receiving user.
    :param push_type: push category, first component of the marker key.
    :param extra: optional extra key component(s); a list/tuple is
        flattened into the key, any other non-None value is appended as is.
    :return: ``True`` if the push may be sent, ``False`` otherwise.
    """
    current = datetime.datetime.now()
    if not 9 <= current.hour < 22:
        return False

    day_key = "total:{}:{}".format(current.strftime("%Y-%m-%d"), user_id)
    day_count = push_cache.get(day_key)
    if day_count and int(day_count) >= settings.SPECIAL_PUSH_LIMIT:
        return False

    # Build the per-combination marker key.
    key_parts = [push_type, user_id]
    if extra is not None:
        if isinstance(extra, (list, tuple)):
            key_parts.extend(extra)
        else:
            key_parts.append(extra)
    marker_key = ":".join(str(part) for part in key_parts)

    if push_cache.get(marker_key):
        return False

    # Record this push: daily counter first, then the marker.
    if day_count:
        push_cache.incr(day_key)
    else:
        push_cache.setex(day_key, 86400, 1)
    push_cache.setex(marker_key, settings.SPECTAL_SECONDS_LIMIT, True)
    return True
def push_task_to_user_multi(
        user_ids, platform=None, extra=None, alert='', labels=None, eta=None, push_type=None, others=None
):
    """Send an automated push to many users, 128 user ids per RPC call.

    Every batch is sent through ``push2/push/user/automated_push/uids``
    with normal urgency. A failing batch is logged via
    ``logging_exception`` and the remaining batches are still attempted.

    :param user_ids: sliceable sequence of user ids.
    :param platform: forwarded to the RPC unchanged.
    :param extra: forwarded to the RPC unchanged.
    :param alert: notification text.
    :param labels: accepted for call compatibility; NOT forwarded to the RPC.
    :param eta: optional ``'%Y-%m-%d %H:%M:%S'`` string, converted with
        ``eta_2_push_time`` and sent as ``push_time``.
    :param push_type: forwarded to the RPC unchanged.
    :param others: forwarded to the RPC unchanged.
    :return: ``None``.
    """
    # All mimas pushes go through the new push interface.
    for start in range(0, len(user_ids), 128):
        uid_list = user_ids[start:start + 128]
        try:
            rpc_client['push2/push/user/automated_push/uids'](
                push_type=push_type,
                user_ids=uid_list,
                platform=platform,
                alert=alert,
                extra=extra,
                # Converted inside the try so a malformed eta is logged per
                # batch instead of propagating to the caller.
                push_time=eta_2_push_time(eta),
                urgency=PUSH_URGENCY.NORMAL,
                others=others
            ).unwrap()
        except Exception:
            # Best effort: log and continue. Unlike a bare ``except:`` this
            # lets KeyboardInterrupt / SystemExit propagate.
            logging_exception()
def push_task_to_device_ids_multi(
        device_ids,
        platform=None,
        extra=None,
        alert='',
        labels=None,
        others=None,
        push_type=None):
    """Push a message to devices by device id, 128 ids per RPC call.

    Every batch is sent through ``push2/push/user/device_ids/sp`` as an
    immediate (``push_time=None``), non-silent push with normal urgency.
    A failing batch is logged via ``logging_exception`` and the remaining
    batches are still attempted.

    :param device_ids: sliceable sequence of device ids.
    :param platform: forwarded to the RPC unchanged.
    :param extra: forwarded to the RPC unchanged.
    :param alert: notification text.
    :param labels: forwarded to the RPC unchanged.
    :param others: forwarded to the RPC unchanged.
    :param push_type: forwarded to the RPC unchanged.
    :return: ``None``.
    """
    for start in range(0, len(device_ids), 128):
        device_ids_list = device_ids[start:start + 128]
        try:
            rpc_client['push2/push/user/device_ids/sp'](
                device_ids=device_ids_list,
                platform=platform,
                alert=alert,
                extra=extra,
                push_time=None,
                silent=False,
                urgency=PUSH_URGENCY.NORMAL,
                labels=labels,
                others=others,
                push_type=push_type,
            ).unwrap()
        except Exception:
            # Best effort: log and continue. Unlike a bare ``except:`` this
            # lets KeyboardInterrupt / SystemExit propagate.
            logging_exception()
def limit_push(user_id, sign):
    """
    Daily per-user throttle for personal pushes.

    Counters live in one hash per user and day, with a ``total`` field plus
    one field per push type. A push is refused outside 09:00-21:59, for a
    type that is not in ``PERSONAL_PUSH_TYPE``, once ``total`` reaches
    ``settings.SPECIAL_PUSH_LIMIT``, or once the type's own counter reaches
    ``settings.PUSH_SUB_LIMIT``.

    :param user_id: user id, used to build the per-day hash name
    :param sign: push type, e.g. favor; surrounding whitespace is stripped
    :return: True if the push may be sent (counters updated), False if not
    """
    sign = sign.strip()
    now = datetime.datetime.today()

    # Pushes are only allowed between 09:00 and 21:59.
    if not 9 <= now.hour < 22:
        return False

    name = "{0}_personal_push_{1}".format(user_id, now.strftime("%Y-%m-%d"))

    # Known types: fans, vote, favor, reply; "total" holds the overall count.
    if sign not in PERSONAL_PUSH_TYPE:
        return False

    if not push_cache.exists(name):
        # First push of the day: create both counters and let the hash
        # expire at 22:01 today.
        push_cache.hset(name, "total", 1)
        push_cache.hset(name, sign, 1)
        cutoff = now.replace(hour=22, minute=1, second=0)
        push_cache.expire(name, (cutoff - now).seconds)
        return True

    type_count = push_cache.hget(name, sign)
    overall = push_cache.hget(name, "total")
    if int(overall) >= settings.SPECIAL_PUSH_LIMIT:
        return False
    if type_count and int(type_count) >= settings.PUSH_SUB_LIMIT:
        return False

    push_cache.hincrby(name, sign, amount=1)
    push_cache.hincrby(name, "total", amount=1)
    return True
def limit_push_count(user_id, push_type, max_push_count=3, start_hour=9, end_hour=22):
    """
    Throttle pushes of one type to ``max_push_count`` per user per day.

    Pushes are only allowed while ``start_hour <= hour < end_hour``.

    :param user_id: receiving user id
    :param push_type: push category, part of the counter key
    :param max_push_count: daily cap (also part of the counter key)
    :param start_hour: first hour in which pushes are allowed
    :param end_hour: first hour in which pushes are no longer allowed
    :return: True if the push may be sent (counter updated), False if not
    """
    now = datetime.datetime.today()

    # Outside the allowed time window nothing is pushed.
    if not start_hour <= now.hour < end_hour:
        return False

    name = "{0}_personal_push_{1}_{2}_limit_{3}".format(
        user_id, push_type, now.strftime("%Y-%m-%d"), max_push_count)

    if not push_cache.exists(name):
        # First push today: start the counter and expire it at 23:59:59.
        push_cache.set(name, 1)
        end_of_day = now.replace(hour=23, minute=59, second=59)
        push_cache.expire(name, (end_of_day - now).seconds)
        return True

    sent = push_cache.get(name)
    if int(sent) >= max_push_count:
        return False
    push_cache.incr(name, amount=1)
    return True
def limit_push_by_uids(user_dict):
    """
    Apply ``limit_push`` to many users at once.

    The push service accepts several users per request, so filtering here
    reduces the number of push calls.

    :param user_dict: mapping ``{uid: push_type}``
    :return: list of the uids for which ``limit_push`` allowed a push
    """
    return [
        uid
        for uid, push_type in user_dict.items()
        if limit_push(uid, push_type)
    ]
def limit_get_comment_push(user_id):
    """Throttle "received a comment" pushes for one user.

    A counter is kept under ``"<user_id>_personal_comment_push"`` and is
    capped at ``settings.PERSON_GET_COMMENT_PUSH_LIMIT``. The counter is
    created with a TTL that ends at the next 22:01 local time.

    :param user_id: receiving user id.
    :return: ``True`` if the push may be sent (counter updated), ``False``
        once the limit has been reached.
    """
    now = datetime.datetime.today()
    name = "{0}_personal_comment_push".format(user_id)

    # ``timedelta.seconds`` is always within [0, 86399]: before 22:01 it is
    # the time left until today's 22:01, afterwards it wraps to tomorrow's.
    # Clamp to 1 because a zero TTL would drop the key immediately.
    cutoff = now.replace(hour=22, minute=1, second=0)
    ttl = max((cutoff - now).seconds, 1)

    if push_cache.exists(name):
        total = push_cache.get(name)
        # The key can expire between exists() and get(); treat that as
        # "nothing sent yet" instead of crashing on int(None).
        if total is not None and int(total) >= settings.PERSON_GET_COMMENT_PUSH_LIMIT:
            return False
        if push_cache.incr(name, amount=1) == 1:
            # incr re-created an expired key without a TTL. The key name
            # carries no date, so it must be given a TTL again or the
            # counter would never reset.
            push_cache.expire(name, ttl)
    else:
        # Create counter and TTL atomically; argument order
        # (name, seconds, value) follows the other setex calls in this file.
        push_cache.setex(name, ttl, 1)
    return True
def doctor_noti_doctor_ids(
        doctor_ids, platform=None, alert='', extra=None, push_time=None, urgency=None):
    """Temporary JPush notification for the doctor app, 128 ids per RPC call.

    Every batch is sent through
    ``push2/push/doctor/notification/doctor_ids``. A failing batch is logged
    via ``logging_exception`` and the remaining batches are still attempted.

    :param doctor_ids: sliceable sequence of doctor ids.
    :param platform: forwarded to the RPC unchanged.
    :param alert: notification text.
    :param extra: forwarded to the RPC unchanged.
    :param push_time: not implemented yet; accepted but ignored.
    :param urgency: not implemented yet; accepted but ignored.
    :return: ``None``.
    """
    for start in range(0, len(doctor_ids), 128):
        id_list = doctor_ids[start:start + 128]
        try:
            rpc_client['push2/push/doctor/notification/doctor_ids'](
                doctor_ids=id_list,
                platform=platform,
                extra=extra,
                alert=alert,
            ).unwrap()
        except Exception:
            # Best effort: log and continue. Unlike a bare ``except:`` this
            # lets KeyboardInterrupt / SystemExit propagate.
            logging_exception()
def vote_push(user_id, alert, push_url, is_force=False, push_type=AUTOMATED_PUSH.DIARY_RECEIVED_PRAISE):
    """
    Push a "you received a vote" notification to one user.

    Nothing is sent outside 09:00-21:59 local time.

    :param user_id: receiving user id
    :param alert: notification text
    :param push_url: protocol URL opened by the notification
    :param is_force: accepted for call compatibility; currently unused
    :param push_type: automated push type forwarded to the push service
    :return: None
    """
    # No pushes outside 09:00-22:00.
    if not 9 <= datetime.datetime.now().hour < 22:
        return

    # The previous implementation looked up the user's app version here and
    # compared it with 7.7.15, but never used the result. That lookup cost a
    # service call and could raise when no version was returned, so it has
    # been removed.

    # Per the wiki, push_url and pushUrl carry the same value because of a
    # 640 compatibility issue, see:
    # http://wiki.gengmei.cc/pages/viewpage.action?pageId=3354548
    push_task_to_user_multi(
        user_ids=[user_id],
        platform=['android', 'iPhone'],
        extra={
            'type': PUSH_INFO_TYPE.GM_PROTOCOL,
            'pushUrl': push_url,
            'push_url': push_url,
        },
        alert=alert,
        push_type=push_type,
    )
def old_vote_push(user_id):
    """
    Deprecated as of 7745.

    Legacy vote push for votes on answers and replies. App versions before
    7.7.15 get no push; from 7.7.15 on the push opens the message
    notification page. This function can be removed after a few releases
    in favour of ``vote_push``.

    :param user_id: receiving user id
    :return: None
    """
    if not limit_push(user_id=user_id, sign=PERSONAL_PUSH_TYPE.VOTE):
        return

    app_version = get_user_lastest_device_app_version_by_user_id(user_id)
    supports_push = LooseVersion(app_version) >= LooseVersion('7.7.15')
    if not supports_push:
        return

    # Per the wiki, push_url and pushUrl carry the same value because of a
    # 640 compatibility issue, see:
    # http://wiki.gengmei.cc/pages/viewpage.action?pageId=3354548
    push_task_to_user_multi(
        user_ids=[user_id],
        platform=['android', 'iPhone'],
        extra={
            'type': PUSH_INFO_TYPE.GM_PROTOCOL,
            'pushUrl': gm_protocol.get_msg_notification_list(),
            'push_url': gm_protocol.get_msg_notification_list(),
        },
        alert=u'又有人给你点赞啦!快来看看是谁被你迷倒了~!',
        labels={'event_type':'push', 'event':'received_votes'},
    )
def push_time_range_limit():
    """
    Tell whether the current local time is inside the push window.

    :return: True between 09:00 and 21:59, False otherwise
    """
    current_hour = datetime.datetime.now().hour
    return 9 <= current_hour < 22
def send_applet_subscribe_msg(user_id, template_id, data=None, page=None):
    """Send a mini-program subscribe message through ``api/wechat/subscribe_msg``.

    A response whose ``errcode`` is not 0 (or is missing) is written to
    ``info_logger`` but still returned to the caller.

    :param user_id: receiving user id.
    :param template_id: subscribe-message template id.
    :param data: template data, forwarded unchanged.
    :param page: target page, forwarded unchanged.
    :return: the unwrapped RPC response, or ``None`` when the call raised
        (the exception is logged via ``logging_exception``).
    """
    try:
        res = rpc_client['api/wechat/subscribe_msg'](
            user_id=user_id,
            template_id=template_id,
            data=data,
            page=page
        ).unwrap()
        errcode = res.get("errcode", -1)
        if errcode != 0:
            errmsg = res.get("errmsg", '')
            info_logger.info("fail to send subscribe_msg, errcode:{errcode}, errmsg:{errmsg}".format(errcode=errcode, errmsg=errmsg))
        return res
    except Exception:
        # Best effort: log and report failure as None. Unlike a bare
        # ``except:`` this lets KeyboardInterrupt / SystemExit propagate.
        logging_exception()
        return None