# tasks.py (9.01 KB)
# -*- coding: UTF-8 -*-
import logging
import traceback
import json
import pymysql
import threading
import random
import datetime
from celery import shared_task
from django.conf import settings
from django.core import serializers
from trans2es.type_info import get_type_info_map
# from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform
from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar, UserSimilarScore
from linucb.utils.register_user_tag import RegisterUserTag
from injection.data_sync.get_session import get_comments, click, login, reply
from injection.data_sync.get_session import host, user, passwd, db

# Process-wide run counters for the timer-driven click and reply jobs;
# both start at zero when the module is imported.
exec_count = exec_count_click = 0


@shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False):
    """Celery task: push the rows identified by ``pk_list`` for ``es_type``.

    ``pk_list`` is de-duplicated first (the original order is not kept).
    Two ``es_type`` values are routed to ``RegisterUserTag``:

    * ``"register_user_tag"``  -> ``RegisterUserTag.get_register_user_tag``
    * ``"attention_user_tag"`` -> ``RegisterUserTag.get_user_attention_tag``

    Any other value is looked up in ``get_type_info_map()`` and written via
    ``insert_table_by_pk_list`` using the client from ``ESPerform.get_cli()``.

    :param es_type: type name selecting the handler / sub index.
    :param pk_list: iterable of primary keys to process.
    :param use_batch_query_set: forwarded to ``insert_table_by_pk_list``.
    :return: ``None``. Errors (including an unknown ``es_type``) are logged
        with their traceback and not re-raised.
    """
    try:
        unique_pks = list(frozenset(pk_list))

        if es_type == "register_user_tag":
            RegisterUserTag.get_register_user_tag(unique_pks)
        elif es_type == "attention_user_tag":
            RegisterUserTag.get_user_attention_tag(unique_pks)
        else:
            type_info = get_type_info_map()[es_type]

            logging.info("consume es_type:%s", es_type)
            type_info.insert_table_by_pk_list(
                sub_index_name=es_type,
                pk_list=unique_pks,
                use_batch_query_set=use_batch_query_set,
                es=ESPerform.get_cli()
            )
    except Exception:
        # Catch Exception rather than everything, so SystemExit and
        # KeyboardInterrupt still propagate to the worker.
        logging.error("catch exception,err_msg:%s", traceback.format_exc())


@shared_task
def sync_face_similar_data_to_redis():
    """Celery task: cache each participant's face-similarity list in Redis.

    For every ``participant_user_id`` found among online, non-deleted
    ``FaceUserContrastSimilar`` rows, the rows with ``similarity > 0.3`` are
    read in descending similarity order and stored as a JSON list of
    ``{"contrast_user_id": ..., "similarity": ...}`` objects under the key
    ``physical:user_similar:participant_user_id:<participant_user_id>``.

    :return: ``None``. Any error aborts the remaining participants; it is
        logged with its traceback and not re-raised.
    """
    try:
        # values_list() alone selects the column; DISTINCT applies to it.
        participant_ids = FaceUserContrastSimilar.objects.filter(
            is_online=True, is_deleted=False
        ).distinct().values_list("participant_user_id", flat=True)

        logging.info("begin sync_face_similar_data_to_redis!")

        redis_key_prefix = "physical:user_similar:participant_user_id:"
        for participant_user_id in participant_ids:
            similar_rows = FaceUserContrastSimilar.objects.filter(
                is_online=True,
                is_deleted=False,
                participant_user_id=participant_user_id,
                similarity__gt=0.3,
            ).order_by("-similarity")

            item_list = [
                {
                    "contrast_user_id": row.contrast_user_id,
                    "similarity": row.similarity,
                }
                for row in similar_rows
            ]
            redis_client.set(redis_key_prefix + str(participant_user_id), json.dumps(item_list))

            logging.info("participant_user_id:%d set data done!", participant_user_id)
    except Exception:
        # Catch Exception rather than everything, so SystemExit and
        # KeyboardInterrupt still propagate to the worker.
        logging.error("catch exception,err_msg:%s", traceback.format_exc())


@shared_task
def sync_user_similar_score():
    """Celery task: cache each user's similarity scores in Redis.

    For every ``user_id`` found among non-deleted ``UserSimilarScore`` rows,
    that user's rows are read in descending ``score`` order and stored as a
    JSON list of ``[contrast_user_id, score]`` pairs under the key
    ``physical:user_similar_score:user_id:<user_id>``.

    :return: ``None``. Any error aborts the remaining users; it is logged
        with its traceback and not re-raised.
    """
    try:
        # values_list() alone selects the column; DISTINCT applies to it.
        user_ids = UserSimilarScore.objects.filter(
            is_deleted=False
        ).distinct().values_list("user_id", flat=True)
        redis_key_prefix = "physical:user_similar_score:user_id:"

        logging.info("begin sync user_similar_score!")
        for user_id in user_ids:
            score_rows = UserSimilarScore.objects.filter(
                is_deleted=False, user_id=user_id
            ).order_by("-score")

            item_list = [[row.contrast_user_id, row.score] for row in score_rows]

            redis_client.set(redis_key_prefix + str(user_id), json.dumps(item_list))
    except Exception:
        # Catch Exception rather than everything, so SystemExit and
        # KeyboardInterrupt still propagate to the worker.
        logging.error("catch exception,err_msg:%s", traceback.format_exc())


def auto_click(pk_list):
    """Schedule automated clicks on one of today's topics from ``pk_list``.

    Queries the ``topic`` table for ids in ``pk_list`` whose ``create_time``
    contains today's date (``YYYY-MM-DD``) and whose ``user_id`` has
    ``is_shadow = 0`` in ``user_extra``. When at least one row comes back,
    the first returned topic id is clicked up to three times from background
    ``threading.Timer`` threads: 10 seconds after this call, then 30 and
    50 seconds after the preceding run. Each run calls ``login()`` and skips
    the click when it returns no cookies.

    The run counter lives in the timer arguments, not in a module global, so
    every call gets its own three runs and overlapping calls do not
    interfere with each other.

    :param pk_list: sequence of topic primary keys to consider; an empty or
        ``None`` value is a no-op.
    :return: ``None``. Errors while querying or scheduling are logged with
        their traceback and not re-raised.
    """
    # Lazy %-args: formatting with "%" directly would raise for tuple input.
    logging.info("get--------auto_click--------------:%s", pk_list)
    if not pk_list:
        # Nothing to look up, so do not open a database connection at all.
        return

    try:
        today_str = datetime.datetime.now().strftime('%Y-%m-%d')

        # One placeholder per id, so the driver escapes every value and the
        # single-id and multi-id cases share one statement.
        id_placeholders = ", ".join(["%s"] * len(pk_list))
        query = (
            "SELECT id FROM topic WHERE id IN (" + id_placeholders + ")"
            " AND user_id IN (SELECT user_id FROM user_extra WHERE is_shadow = 0)"
            " AND create_time LIKE %s"
        )
        query_args = list(pk_list) + ["%" + today_str + "%"]

        pc = pymysql.connect(host=host, user=user, passwd=passwd, db=db, port=3306)
        try:
            cursor = pc.cursor()
            try:
                cursor.execute(query, query_args)
                topic_id_list = list(cursor.fetchall())
            finally:
                cursor.close()
        finally:
            # Always release the connection, even when the query fails.
            pc.close()
        logging.info("Database version : %s ", topic_id_list)

        if not topic_id_list:
            return

        # Delay in seconds before the 2nd and 3rd run, counted from the
        # previous run.
        follow_up_delays = (30, 50)

        def fun_timer(run_index=0):
            cookies = login()
            if cookies:
                logging.info("get topic_id_list:%s", topic_id_list)
                click(cookies, topic_id_list[0][0])

            if run_index < len(follow_up_delays):
                logging.info("----------%d-----------", run_index + 2)
                next_timer = threading.Timer(
                    follow_up_delays[run_index], fun_timer, args=(run_index + 1,)
                )
                next_timer.start()

        logging.info("----------1-----------")
        first_timer = threading.Timer(10, fun_timer)  # first start
        first_timer.start()
    except Exception:
        logging.error("catch exception,main:%s", traceback.format_exc())


def auto_reply(pk_list):
    """Schedule automated replies on one of today's topics from ``pk_list``.

    Queries the ``topic`` table for ids in ``pk_list`` whose ``create_time``
    contains today's date (``YYYY-MM-DD``) and whose ``user_id`` has
    ``is_shadow = 0`` in ``user_extra``. When at least one row comes back,
    the first returned topic id is replied to up to three times from
    background ``threading.Timer`` threads: 10 seconds after this call, then
    30 seconds after each preceding run. Each run calls ``login()``; when
    cookies come back it posts the first item of ``get_comments()`` via
    ``reply()``.

    The run counter lives in the timer arguments, not in a module global, so
    every call gets its own three runs and overlapping calls do not
    interfere with each other.

    :param pk_list: sequence of topic primary keys to consider; an empty or
        ``None`` value is a no-op.
    :return: ``None``. Errors while querying or scheduling are logged with
        their traceback and not re-raised.
    """
    # Lazy %-args: formatting with "%" directly would raise for tuple input.
    logging.info("get----------------------:%s", pk_list)
    if not pk_list:
        # Nothing to look up, so do not open a database connection at all.
        return

    try:
        today_str = datetime.datetime.now().strftime('%Y-%m-%d')

        # One placeholder per id, so the driver escapes every value and the
        # single-id and multi-id cases share one (valid) statement.
        id_placeholders = ", ".join(["%s"] * len(pk_list))
        query = (
            "SELECT id FROM topic WHERE id IN (" + id_placeholders + ")"
            " AND user_id IN (SELECT user_id FROM user_extra WHERE is_shadow = 0)"
            " AND create_time LIKE %s"
        )
        query_args = list(pk_list) + ["%" + today_str + "%"]

        pc = pymysql.connect(host=host, user=user, passwd=passwd, db=db, port=3306)
        try:
            cursor = pc.cursor()
            try:
                cursor.execute(query, query_args)
                topic_id_list = list(cursor.fetchall())
            finally:
                cursor.close()
        finally:
            # Always release the connection, even when the query fails.
            pc.close()
        logging.info("Database version : %s ", topic_id_list)

        if not topic_id_list:
            return

        # Delay in seconds before the 2nd and 3rd run, counted from the
        # previous run.
        follow_up_delays = (30, 30)

        def fun_comment(run_index=0):
            cookies = login()
            if cookies:
                comment_content = get_comments()
                comment = comment_content[0]
                reply(cookies, topic_id_list[0][0], comment)

            if run_index < len(follow_up_delays):
                next_timer = threading.Timer(
                    follow_up_delays[run_index], fun_comment, args=(run_index + 1,)
                )
                next_timer.start()

        first_timer = threading.Timer(10, fun_comment)  # first start
        first_timer.start()
    except Exception:
        logging.error("catch exception,main:%s", traceback.format_exc())