From eedc308c31ed69faf5fffd1f299a1abf075039ef Mon Sep 17 00:00:00 2001 From: litaolemo <593516104@qq.com> Date: Mon, 24 Aug 2020 15:49:01 +0800 Subject: [PATCH] update --- task/search_strategy_d.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/task/search_strategy_d.py b/task/search_strategy_d.py index 507151e..95eeab7 100644 --- a/task/search_strategy_d.py +++ b/task/search_strategy_d.py @@ -22,9 +22,9 @@ from pyspark.sql import SparkSession, DataFrame from pyspark.sql.functions import lit import pytispark.pytispark as pti -# db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy', -# db='jerry_prod') -# cursor = db.cursor() +db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy', + db='jerry_prod') +cursor = db.cursor() startTime = time.time() @@ -261,8 +261,8 @@ for t in range(0, task_days): collects_sql = """ - SELECT device_type,active_type,channel_type,if(NVL(sum(1_uv),0) <> 0 ,concat(cast((NVL(sum(1_search_core_pv),0)/NVL(sum(1_uv),0)) as decimal(18,2)),'') , '-') as 1_core_pv_division_uv - ,if(NVL(sum(1_uv),0) <> 0 ,concat(cast((NVL(sum(1_search_pv),0)/NVL(sum(1_uv),0)) as decimal(18,2)),'') , '-') as 1_pv_division_uv + SELECT device_type,active_type,channel_type,if(NVL(sum(1_uv),0) <> 0 ,concat((NVL(sum(1_search_core_pv),0)/NVL(sum(1_uv),0)) as decimal(18,4)) , 0) as 1_core_pv_division_uv + ,if(NVL(sum(1_uv),0) <> 0 ,concat((NVL(sum(1_search_pv),0)/NVL(sum(1_uv),0)) as decimal(18,4)) , 0) as 1_pv_division_uv FROM data_table GROUP BY device_type,active_type,channel_type """ finnal_df = spark.sql(collects_sql) @@ -271,3 +271,24 @@ for t in range(0, task_days): sql_res = finnal_df.collect() for res in sql_res: print(res) + device_type = res.device_type + active_type = res.active_type + channel_type = res.channel_type + core_pv_division_uv = res["1_core_pv_division_uv"] + pv_division_uv = res["1_pv_division_uv"] + pid = hashlib.md5( + (today_str + device_type + active_type + channel_type).encode("utf8")).hexdigest() + instert_sql = """replace into search_strategy_d( + day_id,device_type,active_type,channel_type,1_core_pv_division_uv,1_pv_division_uv + ) VALUES('{day_id}','{device_type}','{active_type}','{channel_type}','{core_pv_division_uv}',{pv_division_uv});""".format( + day_id=today_str, device_type=device_type, + active_type=active_type, channel_type=channel_type, core_pv_division_uv=core_pv_division_uv,pv_division_uv=pv_division_uv,pid=pid + + ) + print(instert_sql) + # cursor.execute("set names 'UTF8'") + res = cursor.execute(instert_sql) + db.commit() + print(res) +db.close() + -- 2.18.0