Commit ca265a7c authored by litaolemo's avatar litaolemo

update

parent dca24616
...@@ -22,6 +22,7 @@ from pyspark import SparkConf ...@@ -22,6 +22,7 @@ from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame from pyspark.sql import SparkSession, DataFrame
from meta_base_code.utils.func_from_redis_get_portrait import * from meta_base_code.utils.func_from_redis_get_portrait import *
# from pyspark.sql.functions import lit # from pyspark.sql.functions import lit
# import pytispark.pytispark as pti # import pytispark.pytispark as pti
...@@ -78,13 +79,11 @@ spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDF ...@@ -78,13 +79,11 @@ spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDF
task_list = [] task_list = []
task_days = 3 task_days = 3
for t in range(2, task_days): for t in range(2, task_days):
day_num = 0 - t day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d") last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d")
tomorrow_str = (datetime.datetime.now() + datetime.timedelta(days=day_num+1)).strftime("%Y%m%d") tomorrow_str = (datetime.datetime.now() + datetime.timedelta(days=day_num + 1)).strftime("%Y%m%d")
today_str = now.strftime("%Y%m%d") today_str = now.strftime("%Y%m%d")
today_str_format = now.strftime("%Y-%m-%d") today_str_format = now.strftime("%Y-%m-%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
...@@ -97,89 +96,105 @@ SELECT * FROM online.bl_hdfs_maidian_updates ...@@ -97,89 +96,105 @@ SELECT * FROM online.bl_hdfs_maidian_updates
AND ((action in ('on_click_topic_card','on_click_diary_card','search_result_click_infomation_item') AND ((action in ('on_click_topic_card','on_click_diary_card','search_result_click_infomation_item')
AND page_name in ('search_result_more','search_result_diary','search_result_post')) AND page_name in ('search_result_more','search_result_diary','search_result_post'))
or (action = 'on_click_card' AND params['card_content_type'] in ('answer','diary') AND page_name in ('search_result_more','search_result_diary','search_result_question_answer'))) or (action = 'on_click_card' AND params['card_content_type'] in ('answer','diary') AND page_name in ('search_result_more','search_result_diary','search_result_question_answer')))
""".format(partition_day="20210224", end_date="20210225",tomorrow_str=tomorrow_str) """.format(partition_day="20210224", end_date="20210225", tomorrow_str=tomorrow_str)
# print(new_urser_device_id_sql) # print(new_urser_device_id_sql)
new_urser_device_id_df = spark.sql(new_urser_device_id_sql) new_urser_device_id_df = spark.sql(new_urser_device_id_sql)
new_urser_device_id_df.createOrReplaceTempView("device_id_view") new_urser_device_id_df.createOrReplaceTempView("device_id_view")
new_urser_device_id_df.show(1) new_urser_device_id_df.show(1)
sql_res = new_urser_device_id_df.collect() sql_res = new_urser_device_id_df.collect()
res_list = []
for res in sql_res: for res in sql_res:
print(res) print(res)
# sql_res = new_urser_device_id_df.collect() query = res.params["query"]
# res_dict = {} card_name = res.params["card_name"]
# portrait_dict = { card_id = res.params["card_id"]
# "first_demands": {}, user_id = res.user_id
# "second_demands": {}, time_str = res.time_str
# "first_solutions": {}, page_name = res.page_name
# "second_solutions": {}, res_list.append({"query": query,
# "first_positions": {}, "card_name": card_name,
# "second_positions": {}, "card_id": card_id,
# "projects": {}, "user_id": user_id,
# 'anecdote_tags':{} "time_str": time_str,
# } "page_name": page_name
# no_portrait_device_id_list = [] })
# print("-------------------------------") import pandas
# count_not_has_portratit = 0
# data = pandas.DataFrame(res_list)
# for count_user_count, res in enumerate(sql_res): data.to_csv("data.csv",encoding="gb18030")
# # print(count, res) from maintenance.send_email_with_file_auto_task import *
# portratit_res = get_user_portrait_tag3_from_redis(res.device_id) send_file_email("",'',sender="litao@igengmei.com",email_group=["litao@igengmei.com"],email_msg_body_str="test",title_str="test",cc_group=["litao@igengmei.com"],file="data.csv")
# sql = """select cl_id, projects from kafka_tag3_log # sql_res = new_urser_device_id_df.collect()
# where cl_id = '%s' and event_cn = 'kyc' """ % res.device_id # res_dict = {}
# # print(count_user_count, res, portratit_res) # portrait_dict = {
# sql_res_list = con_sql(sql) # "first_demands": {},
# kyc_str_list= [] # "second_demands": {},
# if sql_res_list: # "first_solutions": {},
# print(sql_res_list,type(sql_res_list)) # "second_solutions": {},
# kyc_str_list = sql_res_list[0][1].split(",") # "first_positions": {},
# # "second_positions": {},
# temp_count = 0 # "projects": {},
# for demand in portratit_res: # 'anecdote_tags':{}
# if portratit_res[demand]: # }
# try: # no_portrait_device_id_list = []
# for tag in portratit_res[demand][0:3]: # print("-------------------------------")
# if tag in portrait_dict[demand]: # count_not_has_portratit = 0
# portrait_dict[demand][tag] += 1 #
# else: # for count_user_count, res in enumerate(sql_res):
# portrait_dict[demand][tag] = 1 # # print(count, res)
# if tag in kyc_str_list and demand == "projects": # portratit_res = get_user_portrait_tag3_from_redis(res.device_id)
# if portrait_dict["projects"].get(tag): # sql = """select cl_id, projects from kafka_tag3_log
# portrait_dict["projects"][tag] -= 1 # where cl_id = '%s' and event_cn = 'kyc' """ % res.device_id
# except Exception as e: # # print(count_user_count, res, portratit_res)
# print("error ", e) # sql_res_list = con_sql(sql)
# # kyc_str_list= []
# temp_count += 1 # if sql_res_list:
# if not temp_count: # print(sql_res_list,type(sql_res_list))
# count_not_has_portratit += 1 # kyc_str_list = sql_res_list[0][1].split(",")
# no_portrait_device_id_list.append(res.device_id) #
# # temp_count = 0
# # for demand in portratit_res:
# print(portrait_dict) # if portratit_res[demand]:
# print(count_user_count+1,count_not_has_portratit) # try:
# print("-------------------------------") # for tag in portratit_res[demand][0:3]:
# # if tag in portrait_dict[demand]:
# # portrait_dict[demand][tag] += 1
# for protratit_type in portrait_dict["projects"]: # else:
# partition_date = today_str # portrait_dict[demand][tag] = 1
# pid = hashlib.md5((partition_date + protratit_type).encode("utf8")).hexdigest() # if tag in kyc_str_list and demand == "projects":
# action_count = portrait_dict["projects"][protratit_type] # if portrait_dict["projects"].get(tag):
# # portrait_dict["projects"][tag] -= 1
# instert_sql = """replace into new_user_project_count( # except Exception as e:
# partition_day,pid,protratit_count,protratit_type) VALUES('{partition_day}','{pid}',{protratit_count},'{protratit_type}');""".format( # print("error ", e)
# partition_day=today_str, pid=pid, protratit_count=action_count #
# , protratit_type=protratit_type # temp_count += 1
# ) # if not temp_count:
# print(instert_sql) # count_not_has_portratit += 1
# # cursor.execute("set names 'UTF8'") # no_portrait_device_id_list.append(res.device_id)
# db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm', #
# db='doris_olap') #
# cursor = db.cursor() # print(portrait_dict)
# res = cursor.execute(instert_sql) # print(count_user_count+1,count_not_has_portratit)
# db.commit() # print("-------------------------------")
# print(res) #
# #
# for protratit_type in portrait_dict["projects"]:
# partition_date = today_str
# pid = hashlib.md5((partition_date + protratit_type).encode("utf8")).hexdigest()
# action_count = portrait_dict["projects"][protratit_type]
#
# instert_sql = """replace into new_user_project_count(
# partition_day,pid,protratit_count,protratit_type) VALUES('{partition_day}','{pid}',{protratit_count},'{protratit_type}');""".format(
# partition_day=today_str, pid=pid, protratit_count=action_count
# , protratit_type=protratit_type
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
# db='doris_olap')
# cursor = db.cursor()
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
#
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment