Commit eedc308c authored by litaolemo's avatar litaolemo

update

parent b7cea630
......@@ -22,9 +22,9 @@ from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
import pytispark.pytispark as pti
# db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
# db='jerry_prod')
# cursor = db.cursor()
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod')
cursor = db.cursor()
startTime = time.time()
......@@ -261,8 +261,8 @@ for t in range(0, task_days):
collects_sql = """
SELECT device_type,active_type,channel_type,if(NVL(sum(1_uv),0) <> 0 ,concat(cast((NVL(sum(1_search_core_pv),0)/NVL(sum(1_uv),0)) as decimal(18,2)),'') , '-') as 1_core_pv_division_uv
,if(NVL(sum(1_uv),0) <> 0 ,concat(cast((NVL(sum(1_search_pv),0)/NVL(sum(1_uv),0)) as decimal(18,2)),'') , '-') as 1_pv_division_uv
SELECT device_type,active_type,channel_type,if(NVL(sum(1_uv),0) <> 0 ,concat((NVL(sum(1_search_core_pv),0)/NVL(sum(1_uv),0)) as decimal(18,4)) , 0) as 1_core_pv_division_uv
,if(NVL(sum(1_uv),0) <> 0 ,concat((NVL(sum(1_search_pv),0)/NVL(sum(1_uv),0)) as decimal(18,4)) , 0) as 1_pv_division_uv
FROM data_table GROUP BY device_type,active_type,channel_type
"""
finnal_df = spark.sql(collects_sql)
......@@ -271,3 +271,24 @@ for t in range(0, task_days):
sql_res = finnal_df.collect()
for res in sql_res:
print(res)
device_type = res.device_type
active_type = res.active_type
channel_type = res.channel_type
core_pv_division_uv = res["1_core_pv_division_uv"]
pv_division_uv = res["1_pv_division_uv"]
pid = hashlib.md5(
(today_str + device_type + active_type + channel_type).encode("utf8")).hexdigest()
instert_sql = """replace into search_strategy_d(
day_id,device_type,active_type,channel_type,1_core_pv_division_uv,1_pv_division_uv
) VALUES('{day_id}','{device_type}','{active_type}','{channel_type}','{core_pv_division_uv}',{pv_division_uv});""".format(
day_id=today_str, device_type=device_type,
active_type=active_type, channel_type=channel_type, core_pv_division_uv=core_pv_division_uv,pv_division_uv=pv_division_uv,pid=pid
)
print(instert_sql)
# cursor.execute("set names 'UTF8'")
res = cursor.execute(instert_sql)
db.commit()
print(res)
db.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment