#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Aggregation push
import datetime
import json

from utils.common import big_data_iter
from .push_base import (
    hour,
    minute,
    push_cache,
    push_logger,
    PushServiceBase,
    get_datetime_to_str,
)
from .push_builder import PushEvents


class AggregationPushService(PushServiceBase):
    """
    Aggregation push service.
    """
    _default_cache_name = "aggregation_push"
    ratelimit_unit_times = 2
    push_handlers = PushEvents.aggregation_push_handlers

    # < ----------- user cache name helpers begin ------------ >
    @classmethod
    def _get_aggregation_push_normal_time_cache_name(cls, push_types, times):
        """
        Build the cache name for pushes aggregated inside the normal push window.
        :param push_types: push types being aggregated
        :param times: formatted time string (hour precision)
        :return: cache name
        """
        _cache_name = "{d_cache_name}_normal_types_{push_types}_times_{times}".format(
            d_cache_name=cls._default_cache_name,
            push_types=":".join(push_types),
            times=times,
        )
        return _cache_name

    @classmethod
    def _get_aggregation_push_abnormal_times_info(cls, push_types, now):
        """
        Build the cache name and trigger time (eta) for the off-hours window.
        Only called outside the normal push window.
        :param push_types: push types being aggregated
        :param now: current datetime
        :return: dict with "cache_name" and "eta"
        """
        if now.hour >= cls.end_push_hour:  # 22:00 - 23:59
            start_date = now.date()
            end_date = start_date + datetime.timedelta(days=1)
        else:  # 00:00 - 08:59, i.e. now.hour < cls.start_push_hour
            end_date = now.date()
            start_date = end_date - datetime.timedelta(days=1)
        _name = "{d_cache_name}_abnormal_types_{push_types}_times_{times}".format(
            d_cache_name=cls._default_cache_name,
            push_types=":".join(push_types),
            times="{}_{}".format(
                get_datetime_to_str(datetime.datetime.combine(start_date, datetime.time(hour=cls.end_push_hour))),
                get_datetime_to_str(datetime.datetime.combine(end_date, datetime.time(hour=cls.start_push_hour))),
            ),
        )
        eta = datetime.datetime.combine(end_date, datetime.time(cls.start_push_hour))
        return {
            "cache_name": _name,
            "eta": eta,
        }
    # < ----------- user cache name helpers end ------------ >

    @classmethod
    def _create_cache(cls, cache_name):
        """
        Create the aggregation hash if it does not exist yet.
        :param cache_name: cache key
        :return: True if the hash was newly created, False otherwise
        """
        if not push_cache.exists(cache_name):
            push_cache.hincrby(cache_name, cls._default_cache_name)
            push_cache.expire(cache_name, hour * 26)  # TTL: 26 hours
            # NB: the task-trigger switch is acquired by the caller, not here;
            # arming it during creation would make the caller's NX check always fail
            return True
        return False

    # < ----------- user aggregation push: cache creation + async task trigger begin ------------ >
    @staticmethod
    def _trigger_task_switch(cache_name):
        """
        One-shot switch (SET NX, 60s TTL) guarding the async task trigger, so
        concurrent creators of the same cache schedule the task only once.
        :return: True if the switch was acquired, else None
        """
        _cache_name = "{}_task_switch".format(cache_name)  # separate key for the trigger switch
        return push_cache.set(_cache_name, 1, ex=60, nx=True)

    @classmethod
    def _trigger_task(cls, cache_name, eta, unit_time, **kwargs):
        """
        Schedule the async aggregation task; only tasks within the valid window are triggered.
        :param cache_name: cache name
        :param eta: local datetime at which the task should run
        :return: celery task id
        """
        sole_sign = kwargs.get("sole_sign", "")
        from communal.tasks import aggregation_push_trigger_task
        task_ = aggregation_push_trigger_task.apply_async(
            args=(
                cache_name,
                cls._default_cache_name,
                unit_time,
                sole_sign,
            ),
            eta=eta - datetime.timedelta(hours=8),  # celery expects UTC; local time is UTC+8, sigh
        )
        return task_.id
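    # Note on the fixed 8-hour offset in _trigger_task: it assumes the host runs
    # at UTC+8 and that celery receives etas in UTC. A sketch of a timezone-aware
    # alternative (assuming pytz is available) would be:
    #
    #     import pytz
    #     eta_utc = pytz.timezone("Asia/Shanghai").localize(eta).astimezone(pytz.utc)
    #
    # The hard-coded timedelta is equivalent only while the UTC+8 assumption holds.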
    @classmethod
    def get_cache_name_and_trigger_task(cls, push_types, now, unit_time=60 * 60, **kwargs):
        """
        Resolve the cache name and, if the cache is new, trigger the delayed task.
        :return: cache name
        """
        sole_sign = kwargs.get("sole_sign", "")
        if cls.in_push_limit_time(now):  # inside the normal push window (9:00 - 22:00)
            hour_str = (cls.current_times(now) or {}).get("hour_str", "")
            _cache_name = cls._get_aggregation_push_normal_time_cache_name(push_types, hour_str)
            eta = cls.expire_time(unit_time=unit_time, need_datetime=True)
        else:
            # off-hours: data from 22:00 today to 08:00 tomorrow is flushed by the scheduled task at 9:00
            _abnormal_info = cls._get_aggregation_push_abnormal_times_info(
                push_types=push_types, now=now
            )
            _cache_name = _abnormal_info.get("cache_name", "")
            eta = _abnormal_info.get("eta", None)
        _status = cls._create_cache(_cache_name)  # True if the cache was just created
        # trigger the async task once per cache window; the NX switch de-duplicates concurrent creators
        if _status and cls._trigger_task_switch(_cache_name):
            task_id = cls._trigger_task(
                _cache_name, eta=eta, unit_time=unit_time, sole_sign=sole_sign
            )
            need_trigger_task = True
        else:
            task_id = ""
            need_trigger_task = False
        push_logger.info(json.dumps({
            "aggregation_step": 4,
            "sole_sign": sole_sign,
            "subordinate": "aggregation_push",
            "resume": "Aggregation Push. get_cache_name_and_trigger_task",
            "push_types": push_types,
            "cache_name": _cache_name,
            "now": now.strftime("%Y%m%d %H:%M:%S"),
            "need_trigger_task": need_trigger_task,
            "Triggered task": {
                "task_id": task_id,
                "parameters": {
                    "cache_name": _cache_name,
                    "default_cache_key": cls._default_cache_name,
                    "unit_time": unit_time,
                },
                "eta": eta.strftime("%Y%m%d %H:%M:%S") if eta else eta,
            },
        }))
        return _cache_name
    # < ----------- user aggregation push: cache creation + async task trigger end ------------ >

    # < ----------- user aggregation push data collection begin ------------ >
    @classmethod
    def aggregation_push_data(cls, user_ids, action_type, **kwargs):
        """
        Collect events for aggregated pushes by writing them into a redis hash.
        hash layout:
            name   cache name derived from the current hour
            key    user_id
            value  json string {push_action_type: [source_id, source_id]}
                   i.e. {push type: [row ids in the corresponding table]},
                   e.g. {answer_upvote: [AnswerVote.id]}
        :param user_ids: recipients of the aggregated push
        :param action_type: push action type of this event
        :param kwargs:
        :return:
        """
        _source_id = kwargs.get("source_id", 0)
        now = kwargs.pop("now", cls.get_now_datetime())  # current time
        sole_sign = kwargs.get("sole_sign", "")  # unique trace id
        push_logger.info(json.dumps({
            "aggregation_step": 2,
            "sole_sign": sole_sign,
            "resume": "Aggregation Push. Trigger parameters.",
            "now": now.strftime("%Y%m%d %H:%M:%S"),
            "user_ids": user_ids,
            "action_type": action_type,
            "other_params": kwargs,
        }))
        if all([user_ids, action_type, _source_id]):
            # resolve the trigger delay for this action type
            delay_time = hour * 1
            _push_types = []
            for item in PushEvents.aggregation_push_rule.values():
                aggregation_push_types = sorted(item.get("aggregation_push_types", []))  # NB: sorted so the derived cache name is deterministic
                if action_type in aggregation_push_types:
                    delay_time = item.get("delay_times", hour * 1)
                    _push_types = aggregation_push_types
                    break
            push_logger.info(json.dumps({
                "aggregation_step": 3,
                "sole_sign": sole_sign,
                "resume": "Aggregation Push. action_type conditions satisfied.",
                "action_type": action_type,
                "aggregation_push_types": _push_types,
                "delay_times": delay_time,
            }))
            # resolve the cache name (and trigger the delayed task if needed)
            _cache_name = cls.get_cache_name_and_trigger_task(
                push_types=_push_types,
                now=now,
                unit_time=delay_time,
                sole_sign=sole_sign,  # logging trace id only; no business meaning
            )
            # merge this event into the per-user aggregation records, in chunks
            for user_id_chunk in big_data_iter(user_ids, fetch_num=200):
                vs = push_cache.hmget(_cache_name, user_id_chunk)
                write_to_cache_dic = {}
                for user_id, v in zip(user_id_chunk, vs):
                    if v:
                        action_type_record_dic = json.loads(v)  # existing record from the cache
                    else:
                        action_type_record_dic = {}
                    if action_type not in action_type_record_dic:
                        action_type_record_dic[action_type] = []
                    action_type_record_dic[action_type].append(_source_id)
                    write_to_cache_dic[user_id] = json.dumps(action_type_record_dic)
                # write the merged records back to the cache
                push_cache.hmset(_cache_name, write_to_cache_dic)
                push_logger.info(json.dumps({
                    "aggregation_step": 5,
                    "sole_sign": sole_sign,
                    "resume": "Aggregation Push. write in redis OK.",
                    "cache_name": _cache_name,
                    "action_type": action_type,
                    "user_ids": user_id_chunk,
                }))
    # < ----------- user aggregation push data collection end ------------ >
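
# Usage sketch (illustrative only; the action type string and ids below are
# hypothetical placeholders — the real keys come from PushEvents.aggregation_push_rule):
#
#     AggregationPushService.aggregation_push_data(
#         user_ids=[1001, 1002],      # recipients of the aggregated push
#         action_type="answer_vote",  # hypothetical aggregation action type
#         source_id=42,               # e.g. AnswerVote.id
#         sole_sign="trace-7f3a",     # opaque trace id threaded through the logs
#     )
#
# Each call appends source_id under action_type in the hour-bucketed redis hash;
# the delayed aggregation_push_trigger_task later drains the hash and sends one
# combined push per user covering everything accumulated in that window.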