# -*- coding: UTF-8 -*-
import logging
import traceback
import json
import pymysql
import threading
import random
import datetime

from celery import shared_task
from django.conf import settings
from django.core import serializers

from trans2es.type_info import get_type_info_map
# from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform
from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar, UserSimilarScore
from linucb.utils.register_user_tag import RegisterUserTag
from injection.data_sync.get_session import get_comments, click, login, reply
from injection.data_sync.get_session import host, user, passwd, db


@shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False):
    """Sync the given primary keys into Elasticsearch (or the user-tag store).

    ``register_user_tag`` / ``attention_user_tag`` are routed to
    ``RegisterUserTag``; any other ``es_type`` is looked up in the type-info
    map and bulk-inserted into ES, after which ``auto_click`` is triggered
    for the same keys.

    :param es_type: logical index/type name consumed from the task queue.
    :param pk_list: primary keys to sync (deduplicated before use).
    :param use_batch_query_set: forwarded to ``insert_table_by_pk_list``.
    """
    try:
        pk_list = list(frozenset(pk_list))  # dedupe; ordering is irrelevant here
        if es_type == "register_user_tag":
            RegisterUserTag.get_register_user_tag(pk_list)
        elif es_type == "attention_user_tag":
            RegisterUserTag.get_user_attention_tag(pk_list)
        else:
            type_info_map = get_type_info_map()
            type_info = type_info_map[es_type]
            logging.info("consume es_type:%s" % str(es_type))
            type_info.insert_table_by_pk_list(
                sub_index_name=es_type,
                pk_list=pk_list,
                use_batch_query_set=use_batch_query_set,
                es=ESPerform.get_cli()
            )
            auto_click(pk_list)
    except Exception:  # task boundary: log and swallow so the worker keeps running
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())


@shared_task
def sync_face_similar_data_to_redis():
    """Cache each participant's face-similarity list in redis as a JSON blob.

    For every distinct ``participant_user_id`` with online, non-deleted rows,
    stores ``[{"contrast_user_id": ..., "similarity": ...}, ...]`` (only rows
    with similarity > 0.3, highest first) under
    ``physical:user_similar:participant_user_id:<id>``.
    """
    try:
        result_items = FaceUserContrastSimilar.objects.filter(
            is_online=True, is_deleted=False
        ).distinct().values("participant_user_id").values_list("participant_user_id", flat=True)

        logging.info("begin sync_face_similar_data_to_redis!")

        redis_key_prefix = "physical:user_similar:participant_user_id:"
        for participant_user_id in result_items:
            redis_key = redis_key_prefix + str(participant_user_id)
            similar_result_items = FaceUserContrastSimilar.objects.filter(
                is_online=True,
                is_deleted=False,
                participant_user_id=participant_user_id,
                similarity__gt=0.3  # drop weak matches; threshold chosen upstream
            ).order_by("-similarity")

            item_list = [
                {
                    "contrast_user_id": item.contrast_user_id,
                    "similarity": item.similarity,
                }
                for item in similar_result_items
            ]
            redis_client.set(redis_key, json.dumps(item_list))
            logging.info("participant_user_id:%d set data done!" % participant_user_id)
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())


@shared_task
def sync_user_similar_score():
    """Cache each user's similarity-score ranking in redis.

    Stores ``[[contrast_user_id, score], ...]`` ordered by descending score
    under ``physical:user_similar_score:user_id:<id>``.
    """
    try:
        results_items = UserSimilarScore.objects.filter(is_deleted=False).distinct().values(
            "user_id").values_list("user_id", flat=True)

        redis_key_prefix = "physical:user_similar_score:user_id:"
        logging.info("begin sync user_similar_score!")
        for user_id in results_items:
            redis_key = redis_key_prefix + str(user_id)
            similar_results_items = UserSimilarScore.objects.filter(
                is_deleted=False, user_id=user_id
            ).order_by("-score")

            item_list = [
                [item.contrast_user_id, item.score]
                for item in similar_results_items
            ]
            redis_client.set(redis_key, json.dumps(item_list))
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())


def _fetch_today_visible_topics(pk_list):
    """Return ``[(id, user_id), ...]`` for topics in ``pk_list`` created today
    by non-shadow users.

    Uses a parameterized query: the original string-built
    ``IN str(tuple(pk_list))`` produced invalid SQL for 0- or 1-element lists
    (``IN ()`` / ``IN (5,)``) and was injection-prone. Always closes the
    connection, even when the query raises.
    """
    if not pk_list:
        return []
    today_str = datetime.datetime.now().strftime('%Y-%m-%d')
    placeholders = ",".join(["%s"] * len(pk_list))
    sql = (
        "SELECT id,user_id FROM topic WHERE id in (" + placeholders + ")"
        " and user_id in (select user_id from user_extra where is_shadow = 0)"
        " and create_time LIKE %s"
    )
    pc = pymysql.connect(host=host, user=user, passwd=passwd, db=db, port=3306)
    try:
        cursor = pc.cursor()
        # LIKE '%<today>%' — create_time is matched as a string prefix of today's date
        cursor.execute(sql, tuple(pk_list) + ("%" + today_str + "%",))
        return list(cursor.fetchall())
    finally:
        pc.close()


def auto_click(pk_list):
    """Schedule up to three delayed automatic clicks on the first of today's
    topics found in ``pk_list``.

    A chain of ``threading.Timer`` callbacks logs in, clicks the topic, and
    re-arms itself twice (after 30s, then 50s) before stopping. The original
    code declared ``global exec_count`` / ``global timer`` inside the callback
    although no such module globals exist, so the first timer fire raised
    ``NameError`` in the timer thread and the chain died; ``nonlocal`` fixes
    that (requires Python 3).
    """
    logging.info("get--------auto_click--------------:%s" % pk_list)
    try:
        topic_id_list = _fetch_today_visible_topics(pk_list)
        logging.info("Database version : %s " % topic_id_list)
        if not topic_id_list:
            return

        exec_count = 0
        timer = None

        def fun_timer():
            nonlocal exec_count, timer
            try:
                cookies = login()
                if cookies:
                    click(cookies, topic_id_list[0])
                exec_count += 1
                if exec_count == 1:
                    logging.info("----------2-----------")
                    # sleep_time = random.randint(300, 540)
                    timer = threading.Timer(30, fun_timer)
                    timer.start()
                elif exec_count == 2:
                    # sleep_time = random.randint(1000, 1900)
                    logging.info("----------3-----------")
                    timer = threading.Timer(50, fun_timer)
                    timer.start()
            except Exception:
                logging.error("catch exception,main:%s" % traceback.format_exc())

        logging.info("----------1-----------")
        timer = threading.Timer(10, fun_timer)  # 首次启动 (first run after 10s)
        timer.start()
    except Exception:
        logging.error("catch exception,main:%s" % traceback.format_exc())


def auto_reply(pk_list):
    """Schedule up to three delayed automatic comments on the first of today's
    topics found in ``pk_list``.

    Same timer-chain structure as ``auto_click`` (30s between re-arms); the
    callback logs in, fetches a canned comment via ``get_comments()`` and
    posts it with ``reply``. Shares the ``nonlocal`` fix described there.
    """
    logging.info("get----------------------:%s" % pk_list)
    try:
        topic_id_list = _fetch_today_visible_topics(pk_list)
        logging.info("Database version : %s " % topic_id_list)
        if not topic_id_list:
            return

        exec_count = 0
        timer = None

        def fun_comment():
            nonlocal exec_count, timer
            try:
                cookies = login()
                if cookies:
                    comment_content = get_comments()
                    comment = comment_content[0]
                    reply(cookies, topic_id_list[0], comment)
                exec_count += 1
                # re-arm at most twice, 30s apart (original randint values were
                # immediately overwritten and thus dead code)
                if exec_count in (1, 2):
                    timer = threading.Timer(30, fun_comment)
                    timer.start()
            except Exception:
                logging.error("catch exception,main:%s" % traceback.format_exc())

        timer = threading.Timer(10, fun_comment)  # 首次启动 (first run after 10s)
        timer.start()
    except Exception:
        logging.error("catch exception,main:%s" % traceback.format_exc())