Commit bba179c6 authored by 任婷婷

.

parent 8cfe399e
import time

import gevent
from gevent import monkey
import pymysql
from tool import (get_jerry_test, get_user_portrait_log, user_portrait_action_statistic,
                  write_user_portrait_action_divided, update_user_portrait)

# Assumption: update_user_portrait lives in tool.py alongside the other helpers.
# patch_all() must run before pymysql opens sockets so greenlets can yield on I/O.
monkey.patch_all()

def get_device_list():
    # Device ids that produced tag events in the last 30 days.
    sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
    db, cursor = get_jerry_test()
    cursor.execute(sql)
    device_ids_lst = [i[0] for i in cursor.fetchall()]
    cursor.close()
    db.close()
    return device_ids_lst

def divide_list(interval, data):
    # Split `data` into consecutive chunks of at most `interval` items.
    divided_lists = [data[i:i + interval] for i in range(0, len(data), interval)]
    return divided_lists
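# Sketch of the chunking behaviour (illustrative values, not from the data):
#   divide_list(3, list(range(7)))  ->  [[0, 1, 2], [3, 4, 5], [6]]
# The last chunk may be shorter than `interval`; an empty input yields [].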
def user_portrait_action_divided(cl_id):
    print('*' * 100)
    print('INPUT cl_ids:', cl_id)
    print('OUTPUT data:')
    data = user_portrait_action_statistic(cl_id)
    print(data)
    print('-' * 100)
    for key, values in data.items():
        cl_id_single = key
        for item in values.keys():
            if values[item]:
                first_solutions = values[item].get("first_solutions", [])
                first_positions = values[item].get("first_positions", [])
                first_demands = values[item].get("first_demands", [])
                second_demands = values[item].get("second_demands", [])
                second_positions = values[item].get("second_positions", [])
                second_solutions = values[item].get("second_solutions", [])
                projects = values[item].get("projects", [])
                # Drop empty-string tags before deduplicating via set().
                first_solutions = [i for i in first_solutions if i != '']
                second_solutions = [i for i in second_solutions if i != '']
                first_demands = [i for i in first_demands if i != '']
                second_demands = [i for i in second_demands if i != '']
                first_positions = [i for i in first_positions if i != '']
                second_positions = [i for i in second_positions if i != '']
                projects = [i for i in projects if i != '']
                write_user_portrait_action_divided(cl_id=cl_id_single, event_cn=item,
                                                   first_solutions=set(first_solutions),
                                                   first_positions=set(first_positions),
                                                   first_demands=set(first_demands),
                                                   second_demands=set(second_demands),
                                                   second_positions=set(second_positions),
                                                   second_solutions=set(second_solutions),
                                                   projects=set(projects))
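# Hedged usage sketch, assuming user_portrait_action_statistic() returns a
# {cl_id: {event_cn: {tag_field: [values], ...}}} mapping; the id below is the
# one that appears in the tool.py diff further down:
#   user_portrait_action_divided('3D878D3E-9C19-4CC9-BB7A-0082B38996F2')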
def get_kafka_sql():
    # Identical query to get_device_list().
    sql = "select distinct cl_id from kafka_tag3_log where log_time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
    db, cursor = get_jerry_test()
    cursor.execute(sql)
    device_ids_lst = [i[0] for i in cursor.fetchall()]
    cursor.close()
    db.close()
    return device_ids_lst
def data_handler(device_lst):
    # Rebuild the portrait for every device id in this chunk.
    for device_id in device_lst:
        update_user_portrait(device_id)

# def multi_process_read_sql(device_lsts):
#     gevent.joinall([gevent.spawn(data_handler, device_lst) for device_lst in device_lsts])

# def data_handler(device_lst):
#     conn = pymysql.connect(host='172.18.3.204', user='root', password='xinwei', database='btree', charset='utf8')
#     cursor = conn.cursor()
#     for i in range(anum, num):
#         sql = 'insert into aaa(sid,name,email) values(%s,%s,concat(%s,"hael","@163"));'
#         res = cursor.execute(sql, [i, "root", i])
#         conn.commit()
#     cursor.close()
#     conn.close()
if __name__ == '__main__':
    device_list_all = get_device_list()
    divided_lists = divide_list(10, device_list_all)
    start_time = time.time()
    # One greenlet per chunk; joinall blocks until every chunk is processed.
    gevent.joinall([gevent.spawn(data_handler, device_lst) for device_lst in divided_lists])
    stop_time = time.time()
    elapsed = stop_time - start_time  # don't shadow the `time` module here
    print('*' * 100)
    print('Time:', elapsed)
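# Note on the fan-out above: greenlets only overlap while blocked on I/O, so the
# measured speed-up depends on monkey.patch_all() having patched pymysql's
# sockets before the first query. With 10 devices per chunk, the script spawns
# len(divided_lists) greenlets and the printed time covers the whole batch.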
@@ -174,7 +174,7 @@ def consume_kafka():
     # try:
     spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
     spark.sparkContext.setLogLevel("WARN")
-    spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
+    spark.sparkContext.addPyFile("/srv/apps/ffm-baseline/eda/smart_rank/tool.py")
     device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
     result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
...
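The hunk above only changes the path handed to addPyFile, which is what ships
tool.py to the Spark executors so the mapped lambda can import it. A minimal
sketch of that pattern (stubbed update function, not the project's actual job):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.addPyFile("/srv/apps/ffm-baseline/eda/smart_rank/tool.py")

    def update_stub(device_id):  # stand-in for update_tag3_user_portrait
        return device_id

    rdd = spark.sparkContext.parallelize(["a", "b", "c"], numSlices=3)
    result = rdd.repartition(2).map(update_stub).collect()  # map is lazy; collect() runs it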
@@ -2,6 +2,10 @@
 import pymysql
 import pandas as pd
 import datetime
+import gevent
+from gevent import monkey
+import requests
+import time
 #
 # def get_jerry_test():
...
@@ -111,8 +115,6 @@ def get_user_portrait_action_divided(cl_ids):
     second_solutions = []
     projects = []
-    # cl_ids = [1,3]
-    cl_ids = '3D878D3E-9C19-4CC9-BB7A-0082B38996F2'
     data = user_portrait_action_statistic(cl_ids)
     for key, values in data.items():
...
@@ -188,7 +190,7 @@ def get_portrait(cl_id):
 def get_portrait_device_ids_lst():
     sql = "select cl_id from user_portrait_action_divided limit 10"
-    db, cursor = get_jerry_test()
+    db, cursor = connect_doris_table()
     cursor.execute(sql)
     device_ids_lst = [i[0] for i in cursor.fetchall()]
     db.close()
...
@@ -217,6 +219,7 @@ def show_kafka_and_portrait():
 if __name__ == '__main__':
     show_kafka_and_portrait()
     # cl_ids = ['']
...
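The four imports added at the top of tool.py (gevent, monkey, requests, time)
point at gevent-based fan-out over network calls. One ordering caveat, stated
as an assumption rather than something this diff shows: monkey.patch_all() has
to run before pymysql or requests create any sockets, i.e.

    from gevent import monkey
    monkey.patch_all()  # must precede the first pymysql/requests usage

    import gevent
    import requests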