#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import random
import datetime
from celery import shared_task
from gm_types.push import PUSH_INFO_TYPE
from talos.cache.gaia import push_cache
from talos.logger import push_logger, info_logger
from utils.push import push_task_to_user_multi
from utils.rpc import (
rpc_client,
logging_exception,
)
from communal.models.push.personalize_tag import PersonalizeActionTag
from communal.cache.push import personalize_push_cache


@shared_task
def push_control_task(user_ids, platform, extra, alert, eta, push_type, labels, others=None):
"""
公共推送方法
:return:
"""
push_task_to_user_multi(
user_ids=user_ids,
platform=platform,
extra=extra,
alert=alert,
eta=eta,
push_type=push_type,
labels=labels,
others=others
)
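
# Hypothetical usage sketch (illustration only, not part of the production
# flow): how a caller might enqueue push_control_task. All ids, the payload,
# and the push_type below are made-up values.
def _example_push_control_usage():
    push_control_task.apply_async(kwargs={
        "user_ids": [101, 102],            # made-up recipient ids
        "platform": None,                  # assumed convention: None means all platforms
        "extra": {"pushUrl": ""},          # assumed payload shape
        "alert": "example alert text",
        "eta": None,                       # no scheduled time: send immediately
        "push_type": "example_type",       # made-up push type
        "labels": {"event_type": "push"},  # made-up label
        "others": None,
    })
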
@shared_task
def aggregation_push_trigger_task(cache_name, default_cache_key, unit_time=60 * 60, sole_sign=""):
"""
聚合推送 延迟触发任务
:param cache_name: 缓存名
:param default_cache_key: 默认缓存key
:param unit_time: 单位时间
:param sole_sign: 用于存日志的标识
:return:
"""
push_logger.info(json.dumps({
"subordinate": "aggregation_push",
"sole_sign": sole_sign,
"resume": "aggregation_push_trigger_task.",
"params": {
"cache_name": cache_name,
"default_cache_key": default_cache_key,
"unit_time": unit_time,
},
}))
    # Iterate over the aggregated push data stored in the cache hash
for cache_key, item in push_cache.hscan_iter(cache_name, count=100):
if cache_key == default_cache_key:
continue
if item:
push_data = json.loads(item)
for push_type, record_ids in push_data.items():
                countdown = random.randrange(10, unit_time - 60 * 5)  # random send time within the window (assumes unit_time > 5 min + 10 s)
_kw = {
"user_ids": [cache_key],
"action_type": push_type,
"record_ids": record_ids,
"sole_sign": sole_sign,
}
                # Schedule the per-user push task with the random delay
task_ = aggregation_push_timing_task.apply_async(
kwargs=_kw,
countdown=countdown
)
push_logger.info(json.dumps({
"subordinate": "aggregation_push",
"sole_sign": sole_sign,
"resume": "aggregation_push_timing_task.",
"task_id": task_.id,
"params": _kw,
"countdown": countdown,
}))
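
# The trigger task above expects a Redis hash shaped as
#   cache_name -> {user_id: json.dumps({push_type: [record_id, ...]})}
# A minimal, hypothetical producer sketch that seeds one entry in that shape
# and then fires the trigger; the cache name and all ids are illustration
# values only.
def _example_seed_aggregation_cache():
    cache_name = "example:aggregation_push:buffer"  # made-up cache name
    default_cache_key = "__default__"               # sentinel skipped by the trigger
    push_cache.hset(cache_name, default_cache_key, "")
    push_cache.hset(cache_name, "101", json.dumps({"reply": [1001, 1002]}))
    aggregation_push_trigger_task.apply_async(
        kwargs={
            "cache_name": cache_name,
            "default_cache_key": default_cache_key,
            "unit_time": 60 * 60,
            "sole_sign": "example",
        },
        countdown=60 * 60,  # fire once the aggregation window closes
    )
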
@shared_task
def aggregation_push_timing_task(**kwargs):
"""
聚合推送 单任务触发方法
:return:
"""
from communal.tools.push.push_aggregation import AggregationPushService
params = AggregationPushService.build_push_params(**kwargs)
if params:
push_logger.info(json.dumps({
"subordinate": "aggregation_push",
"sole_sign": kwargs.get("sole_sign", ""),
"resume": "aggregation_push builder params.",
"build_params": kwargs,
"push_params": params,
}))
        push_task_to_user_multi(**params)  # send with the built push params, not the raw builder kwargs


@shared_task
def intelligent_push_task(content_id, user_ids, push_type, extra, platform=None, alert='', others=None, labels=None):
    """
    Unified push management; delivers the push through the demeter service.
    :param content_id: content id
    :param user_ids: list of recipient user ids
    :param push_type: push type
    :param platform: push channel (Android / iOS)
    :param extra: dict
    :param alert: object
    :param others: object
    :param labels: object
    :return: None
    """
    _rpc_url = "demeter/push/user/community_push"
    kwargs = {
        "content_id": content_id,
        "user_ids": user_ids,
        "push_type": push_type,
        "platform": platform,
        "extra": extra,
        "alert": alert,
        "labels": labels or {},  # default handled here to avoid a mutable default argument
    }
if others:
kwargs.update({'others': others})
try:
rpc_client[_rpc_url](**kwargs).unwrap()
info_logger.info('invoke demeter push {}:{}'.format(content_id, push_type))
    except Exception:  # avoid a bare except; still log the failure
logging_exception()
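
# Hypothetical invocation sketch for intelligent_push_task; every concrete
# value below is made up for illustration and the push_type string is not a
# real constant from this codebase.
def _example_intelligent_push_usage():
    intelligent_push_task.delay(
        content_id=12345,                    # made-up content id
        user_ids=[101, 102],                 # made-up recipients
        push_type="example_community_push",  # made-up push type
        extra={"pushUrl": ""},               # assumed payload shape
        alert="example alert text",
    )
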
@shared_task
def parse_personalize_action_task(date_index=None):
    push_logger.info(json.dumps(dict(msg='parse_personalize_action_task start!')))
if not date_index:
date_index = str(datetime.date.today() - datetime.timedelta(days=1))
cache_key_personalize_action_tag_info = 'demeter:push:action_tag_push:action_tag_info_{}'.format(date_index)
    first_item = PersonalizeActionTag.objects.filter(date_index=date_index).first()
    last_item = PersonalizeActionTag.objects.filter(date_index=date_index).last()
    if first_item is None or last_item is None:  # no rows for this date_index
        return
    search_offset = 100
    split_ids = [(i, i + search_offset) for i in range(first_item.id, last_item.id + 1, search_offset)]
for (start_id, end_id) in split_ids:
res = PersonalizeActionTag.objects.filter(id__range=(start_id, end_id)).values(
"tag_info", "device_id"
)
for obj in res:
personalize_push_cache.hset(
cache_key_personalize_action_tag_info,
obj['device_id'],
obj['tag_info']
)
    push_logger.info(json.dumps(dict(msg='parse_personalize_action_task finish!')))
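
# Consumer-side sketch: how the per-device tag info written above could be
# read back. The device id is a made-up illustration value, and the JSON
# decode assumes tag_info was stored as a JSON string.
def _example_read_personalize_tags(date_index):
    cache_key = 'demeter:push:action_tag_push:action_tag_info_{}'.format(date_index)
    tag_info = personalize_push_cache.hget(cache_key, 'example-device-id')
    return json.loads(tag_info) if tag_info else None
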
@shared_task
def add_device_to_city_map_for_push():
    push_logger.info(json.dumps(dict(msg='add_device_to_city_map_for_push start!')))
cache_key_demeter_device_2_city = 'demeter:push:device_2_city_map'
date_index = str(datetime.date.today() - datetime.timedelta(days=1))
    first_item = PersonalizeActionTag.objects.filter(date_index=date_index).first()
    last_item = PersonalizeActionTag.objects.filter(date_index=date_index).last()
    if first_item is None or last_item is None:  # no rows for this date_index
        return
    search_offset = 200
    split_ids = [(i, i + search_offset) for i in range(first_item.id, last_item.id + 1, search_offset)]
for (start_id, end_id) in split_ids:
device_list = list(PersonalizeActionTag.objects.filter(id__range=(start_id, end_id)).values_list(
"device_id", flat=True
))
res = rpc_client['api/device/get_device_city'](
device_id_list=device_list
).unwrap()
for device_id in res:
personalize_push_cache.hset(
cache_key_demeter_device_2_city,
device_id,
res[device_id]
)
    push_logger.info(json.dumps(dict(msg='add_device_to_city_map_for_push finish!')))
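
# Both daily tasks above walk yesterday's PersonalizeActionTag rows, so they
# are natural candidates for a beat schedule. A hypothetical
# CELERYBEAT_SCHEDULE entry (module path and timing are illustration values,
# not this project's actual config):
#
#     from celery.schedules import crontab
#
#     CELERYBEAT_SCHEDULE = {
#         'add-device-to-city-map-for-push': {
#             'task': 'communal.tasks.add_device_to_city_map_for_push',  # assumed task path
#             'schedule': crontab(hour=3, minute=0),  # daily, after the upstream table lands
#         },
#     }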