push.py 10.2 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
__title__ = ''
__author__ = 'xierong@gmei.com'
__mtime__ = '17/10/27'
'''

import datetime
from distutils.version import LooseVersion

import pytz
from celery import shared_task
from django.conf import settings
from gm_types.push import PUSH_URGENCY, PERSONAL_PUSH_TYPE, PUSH_INFO_TYPE, AUTOMATED_PUSH

from talos.cache.gaia import push_cache
from talos.services.other import get_user_lastest_device_app_version_by_user_id
from utils.protocol import gm_protocol
from utils.rpc import get_rpc_invoker, rpc_client
from utils.rpc import logging_exception
from talos.logger import info_logger


def tzlc(dt, truncate_to_sec=True):
    """Localize a datetime into the time zone named by ``settings.TIME_ZONE``.

    :param dt: datetime to localize, or ``None``.
    :param truncate_to_sec: when true, the microsecond field is zeroed first.
    :return: ``None`` if ``dt`` is ``None``; otherwise the value returned by
        the pytz zone's ``localize`` for the (optionally truncated) datetime.
    """
    if dt is None:
        return None
    value = dt.replace(microsecond=0) if truncate_to_sec else dt
    zone = pytz.timezone(settings.TIME_ZONE)
    return zone.localize(value)


def eta_2_push_time(eta):
    """Convert an ETA string into an integer epoch timestamp.

    :param eta: string in ``%Y-%m-%d %H:%M:%S`` format, or a falsy value.
    :return: ``None`` when ``eta`` is falsy; otherwise the whole number of
        seconds between the UTC epoch and ``eta`` as localized by ``tzlc``.
    """
    if not eta:
        return None
    parsed = datetime.datetime.strptime(eta, '%Y-%m-%d %H:%M:%S')
    localized = tzlc(parsed)
    epoch = datetime.datetime.fromtimestamp(0, pytz.timezone("UTC"))
    elapsed = localized - epoch
    return int(elapsed.total_seconds())


def special_push_limit(user_id, push_type, extra=None):
    """Decide whether a special push may be sent, and record it if so.

    The check refuses when the local hour is outside 09:00-21:59, when the
    user's per-day counter has reached ``settings.SPECIAL_PUSH_LIMIT``, or
    when the per-(push_type, user, extra) marker key is still present in
    ``push_cache``.

    :param user_id: id of the target user; part of both cache keys.
    :param push_type: push category; first component of the marker key.
    :param extra: optional additional key component(s); a list/tuple is
        appended element by element, any other value is appended as is.
    :return: ``True`` when the push is allowed (the counters are updated as
        a side effect), ``False`` otherwise.
    """
    now = datetime.datetime.now()
    if not 9 <= now.hour < 22:
        return False

    daily_key = "total:{}:{}".format(now.strftime("%Y-%m-%d"), user_id)
    daily_count = push_cache.get(daily_key)
    if daily_count and int(daily_count) >= settings.SPECIAL_PUSH_LIMIT:
        return False

    key_parts = [push_type, user_id]
    if extra is not None:
        if isinstance(extra, (list, tuple)):
            key_parts.extend(extra)
        else:
            key_parts.append(extra)
    marker_key = ":".join(str(part) for part in key_parts)
    if push_cache.get(marker_key):
        return False

    # Bump the existing daily counter, or create it with a 24h lifetime.
    if daily_count:
        push_cache.incr(daily_key)
    else:
        push_cache.setex(daily_key, 86400, 1)
    # The marker blocks repeats of the same push until it expires.
    push_cache.setex(marker_key, settings.SPECTAL_SECONDS_LIMIT, True)
    return True


def push_task_to_user_multi(
        user_ids, platform=None, extra=None, alert='', labels=None, eta=None, push_type=None, others=None
):
    """Send an automated push to users, 128 user ids per RPC call.

    Delivery is best effort: when a batch fails, the error is reported via
    ``logging_exception`` and the remaining batches are still attempted.

    :param user_ids: sliceable sequence of user ids.
    :param platform: forwarded to the RPC as ``platform``.
    :param extra: forwarded to the RPC as ``extra``.
    :param alert: forwarded to the RPC as ``alert``.
    :param labels: accepted for call compatibility; not forwarded to the RPC.
    :param eta: optional ``%Y-%m-%d %H:%M:%S`` string, converted with
        ``eta_2_push_time`` and forwarded as ``push_time``.
    :param push_type: forwarded to the RPC as ``push_type``.
    :param others: forwarded to the RPC as ``others``.
    :return: ``None``.
    """
    # All mimas pushes go through the new push endpoint.
    batch_size = 128
    for start in range(0, len(user_ids), batch_size):
        uid_list = user_ids[start:start + batch_size]
        try:
            rpc_client['push2/push/user/automated_push/uids'](
                push_type=push_type,
                user_ids=uid_list,
                platform=platform,
                alert=alert,
                extra=extra,
                push_time=eta_2_push_time(eta),
                urgency=PUSH_URGENCY.NORMAL,
                others=others
            ).unwrap()
        except Exception:
            # Swallow ordinary failures only; SystemExit / KeyboardInterrupt
            # must still be able to stop the process.
            logging_exception()


def push_task_to_device_ids_multi(
        device_ids,
        platform=None,
        extra=None,
        alert='',
        labels=None,
        others=None,
        push_type=None):
    """Push a message to devices by device id, 128 ids per RPC call.

    Delivery is best effort: when a batch fails, the error is reported via
    ``logging_exception`` and the remaining batches are still attempted.
    Every batch is sent with ``push_time=None``, ``silent=False`` and normal
    urgency.

    :param device_ids: sliceable sequence of device ids.
    :param platform: forwarded to the RPC as ``platform``.
    :param extra: forwarded to the RPC as ``extra``.
    :param alert: forwarded to the RPC as ``alert``.
    :param labels: forwarded to the RPC as ``labels``.
    :param others: forwarded to the RPC as ``others``.
    :param push_type: forwarded to the RPC as ``push_type``.
    :return: ``None``.
    """
    batch_size = 128
    for start in range(0, len(device_ids), batch_size):
        device_ids_list = device_ids[start:start + batch_size]
        try:
            rpc_client['push2/push/user/device_ids/sp'](
                device_ids=device_ids_list,
                platform=platform,
                alert=alert,
                extra=extra,
                push_time=None,
                silent=False,
                urgency=PUSH_URGENCY.NORMAL,
                labels=labels,
                others=others,
                push_type=push_type,
            ).unwrap()
        except Exception:
            # Swallow ordinary failures only; SystemExit / KeyboardInterrupt
            # must still be able to stop the process.
            logging_exception()


def limit_push(user_id, sign):
    """
    Per-user, per-day push throttle kept in a ``push_cache`` hash.

    The hash holds one counter per push type plus a ``total`` counter. A push
    is refused outside 09:00-21:59, for a ``sign`` that is not in
    ``PERSONAL_PUSH_TYPE``, when ``total`` has reached
    ``settings.SPECIAL_PUSH_LIMIT``, or when the counter for ``sign`` has
    reached ``settings.PUSH_SUB_LIMIT``.

    :param user_id: user id, used to build the cache key
    :param sign: push type, e.g. favor (surrounding whitespace is stripped)
    :return: True if the push is allowed (counters are updated), else False
    """
    sign = sign.strip()
    now = datetime.datetime.today()
    # Time-of-day restriction.
    if not 9 <= now.hour < 22:
        return False

    name = "{0}_personal_push_{1}".format(user_id, now.strftime("%Y-%m-%d"))

    # Known types per the original note: fans, vote, favor, reply; "total"
    # is the aggregate counter.
    if sign not in PERSONAL_PUSH_TYPE:
        return False

    if not push_cache.exists(name):
        # First push of the day: seed both counters, then set the expiry so
        # the hash lapses at 22:01 today.
        push_cache.hset(name, "total", 1)
        push_cache.hset(name, sign, 1)
        cutoff = now.replace(hour=22, minute=1, second=0)
        push_cache.expire(name, (cutoff - now).seconds)
        return True

    sign_val = push_cache.hget(name, sign)
    total = push_cache.hget(name, "total")
    if int(total) >= settings.SPECIAL_PUSH_LIMIT:
        return False
    if sign_val and int(sign_val) >= settings.PUSH_SUB_LIMIT:
        return False

    push_cache.hincrby(name, sign, amount=1)
    push_cache.hincrby(name, "total", amount=1)
    return True


def limit_push_count(user_id, push_type, max_push_count=3, start_hour=9, end_hour=22):
    """
    Daily push throttle: at most ``max_push_count`` pushes per user and type.

    Pushes are only allowed while ``start_hour <= current hour < end_hour``.

    :param user_id: user id, part of the cache key
    :param push_type: push type, part of the cache key
    :param max_push_count: daily cap; also part of the cache key
    :param start_hour: first hour (inclusive) in which pushing is allowed
    :param end_hour: hour (exclusive) at which pushing stops
    :return: True if the push is allowed (the counter is updated), else False
    """
    now = datetime.datetime.today()
    # Time-of-day restriction.
    if not start_hour <= now.hour < end_hour:
        return False

    name = "{0}_personal_push_{1}_{2}_limit_{3}".format(
        user_id, push_type, now.strftime("%Y-%m-%d"), max_push_count)

    if not push_cache.exists(name):
        # First push of the day: create the counter and let it lapse at
        # 23:59:59 today.
        push_cache.set(name, 1)
        end_of_day = now.replace(hour=23, minute=59, second=59)
        push_cache.expire(name, (end_of_day - now).seconds)
        return True

    total = push_cache.get(name)
    if int(total) >= max_push_count:
        return False
    push_cache.incr(name, amount=1)
    return True


def limit_push_by_uids(user_dict):
    """
    Run the ``limit_push`` check for many users at once, so the caller can
    send a single push request to every user that passed.

    :param user_dict: {uid: push_type}
    :return: list of the uids for which ``limit_push`` returned a truthy value
    """
    return [
        uid
        for uid, push_type in user_dict.items()
        if limit_push(uid, push_type)
    ]


def limit_get_comment_push(user_id):
    """
    Throttle comment pushes for one user with a counter in ``push_cache``.

    :param user_id: user id, part of the cache key
    :return: False once the counter has reached
        ``settings.PERSON_GET_COMMENT_PUSH_LIMIT``; otherwise True, after the
        counter has been created or incremented.
    """
    now = datetime.datetime.today()
    name = "{0}_personal_comment_push".format(user_id)

    if not push_cache.exists(name):
        # First push in the window: create the counter and set its expiry
        # from the distance to 22:01 today.
        # NOTE(review): there is no time-of-day guard here; after 22:01 the
        # difference is negative and ``.seconds`` wraps, so the expiry would
        # land on the following day's 22:01 - confirm that is intended.
        push_cache.set(name, 1)
        cutoff = now.replace(hour=22, minute=1, second=0)
        push_cache.expire(name, (cutoff - now).seconds)
        return True

    total = push_cache.get(name)
    if int(total) >= settings.PERSON_GET_COMMENT_PUSH_LIMIT:  # configured cap
        return False

    push_cache.incr(name, amount=1)
    return True


def doctor_noti_doctor_ids(
        doctor_ids, platform=None, alert='', extra=None, push_time=None, urgency=None):
    """Temporary JPush implementation for the doctor app.

    Sends the notification to doctors in batches of 128 ids per RPC call.
    Delivery is best effort: when a batch fails, the error is reported via
    ``logging_exception`` and the remaining batches are still attempted.

    :param doctor_ids: sliceable sequence of doctor ids.
    :param platform: forwarded to the RPC as ``platform``.
    :param alert: forwarded to the RPC as ``alert``.
    :param extra: forwarded to the RPC as ``extra``.
    :param push_time: not implemented yet; accepted but ignored.
    :param urgency: not implemented yet; accepted but ignored.
    :return: ``None``.
    """
    batch_size = 128
    for start in range(0, len(doctor_ids), batch_size):
        id_list = doctor_ids[start:start + batch_size]
        try:
            rpc_client['push2/push/doctor/notification/doctor_ids'](
                doctor_ids=id_list,
                platform=platform,
                extra=extra,
                alert=alert,
            ).unwrap()
        except Exception:
            # Swallow ordinary failures only; SystemExit / KeyboardInterrupt
            # must still be able to stop the process.
            logging_exception()


def vote_push(user_id, alert, push_url, is_force=False, push_type=AUTOMATED_PUSH.DIARY_RECEIVED_PRAISE):
    """
    Push a "received a vote" notification to one user.

    Nothing is sent outside 09:00-21:59 local time.

    :param user_id: id of the user to notify
    :param alert: notification text
    :param push_url: target URL, sent under both ``pushUrl`` and ``push_url``
    :param is_force: not read by this function
    :param push_type: automated push type forwarded to the push call
    :return: None
    """
    now_date = datetime.datetime.now()
    if now_date.hour < 9 or now_date.hour >= 22:
        # Scheduled filler tasks do not run after 22:00.
        return

    # NOTE(review): the version comparison result below is never used; the
    # lookup is kept only to leave the function's behavior unchanged.
    version = get_user_lastest_device_app_version_by_user_id(user_id)
    is_gte_7715_version = LooseVersion(version) >= LooseVersion('7.7.15')

    # Per the wiki, push_url and pushUrl carry the same value because of a
    # compatibility issue with version 640:
    # http://wiki.gengmei.cc/pages/viewpage.action?pageId=3354548
    payload = {
        'type': PUSH_INFO_TYPE.GM_PROTOCOL,
        'pushUrl': push_url,
        'push_url': push_url,
    }
    push_task_to_user_multi(
        user_ids=[user_id],
        platform=['android', 'iPhone'],
        extra=payload,
        alert=alert,
        push_type=push_type,
    )


def old_vote_push(user_id):
    """
    Deprecated as of 7745.
    Legacy vote push for votes on answers and replies. Before version 7715 a
    vote does not trigger a push; from 7715 on the push opens the message
    notification page. This can be removed after a few releases in favor of
    vote_push.

    :param user_id: id of the user to notify
    :return: None
    """
    if not limit_push(user_id=user_id, sign=PERSONAL_PUSH_TYPE.VOTE):
        return

    version = get_user_lastest_device_app_version_by_user_id(user_id)
    if not LooseVersion(version) >= LooseVersion('7.7.15'):
        return

    # Per the wiki, push_url and pushUrl carry the same value because of a
    # compatibility issue with version 640:
    # http://wiki.gengmei.cc/pages/viewpage.action?pageId=3354548
    payload = {
        'type': PUSH_INFO_TYPE.GM_PROTOCOL,
        'pushUrl': gm_protocol.get_msg_notification_list(),
        'push_url': gm_protocol.get_msg_notification_list(),
    }
    push_task_to_user_multi(
        user_ids=[user_id],
        platform=['android', 'iPhone'],
        extra=payload,
        alert=u'又有人给你点赞啦!快来看看是谁被你迷倒了~!',
        labels={'event_type': 'push', 'event': 'received_votes'},
    )


def push_time_range_limit():
    """
    Tell whether the current local time is inside the push window.

    :return: True when the current hour is from 9 up to (not including) 22,
        otherwise False
    """
    current_hour = datetime.datetime.now().hour
    return 9 <= current_hour < 22


def send_applet_subscribe_msg(user_id, template_id, data=None, page=None):
    """Send a subscribe message through the ``api/wechat/subscribe_msg`` RPC.

    A response whose ``errcode`` is not 0 (or is missing) is written to
    ``info_logger`` and still returned to the caller.

    :param user_id: forwarded to the RPC as ``user_id``.
    :param template_id: forwarded to the RPC as ``template_id``.
    :param data: forwarded to the RPC as ``data``.
    :param page: forwarded to the RPC as ``page``.
    :return: the unwrapped RPC response, or ``None`` when the call or the
        response handling raised (the error is reported via
        ``logging_exception``).
    """
    try:
        res = rpc_client['api/wechat/subscribe_msg'](
            user_id=user_id,
            template_id=template_id,
            data=data,
            page=page
        ).unwrap()
        errcode = res.get("errcode", -1)
        if errcode != 0:
            errmsg = res.get("errmsg", '')
            info_logger.info("fail to send subscribe_msg, errcode:{errcode}, errmsg:{errmsg}".format(errcode=errcode, errmsg=errmsg))
        return res
    except Exception:
        # Best effort: swallow ordinary failures only, so SystemExit /
        # KeyboardInterrupt can still stop the process.
        logging_exception()
        return None