#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
__title__ = ''
__author__ = 'xierong@gmei.com'
__mtime__ = '17/10/27'
'''
import datetime
from distutils.version import LooseVersion

import pytz
from celery import shared_task
from django.conf import settings
from gm_types.push import PUSH_URGENCY, PERSONAL_PUSH_TYPE, PUSH_INFO_TYPE, AUTOMATED_PUSH

from talos.cache.gaia import push_cache
from talos.services.other import get_user_lastest_device_app_version_by_user_id
from utils.protocol import gm_protocol
from utils.rpc import get_rpc_invoker, rpc_client
from utils.rpc import logging_exception
from talos.logger import info_logger


# Maximum number of ids handed to the push service in one RPC call.
_PUSH_BATCH_SIZE = 128


def _chunked(items, size=_PUSH_BATCH_SIZE):
    """Yield consecutive slices of ``items`` holding at most ``size`` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _within_push_window(hour, start_hour=9, end_hour=22):
    """Return True when ``hour`` lies in the half-open range [start_hour, end_hour)."""
    return start_hour <= hour < end_hour


def tzlc(dt, truncate_to_sec=True):
    """Attach the ``settings.TIME_ZONE`` timezone to a naive datetime.

    :param dt: naive ``datetime`` or None.
    :param truncate_to_sec: when true, microseconds are dropped first.
    :return: localized ``datetime``, or None when ``dt`` is None.
    """
    if dt is None:
        return None
    if truncate_to_sec:
        dt = dt.replace(microsecond=0)
    return pytz.timezone(settings.TIME_ZONE).localize(dt)


def eta_2_push_time(eta):
    """Convert an ``'%Y-%m-%d %H:%M:%S'`` string into integer epoch seconds.

    The string is interpreted in ``settings.TIME_ZONE`` (via ``tzlc``).

    :param eta: formatted time string, or a falsy value.
    :return: seconds since the Unix epoch as int, or None when ``eta`` is falsy.
    """
    if not eta:
        return None
    eta_dt = tzlc(datetime.datetime.strptime(eta, '%Y-%m-%d %H:%M:%S'))
    epoch = datetime.datetime.fromtimestamp(0, pytz.timezone("UTC"))
    return int((eta_dt - epoch).total_seconds())


def special_push_limit(user_id, push_type, extra=None):
    """Decide whether a special push may be sent, and record it if so.

    A push is refused outside 09:00-22:00, when the user's daily total has
    reached ``settings.SPECIAL_PUSH_LIMIT``, or when the same
    ``push_type``/``user_id``/``extra`` combination is still marked as pushed.

    :param user_id: target user id.
    :param push_type: push type, used as the first part of the dedup key.
    :param extra: optional value, or list/tuple of values, appended to the
        dedup key.
    :return: True when the push is allowed (counters are updated), else False.
    """
    now = datetime.datetime.now()
    if not _within_push_window(now.hour):
        return False

    total_key = "total:{}:{}".format(now.strftime("%Y-%m-%d"), user_id)
    total = push_cache.get(total_key)
    if total and int(total) >= settings.SPECIAL_PUSH_LIMIT:
        return False

    sub_key = [push_type, user_id, ]
    if extra is not None:
        if isinstance(extra, (list, tuple)):
            sub_key += list(extra)
        else:
            sub_key.append(extra)
    sub_key = ":".join(map(str, sub_key))

    if push_cache.get(sub_key):
        return False

    if total:
        push_cache.incr(total_key)
    else:
        # First push of the day for this user: start the counter with a 24h TTL.
        push_cache.setex(total_key, 86400, 1)
    # NOTE(review): the setting is spelled SPECTAL_SECONDS_LIMIT here; confirm
    # it matches the name defined in settings before renaming.
    push_cache.setex(sub_key, settings.SPECTAL_SECONDS_LIMIT, True)
    return True


def push_task_to_user_multi(
        user_ids, platform=None, extra=None, alert='', labels=None, eta=None, push_type=None, others=None
):
    """Send an automated push to users, in batches of 128 user ids.

    All mimas pushes go through the new push interface. A failing batch is
    logged via ``logging_exception`` and the remaining batches are still sent.

    :param user_ids: sequence of user ids (must support ``len`` and slicing).
    :param platform: forwarded to the push service.
    :param extra: forwarded to the push service.
    :param alert: forwarded to the push service.
    :param labels: accepted for interface compatibility; not forwarded by this
        function. NOTE(review): confirm whether the endpoint should get it.
    :param eta: optional ``'%Y-%m-%d %H:%M:%S'`` string converted to a push time.
    :param push_type: forwarded to the push service.
    :param others: forwarded to the push service.
    """
    for uid_list in _chunked(user_ids):
        try:
            rpc_client['push2/push/user/automated_push/uids'](
                push_type=push_type,
                user_ids=uid_list,
                platform=platform,
                alert=alert,
                extra=extra,
                push_time=eta_2_push_time(eta),
                urgency=PUSH_URGENCY.NORMAL,
                others=others
            ).unwrap()
        except Exception:
            logging_exception()


def push_task_to_device_ids_multi(
        device_ids, platform=None, extra=None, alert='', labels=None, others=None, push_type=None):
    """Send a push to devices by device id, in batches of 128 ids.

    A failing batch is logged via ``logging_exception`` and the remaining
    batches are still sent.

    :param device_ids: sequence of device ids (must support ``len`` and slicing).
    :param platform: forwarded to the push service.
    :param extra: forwarded to the push service.
    :param alert: forwarded to the push service.
    :param labels: forwarded to the push service.
    :param others: forwarded to the push service.
    :param push_type: forwarded to the push service.
    """
    for device_ids_list in _chunked(device_ids):
        try:
            rpc_client['push2/push/user/device_ids/sp'](
                device_ids=device_ids_list,
                platform=platform,
                alert=alert,
                extra=extra,
                push_time=None,
                silent=False,
                urgency=PUSH_URGENCY.NORMAL,
                labels=labels,
                others=others,
                push_type=push_type,
            ).unwrap()
        except Exception:
            logging_exception()


def limit_push(user_id, sign):
    """Per-user, per-type daily push limiter backed by a Redis hash.

    The hash ``"<user_id>_personal_push_<date>"`` holds one counter per push
    type plus a ``"total"`` counter.

    :param user_id: user id, used to build the unique Redis key.
    :param sign: push type, e.g. favor; must be in ``PERSONAL_PUSH_TYPE``.
    :return: True when the push is allowed (counters are updated), else False.
    """
    sign = sign.strip()
    today = datetime.datetime.today()

    # Pushes are only allowed between 09:00 and 22:00.
    if not _within_push_window(today.hour):
        return False

    name = "{0}_personal_push_{1}".format(user_id, today.strftime("%Y-%m-%d"))

    # Counters kept in the hash: fans, vote, favor, reply, and the overall total.
    if sign not in PERSONAL_PUSH_TYPE:
        return False

    # Read the counter once instead of exists() followed by hget(): a missing
    # value (key absent or expired in between) simply means "no counter yet".
    total = push_cache.hget(name, "total")
    if total:
        if int(total) >= settings.SPECIAL_PUSH_LIMIT:
            return False

        sign_val = push_cache.hget(name, sign)
        if sign_val and int(sign_val) >= settings.PUSH_SUB_LIMIT:
            return False

        push_cache.hincrby(name, sign, amount=1)
        push_cache.hincrby(name, "total", amount=1)
    else:
        push_cache.hset(name, "total", 1)
        push_cache.hset(name, sign, 1)
        # Expire at 22:01 today so the counters are only valid for the current day.
        push_cache.expire(name, (today.replace(hour=22, minute=1, second=0) - today).seconds)

    return True


def limit_push_count(user_id, push_type, max_push_count=3, start_hour=9, end_hour=22):
    """Limit pushes of one type to ``max_push_count`` per user per day.

    Pushes are only allowed while ``start_hour <= current hour < end_hour``.

    :param user_id: user id, part of the Redis key.
    :param push_type: push type, part of the Redis key.
    :param max_push_count: maximum number of pushes per day.
    :param start_hour: first hour (inclusive) in which pushing is allowed.
    :param end_hour: hour (exclusive) from which pushing is refused.
    :return: True when the push is allowed (counter is updated), else False.
    """
    today = datetime.datetime.today()

    if not _within_push_window(today.hour, start_hour, end_hour):
        return False

    name = "{0}_personal_push_{1}_{2}_limit_{3}".format(
        user_id, push_type, today.strftime("%Y-%m-%d"), max_push_count)

    # Single read avoids int(None) when the key expires between exists() and get().
    total = push_cache.get(name)
    if total:
        if int(total) >= max_push_count:
            return False
        push_cache.incr(name, amount=1)
    else:
        push_cache.set(name, 1)
        # Expire at 23:59:59 today so the counter is only valid for the current day.
        push_cache.expire(name, (today.replace(hour=23, minute=59, second=59) - today).seconds)

    return True


def limit_push_by_uids(user_dict):
    """Apply ``limit_push`` to many users at once.

    Checking in bulk reduces the number of push calls, since one push request
    can target several users.

    :param user_dict: mapping ``{uid: push_type}``.
    :return: list of uids that are allowed to receive a push.
    """
    return [uid for uid, push_type in user_dict.items() if limit_push(uid, push_type)]


def limit_get_comment_push(user_id):
    """Limit comment pushes per user to ``settings.PERSON_GET_COMMENT_PUSH_LIMIT``.

    :param user_id: user id, part of the Redis key.
    :return: True when the push is allowed (counter is updated), else False.
    """
    today = datetime.datetime.today()
    name = "{0}_personal_comment_push".format(user_id)

    # Single read avoids int(None) when the key expires between exists() and get().
    total = push_cache.get(name)
    if total:
        if int(total) >= settings.PERSON_GET_COMMENT_PUSH_LIMIT:
            return False
        push_cache.incr(name, amount=1)
    else:
        push_cache.set(name, 1)
        # TTL is the time until 22:01. After 22:01 the difference is negative
        # and timedelta.seconds wraps, giving the time until 22:01 tomorrow.
        push_cache.expire(name, (today.replace(hour=22, minute=1, second=0) - today).seconds)

    return True


def doctor_noti_doctor_ids(
        doctor_ids, platform=None, alert='', extra=None, push_time=None, urgency=None):
    """Temporary JPush implementation for the doctor app, in batches of 128 ids.

    ``push_time`` and ``urgency`` are accepted but not implemented yet; they
    are not forwarded. A failing batch is logged via ``logging_exception`` and
    the remaining batches are still sent.

    :param doctor_ids: sequence of doctor ids (must support ``len`` and slicing).
    :param platform: forwarded to the push service.
    :param alert: forwarded to the push service.
    :param extra: forwarded to the push service.
    :param push_time: unused.
    :param urgency: unused.
    """
    for id_list in _chunked(doctor_ids):
        try:
            rpc_client['push2/push/doctor/notification/doctor_ids'](
                doctor_ids=id_list,
                platform=platform,
                extra=extra,
                alert=alert,
            ).unwrap()
        except Exception:
            logging_exception()


def vote_push(user_id, alert, push_url, is_force=False, push_type=AUTOMATED_PUSH.DIARY_RECEIVED_PRAISE):
    """Push a "received a vote" notification to one user.

    Nothing is sent outside 09:00-22:00.

    :param user_id: target user id.
    :param alert: alert text of the push.
    :param push_url: URL carried in the push payload.
    :param is_force: currently unused.
    :param push_type: automated push type forwarded to the push service.
    :return: None
    """
    now_date = datetime.datetime.now()
    if not _within_push_window(now_date.hour):
        # Outside the push window (e.g. after 22:00) the task does nothing.
        return

    # Per the wiki, push_url and pushUrl carry the same value because of a
    # compatibility issue with version 640, see:
    # http://wiki.gengmei.cc/pages/viewpage.action?pageId=3354548
    # NOTE(review): the version comparison result below is never used; kept
    # as-is so the lookup (and any error it raises) still happens as before.
    version = get_user_lastest_device_app_version_by_user_id(user_id)
    is_gte_7715_version = LooseVersion(version) >= LooseVersion('7.7.15')

    kwargs = {
        'user_ids': [user_id],
        'platform': ['android', 'iPhone'],
        'extra': {
            'type': PUSH_INFO_TYPE.GM_PROTOCOL,
            'pushUrl': push_url,
            'push_url': push_url,
        },
        'alert': alert,
        'push_type': push_type,
    }
    push_task_to_user_multi(**kwargs)


def old_vote_push(user_id):
    """Legacy vote push, deprecated as of 7745; use ``vote_push`` instead.

    Used for votes on answers and replies. App versions before 7.7.15 get no
    push; from 7.7.15 on, the push opens the message notification list. This
    function can be removed after a few more releases.

    :param user_id: target user id.
    :return: None
    """
    # Per the wiki, push_url and pushUrl carry the same value because of a
    # compatibility issue with version 640, see:
    # http://wiki.gengmei.cc/pages/viewpage.action?pageId=3354548
    if limit_push(user_id=user_id, sign=PERSONAL_PUSH_TYPE.VOTE):
        version = get_user_lastest_device_app_version_by_user_id(user_id)
        is_push = LooseVersion(version) >= LooseVersion('7.7.15')
        if not is_push:
            return

        kwargs = {
            'user_ids': [user_id],
            'platform': ['android', 'iPhone'],
            'extra': {
                'type': PUSH_INFO_TYPE.GM_PROTOCOL,
                'pushUrl': gm_protocol.get_msg_notification_list(),
                'push_url': gm_protocol.get_msg_notification_list(),
            },
            'alert': u'又有人给你点赞啦!快来看看是谁被你迷倒了~!',
            "labels": {'event_type':'push', 'event':'received_votes'},
        }
        push_task_to_user_multi(**kwargs)


def push_time_range_limit():
    """Tell whether the current local time is inside the push window.

    :return: True between 09:00 (inclusive) and 22:00 (exclusive), else False.
    """
    return _within_push_window(datetime.datetime.now().hour)


def send_applet_subscribe_msg(user_id, template_id, data=None, page=None):
    """Send a WeChat applet subscribe message through the API service.

    A non-zero ``errcode`` in the response is logged. Any exception is logged
    via ``logging_exception`` and swallowed.

    :param user_id: target user id.
    :param template_id: subscribe message template id.
    :param data: forwarded to the API service.
    :param page: forwarded to the API service.
    :return: the unwrapped RPC response, or None when an exception occurred.
    """
    try:
        res = rpc_client['api/wechat/subscribe_msg'](
            user_id=user_id, template_id=template_id, data=data, page=page
        ).unwrap()
        errcode = res.get("errcode", -1)
        if errcode != 0:
            errmsg = res.get("errmsg", '')
            info_logger.info("fail to send subscribe_msg, errcode:{errcode}, errmsg:{errmsg}".format(errcode=errcode, errmsg=errmsg))
        return res
    except Exception:
        logging_exception()