#!/usr/bin/env python # -*- coding: utf-8 -*- import json import random import datetime from celery import shared_task from gm_types.push import PUSH_INFO_TYPE from talos.cache.gaia import push_cache from talos.logger import push_logger, info_logger from utils.push import push_task_to_user_multi from utils.rpc import ( rpc_client, logging_exception, ) from communal.models.push.personalize_tag import PersonalizeActionTag from communal.cache.push import personalize_push_cache @shared_task def push_control_task(user_ids, platform, extra, alert, eta, push_type, labels, others=None): """ 公共推送方法 :return: """ push_task_to_user_multi( user_ids=user_ids, platform=platform, extra=extra, alert=alert, eta=eta, push_type=push_type, labels=labels, others=others ) @shared_task def aggregation_push_trigger_task(cache_name, default_cache_key, unit_time=60 * 60, sole_sign=""): """ 聚合推送 延迟触发任务 :param cache_name: 缓存名 :param default_cache_key: 默认缓存key :param unit_time: 单位时间 :param sole_sign: 用于存日志的标识 :return: """ push_logger.info(json.dumps({ "subordinate": "aggregation_push", "sole_sign": sole_sign, "resume": "aggregation_push_trigger_task.", "params": { "cache_name": cache_name, "default_cache_key": default_cache_key, "unit_time": unit_time, }, })) # 从缓存中获取数据 for cache_key, item in push_cache.hscan_iter(cache_name, count=100): if cache_key == default_cache_key: continue if item: push_data = json.loads(item) for push_type, record_ids in push_data.items(): countdown = random.choice(range(10, unit_time - 60 * 5)) # 获取触发的时间 _kw = { "user_ids": [cache_key], "action_type": push_type, "record_ids": record_ids, "sole_sign": sole_sign, } # 触发另一个异步任务 task_ = aggregation_push_timing_task.apply_async( kwargs=_kw, countdown=countdown ) push_logger.info(json.dumps({ "subordinate": "aggregation_push", "sole_sign": sole_sign, "resume": "aggregation_push_timing_task.", "task_id": task_.id, "params": _kw, "countdown": countdown, })) @shared_task def aggregation_push_timing_task(**kwargs): """ 聚合推送 单任务触发方法 :return: """ from communal.tools.push.push_aggregation import AggregationPushService params = AggregationPushService.build_push_params(**kwargs) if params: push_logger.info(json.dumps({ "subordinate": "aggregation_push", "sole_sign": kwargs.get("sole_sign", ""), "resume": "aggregation_push builder params.", "build_params": kwargs, "push_params": params, })) push_task_to_user_multi(**kwargs) @shared_task def intelligent_push_task(content_id, user_ids, push_type, extra, platform=None, alert='', others=None, labels={}): """ push 统一管理 使用demeter发送 push :param content_id: 内容id :param user_ids: 接收push用户 list :param push_type: 推送类型 :param platform: 推送渠道 Android ios :param extra: dict :param alert: object :param others: object :param labels: object :return: """ _rpc_url = "demeter/push/user/community_push" kwargs = { "content_id": content_id, "user_ids": user_ids, "push_type": push_type, "platform": platform, "extra": extra, "alert": alert, "labels": labels, } if others: kwargs.update({'others': others}) try: rpc_client[_rpc_url](**kwargs).unwrap() info_logger.info('invoke demeter push {}:{}'.format(content_id, push_type)) except: logging_exception() @shared_task def parse_personalize_action_task(date_index=None): push_logger.info(json.dumps(dict(mag='parse_personalize_action_task start!'))) if not date_index: date_index = str(datetime.date.today() - datetime.timedelta(days=1)) cache_key_personalize_action_tag_info = 'demeter:push:action_tag_push:action_tag_info_{}'.format(date_index) first_item = PersonalizeActionTag.objects.filter(date_index=date_index).first() last_item = PersonalizeActionTag.objects.filter(date_index=date_index).last() search_offset = 100 split_ids = [(i, i + search_offset) for i in range(first_item.id, last_item.id+1, search_offset)] for (start_id, end_id) in split_ids: res = PersonalizeActionTag.objects.filter(id__range=(start_id, end_id)).values( "tag_info", "device_id" ) for obj in res: # personalize_action_dict[obj['device_id']] = obj['tag_info'] personalize_push_cache.hset( cache_key_personalize_action_tag_info, obj['device_id'], obj['tag_info'] ) push_logger.info(json.dumps(dict(mag='parse_personalize_action_task finish!'))) @shared_task def add_device_to_city_map_for_push(): push_logger.info(json.dumps(dict(mag='add_device_to_city_map_for_push start!'))) cache_key_demeter_device_2_city = 'demeter:push:device_2_city_map' date_index = str(datetime.date.today() - datetime.timedelta(days=1)) first_item = PersonalizeActionTag.objects.filter(date_index=date_index).first() last_item = PersonalizeActionTag.objects.filter(date_index=date_index).last() search_offset = 200 split_ids = [(i, i + search_offset) for i in range(first_item.id, last_item.id+1, search_offset)] for (start_id, end_id) in split_ids: device_list = list(PersonalizeActionTag.objects.filter(id__range=(start_id, end_id)).values_list( "device_id", flat=True )) res = rpc_client['api/device/get_device_city']( device_id_list=device_list ).unwrap() for device_id in res: personalize_push_cache.hset( cache_key_demeter_device_2_city, device_id, res[device_id] ) push_logger.info(json.dumps(dict(mag='add_device_to_city_map_for_push finish!')))