# -*- coding: UTF-8 -*-
"""Compare and rebuild per-device user-portrait tag data.

Reads tag rows from ``kafka_tag3_log`` (``jerry_test`` database), groups them
per device id (``cl_id``) and event name (``event_cn``), writes de-duplicated
tags into ``user_portrait_action_divided`` and offers a small console report
that prints kafka rows next to the rows stored in the ``doris_prod`` database.
"""
import datetime
import time

import gevent
from gevent import monkey
import pandas as pd
import pymysql
import requests

# Previous connection settings, kept for reference:
#
# def get_jerry_test():
#     db = pymysql.connect(host="bj-cdb-6slgqwlc.sql.tencentcdb.com",
#                          port=62120,
#                          user="work",
#                          passwd="Gengmei1",
#                          db="jerry_test",
#                          charset="utf8")
#     return db, db.cursor()

# NOTE(review): credentials are hard-coded in source; consider moving them to
# configuration / environment variables.
_JERRY_TEST_CONFIG = dict(host="172.16.40.170", port=4000, user="st_user",
                          passwd="aqpuBLYzEV7tML5RPsN1pntUzFy",
                          db="jerry_test", charset="utf8")
_DORIS_CONFIG = dict(host="172.16.30.136", port=3306, user="doris",
                     password="o5gbA27hXHHm", db="doris_prod",
                     charset="utf8")

# Tag columns, in the order they follow cl_id and event_cn in every select.
_TAG_FIELDS = ("first_solutions", "second_solutions", "first_demands",
               "second_demands", "first_positions", "second_positions",
               "projects")
_PORTRAIT_COLUMNS = "cl_id,event_cn, " + ", ".join(_TAG_FIELDS)


def get_jerry_test(debug_reconnect=False):
    """Open a connection to the ``jerry_test`` database.

    The file used to define this function three times; the last definition
    (the one Python actually kept) referenced an undefined name ``connect``
    and therefore raised ``NameError`` on every call. This single definition
    replaces all three.

    Args:
        debug_reconnect: when true, run the connection-status experiment from
            the last original definition: print the ``open`` flag, close the
            connection, re-open it via ``ping(reconnect=True)`` and print the
            flag again.

    Returns:
        A ``(connection, cursor)`` tuple. The caller must close both.
    """
    db = pymysql.connect(**_JERRY_TEST_CONFIG)
    if debug_reconnect:
        print('connection stauts 1:', db.open)
        db.close()
        db.ping(reconnect=True)
        print('connection stauts 2:', db.open)
    return db, db.cursor()


def connect_doris_table():
    """Open a connection to the ``doris_prod`` database.

    Returns:
        A ``(connection, cursor)`` tuple. The caller must close both.
    """
    db = pymysql.connect(**_DORIS_CONFIG)
    return db, db.cursor()


def _close(db, cursor):
    """Close the cursor and then its connection, even if the first fails."""
    try:
        cursor.close()
    finally:
        db.close()


def _fetch_all(connect, sql, params=None):
    """Run ``sql`` on a fresh connection from ``connect`` and return all rows.

    ``params`` is handed to ``cursor.execute`` so values are escaped by the
    driver instead of being formatted into the statement. The connection is
    released even when the query raises.
    """
    db, cursor = connect()
    try:
        cursor.execute(sql, params)
        return list(cursor.fetchall())
    finally:
        _close(db, cursor)


def _select_portrait_rows(connect, table, cl_id):
    """Return the portrait columns of every row of ``table`` for ``cl_id``.

    ``table`` is only ever one of the fixed table names used in this module;
    ``cl_id`` is passed as a bound parameter.
    """
    sql = "select {} from {} where cl_id = %s".format(_PORTRAIT_COLUMNS, table)
    return _fetch_all(connect, sql, (cl_id,))


def get_user_portrait_log(cl_id):
    """Fetch the ``kafka_tag3_log`` rows of one device.

    Returns:
        A list of row tuples ``(cl_id, event_cn, <seven tag columns>)``, or
        ``None`` if the query failed (the error is printed).
    """
    try:
        return _select_portrait_rows(get_jerry_test, "kafka_tag3_log", cl_id)
    except Exception as e:
        print(e)
        return None


def _group_portrait_rows(rows):
    """Group raw rows into ``{cl_id: {event_cn: {tag_field: [values]}}}``.

    Every tag column is a comma-separated string; the pieces of all rows that
    share a ``(cl_id, event_cn)`` pair are concatenated in row order. A NULL
    column is treated as an empty string.
    """
    grouped = {}
    for row in rows:
        per_event = grouped.setdefault(row[0], {})
        tags = per_event.setdefault(row[1], {name: [] for name in _TAG_FIELDS})
        for name, raw in zip(_TAG_FIELDS, row[2:9]):
            tags[name].extend((raw or "").split(","))
    return grouped


def user_portrait_action_statistic(cl_id):
    """Collect the tags of one device, grouped by device id and event name.

    Returns:
        ``{cl_id: {event_cn: {tag_field: [values]}}}``; an empty dict when the
        log query failed or returned no rows.
    """
    rows = get_user_portrait_log(cl_id)
    if not rows:
        # get_user_portrait_log returns None on failure; iterating it used to
        # raise TypeError here.
        return {}
    return _group_portrait_rows(rows)


def write_user_portrait_action_divided(cl_id, event_cn, first_solutions,
                                       second_solutions, first_demands,
                                       second_demands, first_positions,
                                       second_positions, projects):
    """Insert one row into ``user_portrait_action_divided``.

    The row is dated yesterday (local date). Each tag argument is an iterable
    of strings and is stored comma-joined. Errors are printed, not raised.
    """
    try:
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        sql = ("insert into user_portrait_action_divided "
               "values(null, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
        params = (str(yesterday), cl_id, event_cn,
                  ",".join(first_solutions), ",".join(second_solutions),
                  ",".join(first_demands), ",".join(second_demands),
                  ",".join(first_positions), ",".join(second_positions),
                  ",".join(projects))
        db, cursor = get_jerry_test()
        try:
            cursor.execute(sql, params)
            db.commit()
        finally:
            _close(db, cursor)
    except Exception as e:
        print("write_user_portrait_action_divided error")
        print(e)


def get_user_portrait_action_divided(cl_ids):
    """Rebuild the ``user_portrait_action_divided`` rows for the given ids.

    Args:
        cl_ids: a single device id, or a list/tuple/set of device ids.

    For every ``(cl_id, event_cn)`` pair found in the log, empty tag values
    are dropped, duplicates are removed and the remaining values are written
    in sorted order.
    """
    if isinstance(cl_ids, (list, tuple, set, frozenset)):
        ids = list(cl_ids)
    else:
        ids = [cl_ids]

    for one_id in ids:
        data = user_portrait_action_statistic(one_id)
        for cl_id, events in data.items():
            for event_cn, tags in events.items():
                if not tags:
                    continue
                cleaned = {
                    name: sorted({v for v in tags.get(name, []) if v != ''})
                    for name in _TAG_FIELDS
                }
                write_user_portrait_action_divided(cl_id=cl_id,
                                                   event_cn=event_cn,
                                                   **cleaned)


def get_kafka(cl_id):
    """Fetch the ``kafka_tag3_log`` rows of one device; errors propagate."""
    return _select_portrait_rows(get_jerry_test, "kafka_tag3_log", cl_id)


def get_portrait(cl_id):
    """Fetch the ``user_portrait_action_divided`` rows of one device from
    the ``doris_prod`` database; errors propagate."""
    return _select_portrait_rows(connect_doris_table,
                                 "user_portrait_action_divided", cl_id)


def get_portrait_device_ids_lst():
    """Return up to ten ``cl_id`` values from ``user_portrait_action_divided``
    in the ``doris_prod`` database."""
    sql = "select cl_id from user_portrait_action_divided limit 10"
    return [row[0] for row in _fetch_all(connect_doris_table, sql)]


def show_kafka_and_portrait():
    """Print kafka rows next to the stored portrait rows for sample devices.

    Rows are paired by position; a kafka row without a portrait row at the
    same index is printed alone. Any error is printed and ends the report.
    """
    try:
        for cl_id in get_portrait_device_ids_lst():
            print('*' * 100)
            kafka_data = get_kafka(cl_id)
            doris_data = get_portrait(cl_id)
            for i, val in enumerate(kafka_data):
                print('kafka:', val)
                # Explicit bounds check instead of a bare "except: pass".
                if i < len(doris_data):
                    print('portrait:', doris_data[i])
                # print('*' * 100)
    except Exception as e:
        print(e)
    return None


if __name__ == '__main__':
    show_kafka_and_portrait()
    # cl_ids = ['']
    # get_user_portrait_action_divided(cl_ids)