Commit 56a62ef2 authored by 任婷婷

Add writing to the user_portrait_action_divided table

parent 08cd43e0
...@@ -8,7 +8,7 @@ import redis ...@@ -8,7 +8,7 @@ import redis
from pyspark import SparkConf from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait) from tool import (get_jerry_test, get_tag3_user_log, send_email, write_user_portrait,get_user_portrait_log,user_portrait_action_statistic,write_user_portrait_action_divided)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}] # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
...@@ -28,7 +28,7 @@ def make_dict_from_pair(x): ...@@ -28,7 +28,7 @@ def make_dict_from_pair(x):
def update_tag3_user_portrait(cl_id): def update_tag3_user_portrait(cl_id):
user_df = get_tag3_user_log(cl_id) user_df = sf(cl_id)
if not user_df.empty: if not user_df.empty:
user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"])) user_df["first_solutions"] = list(zip(user_df["first_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
user_df["second_solutions"] = list(zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"])) user_df["second_solutions"] = list(zip(user_df["second_solutions"].apply(lambda x: x.split(",")), user_df["tag_score"]))
...@@ -101,6 +101,46 @@ def update_tag3_user_portrait(cl_id): ...@@ -101,6 +101,46 @@ def update_tag3_user_portrait(cl_id):
",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()), ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
",".join(projects_score.keys())) ",".join(projects_score.keys()))
first_solutions = []
first_positions = []
first_demands = []
second_demands = []
second_positions = []
second_solutions = []
projects = []
# cl_ids = [1, 3]
data = user_portrait_action_statistic(cl_ids)
for key, values in data.items():
cl_id = key
print("val", values.keys())
for item in values.keys():
if values[item]:
first_solutions = values[item].get("first_solutions", [])
first_positions = values[item].get("first_positions", [])
first_demands = values[item].get("first_demands", [])
second_demands = values[item].get("second_demands", [])
second_positions = values[item].get("second_positions", [])
second_solutions = values[item].get("second_solutions", [])
projects = values[item].get("projects", [])
first_solutions = [i for i in first_solutions if i != '']
second_solutions = [i for i in second_solutions if i != '']
first_demands = [i for i in first_demands if i != '']
second_demands = [i for i in second_demands if i != '']
first_positions = [i for i in first_positions if i != '']
second_positions = [i for i in second_positions if i != '']
projects = [i for i in projects if i != '']
write_user_portrait_action_divided(cl_id=cl_id, event_cn=item, first_solutions=set(first_solutions),
first_positions=set(first_positions),
first_demands=set(first_demands),
second_demands=set(second_demands),
second_positions=set(second_positions),
second_solutions=set(second_solutions),
projects=set(projects))
return cl_id return cl_id
......
import pymysql
import pandas as pd
import datetime
def get_jerry_test():
    """Open a connection to the ``jerry_test`` MySQL database.

    Returns:
        A ``(connection, cursor)`` pair. Nothing is closed here; the callers
        close both objects themselves.
    """
    # NOTE(review): host and credentials are hard-coded in source; consider
    # moving them to configuration or the environment.
    connection = pymysql.connect(
        host="bj-cdb-6slgqwlc.sql.tencentcdb.com",
        port=62120,
        user="work",
        passwd="Gengmei1",
        db="jerry_test",
        charset="utf8",
    )
    cursor = connection.cursor()
    return connection, cursor
def get_user_portrait_log(cl_id):
    """Fetch the ``kafka_tag3_log`` rows belonging to the given client ids.

    Args:
        cl_id: Iterable of client ids to look up (despite the singular name).

    Returns:
        A list of row tuples ``(cl_id, event_cn, first_solutions,
        second_solutions, first_demands, second_demands, first_positions,
        second_positions, projects)``. An empty list is returned when no ids
        are given. ``None`` is returned when the lookup fails; the error is
        printed rather than raised.
    """
    try:
        ids = list(cl_id)
        if not ids:
            # "IN ()" is not valid SQL and there is nothing to look up anyway.
            return []
        # One placeholder per id: the driver escapes every value, and a
        # single id no longer renders as the invalid "IN (1,)" that
        # str(tuple) produced.
        placeholders = ",".join(["%s"] * len(ids))
        sql = """select cl_id,event_cn, first_solutions, second_solutions, first_demands,
        second_demands, first_positions, second_positions, projects
        from kafka_tag3_log where cl_id in ({}) """.format(placeholders)
        print("sql", sql, ids)
        db, cursor = get_jerry_test()
        try:
            cursor.execute(sql, ids)
            return list(cursor.fetchall())
        finally:
            # Release the cursor before its connection, even on failure.
            cursor.close()
            db.close()
    except Exception as e:
        print(e)
        return None
def user_portrait_action_statistic(cl_id):
    """Group the tag3 log rows of the given client ids by client and event.

    Args:
        cl_id: Iterable of client ids, passed through to
            ``get_user_portrait_log``.

    Returns:
        A nested dict ``{cl_id: {event_cn: {field: [tag, ...]}}}`` where
        ``field`` is one of ``first_solutions``, ``second_solutions``,
        ``first_demands``, ``second_demands``, ``first_positions``,
        ``second_positions`` and ``projects``. Each list holds the
        comma-split values of every matching row, so it may contain
        duplicates and empty strings. An empty dict is returned when no rows
        could be fetched. ``None`` is returned on an unexpected error, which
        is printed rather than raised.
    """
    # Field names in the same order as row columns 2..8.
    fields = ("first_solutions", "second_solutions", "first_demands",
              "second_demands", "first_positions", "second_positions",
              "projects")
    try:
        ud_dict = {}
        rows = get_user_portrait_log(cl_id)
        # The fetch returns None on failure; treat that as "no rows".
        for ud in rows or []:
            print("ud", ud)
            # setdefault on both levels: a client that was already seen may
            # still show up with a new event, which used to raise KeyError
            # and discard the whole result.
            per_event = ud_dict.setdefault(ud[0], {}).setdefault(ud[1], {})
            for field, raw in zip(fields, ud[2:9]):
                # A NULL column is treated like an empty string.
                per_event.setdefault(field, []).extend((raw or "").split(","))
        return ud_dict
    except Exception as e:
        print('user_portrait_action_statistic error')
        print(e)
        return None
def write_user_portrait_action_divided(cl_id, event_cn, first_solutions, second_solutions, first_demands, second_demands,
                                       first_positions, second_positions, projects):
    """Insert one row into ``user_portrait_action_divided``.

    The row is stamped with yesterday's date. Each tag argument is an
    iterable of strings and is stored as a single comma-joined string.

    Args:
        cl_id: Client id the row belongs to.
        event_cn: Event name the tags were collected for.
        first_solutions, second_solutions, first_demands, second_demands,
        first_positions, second_positions, projects: Iterables of tag strings.

    Returns:
        None. Any failure is printed rather than raised.
    """
    try:
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        # Placeholders instead of str.format: a quote inside a tag can no
        # longer break (or inject into) the statement.
        sql = ("insert into user_portrait_action_divided "
               "values(null, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
        params = (
            str(yesterday), str(cl_id), str(event_cn),
            ",".join(first_solutions), ",".join(second_solutions),
            ",".join(first_demands), ",".join(second_demands),
            ",".join(first_positions), ",".join(second_positions),
            ",".join(projects),
        )
        db, cursor = get_jerry_test()
        try:
            cursor.execute(sql, params)
            db.commit()
        finally:
            # Release the cursor before its connection, even on failure.
            cursor.close()
            db.close()
    except Exception as e:
        print("write_user_portrait_action_divided error")
        print(e)
if __name__ == '__main__':
    # Manual driver: aggregate the tag3 log of the client ids given on the
    # command line and write one de-duplicated row per (client, event).
    import sys

    # The ids used to come from an undefined name (NameError); read them
    # from the command line instead.
    cl_ids = sys.argv[1:]
    if not cl_ids:
        print("usage: python <script> CL_ID [CL_ID ...]")

    tag_fields = ("first_solutions", "second_solutions", "first_demands",
                  "second_demands", "first_positions", "second_positions",
                  "projects")

    data = user_portrait_action_statistic(cl_ids) if cl_ids else None
    # The statistic helper may return None on failure; treat it as no data.
    for cl_id, events in (data or {}).items():
        print("val", events.keys())
        for event_cn, tags in events.items():
            if not tags:
                continue
            # Drop the empty strings left by splitting empty columns and
            # de-duplicate each tag list before writing.
            cleaned = {
                field: {tag for tag in tags.get(field, []) if tag != ''}
                for field in tag_fields
            }
            write_user_portrait_action_divided(cl_id=cl_id, event_cn=event_cn, **cleaned)
...@@ -421,6 +421,7 @@ def get_tag3_user_log(cl_id): ...@@ -421,6 +421,7 @@ def get_tag3_user_log(cl_id):
# `projects` text NOT NULL, # `projects` text NOT NULL,
# PRIMARY KEY(`id`) # PRIMARY KEY(`id`)
# ) # )
def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, second_demands, def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, second_demands,
first_positions, second_positions, projects): first_positions, second_positions, projects):
try: try:
...@@ -439,3 +440,77 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands, ...@@ -439,3 +440,77 @@ def write_user_portrait(cl_id, first_solutions, second_solutions, first_demands,
except Exception as e: except Exception as e:
print("write db error") print("write db error")
print(e) print(e)
def get_user_portrait_log(cl_id):
    """Fetch the ``kafka_tag3_log`` rows belonging to the given client ids.

    Args:
        cl_id: Iterable of client ids to look up (despite the singular name).

    Returns:
        A list of row tuples ``(cl_id, event_cn, first_solutions,
        second_solutions, first_demands, second_demands, first_positions,
        second_positions, projects)``. An empty list is returned when no ids
        are given. ``None`` is returned when the lookup fails; the error is
        printed rather than raised.
    """
    try:
        ids = list(cl_id)
        if not ids:
            # "IN ()" is not valid SQL and there is nothing to look up anyway.
            return []
        # One placeholder per id: the driver escapes every value, and a
        # single id no longer renders as the invalid "IN (1,)" that
        # str(tuple) produced.
        placeholders = ",".join(["%s"] * len(ids))
        sql = """select cl_id,event_cn, first_solutions, second_solutions, first_demands,
        second_demands, first_positions, second_positions, projects
        from kafka_tag3_log where cl_id in ({}) """.format(placeholders)
        print("sql", sql, ids)
        db, cursor = get_jerry_test()
        try:
            cursor.execute(sql, ids)
            return list(cursor.fetchall())
        finally:
            # Release the cursor before its connection, even on failure.
            cursor.close()
            db.close()
    except Exception as e:
        print(e)
        return None
def user_portrait_action_statistic(cl_id):
    """Group the tag3 log rows of the given client ids by client and event.

    Args:
        cl_id: Iterable of client ids, passed through to
            ``get_user_portrait_log``.

    Returns:
        A nested dict ``{cl_id: {event_cn: {field: [tag, ...]}}}`` where
        ``field`` is one of ``first_solutions``, ``second_solutions``,
        ``first_demands``, ``second_demands``, ``first_positions``,
        ``second_positions`` and ``projects``. Each list holds the
        comma-split values of every matching row, so it may contain
        duplicates and empty strings. An empty dict is returned when no rows
        could be fetched. ``None`` is returned on an unexpected error, which
        is printed rather than raised.
    """
    # Field names in the same order as row columns 2..8.
    fields = ("first_solutions", "second_solutions", "first_demands",
              "second_demands", "first_positions", "second_positions",
              "projects")
    try:
        ud_dict = {}
        rows = get_user_portrait_log(cl_id)
        # The fetch returns None on failure; treat that as "no rows".
        for ud in rows or []:
            print("ud", ud)
            # setdefault on both levels: a client that was already seen may
            # still show up with a new event, which used to raise KeyError
            # and discard the whole result.
            per_event = ud_dict.setdefault(ud[0], {}).setdefault(ud[1], {})
            for field, raw in zip(fields, ud[2:9]):
                # A NULL column is treated like an empty string.
                per_event.setdefault(field, []).extend((raw or "").split(","))
        return ud_dict
    except Exception as e:
        print('user_portrait_action_statistic error')
        print(e)
        return None
def write_user_portrait_action_divided(cl_id, event_cn, first_solutions, second_solutions, first_demands, second_demands,
                                       first_positions, second_positions, projects):
    """Insert one row into ``user_portrait_action_divided``.

    The row is stamped with yesterday's date. Each tag argument is an
    iterable of strings and is stored as a single comma-joined string.

    Args:
        cl_id: Client id the row belongs to.
        event_cn: Event name the tags were collected for.
        first_solutions, second_solutions, first_demands, second_demands,
        first_positions, second_positions, projects: Iterables of tag strings.

    Returns:
        None. Any failure is printed rather than raised.
    """
    try:
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        # Placeholders instead of str.format: a quote inside a tag can no
        # longer break (or inject into) the statement.
        sql = ("insert into user_portrait_action_divided "
               "values(null, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
        params = (
            str(yesterday), str(cl_id), str(event_cn),
            ",".join(first_solutions), ",".join(second_solutions),
            ",".join(first_demands), ",".join(second_demands),
            ",".join(first_positions), ",".join(second_positions),
            ",".join(projects),
        )
        db, cursor = get_jerry_test()
        try:
            cursor.execute(sql, params)
            db.commit()
        finally:
            # Release the cursor before its connection, even on failure.
            cursor.close()
            db.close()
    except Exception as e:
        print("write_user_portrait_action_divided error")
        print(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment