# -*- coding:UTF-8 -*-
# @Time : 2020/8/20 9:42
# @File : recommend_strategy_d.py
# @email : litao@igengmei.com
# @author : litao

import hashlib
import json
import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti

db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                     passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                     db='jerry_prod')
cursor = db.cursor()


def con_sql(sql):
    # Fetch all rows for the given query from the database
    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                         passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
                         db='jerry_prod')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result


startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
              "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
              "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
              "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
# NOTE: the same key is set twice; only the second value ("172.16.40.170:4000") takes effect.
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")

spark = (SparkSession.builder.config(conf=sparkConf)
         .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379")
         .appName("LR PYSPARK TEST")
         .enableHiveSupport()
         .getOrCreate())

# Register the shared Hive UDF jars and the UDFs used by downstream SQL.
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")

# Date helpers: the job runs for t days ago (t = 1 means yesterday's partition).
t = 1
day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
last_30_day_str = (now + datetime.timedelta(days=-31)).strftime("%Y%m%d")
today_str = now.strftime("%Y%m%d")
today_str_format = now.strftime("%Y-%m-%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
one_week_ago_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")

# Pull the daily recommend-strategy aggregates and derive the CTR-style ratios.
grey_select_sql = """SELECT *,
    NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_exposure,5),0) as recommend_ctr,
    NVL(ROUND(card_click/card_exposure,5),0) as click_ctr,
    NVL(ROUND((navbar_search+highlight_word+self_welfare_card+recommend_welfare_card+recommend_content_card*0.2+transfer_card+video_consultation)/card_click,5),0) as second_jump_ctr,
    NVL(ROUND(page_pv_20/page_pv,5),0) as page_pv_20_percent
FROM pm.tl_pm_recommend_strategy_d_v2"""

device_df = spark.sql(grey_select_sql)
device_df.show(1, False)
sql_res = device_df.collect()
print("-----------------------------------------------------------------------------")

for res in sql_res:
    # print(res)
    day_id = res.day_id
    device_os_type = res.device_os_type
    active_type = res.active_type
    card_content_type = res.card_content_type
    recommend_type = res.recommend_type
    card_click = res.card_click
    card_exposure = res.card_exposure
    avg_page_stay = res.avg_page_stay
    navbar_search = res.navbar_search
    highlight_word = res.highlight_word
    self_welfare_card = res.self_welfare_card
    page_pv_20 = res.page_pv_20
    page_pv_20_percent = res.page_pv_20_percent
    recommend_welfare_card = res.recommend_welfare_card
    recommend_content_card = res.recommend_content_card
    if not recommend_content_card:
        recommend_content_card = 0
    recommend_special_card = res.recommend_special_card
    if not recommend_special_card:
        recommend_special_card = 0
    transfer_card = res.transfer_card
    video_consultation = res.video_consultation
    partition_day = res.partition_day
    recommend_ctr = res.recommend_ctr
    second_jump_ctr = res.second_jump_ctr
    click_ctr = res.click_ctr
    # Deterministic key over the dimension columns, so re-runs replace rather than duplicate rows.
    pid = hashlib.md5(
        (day_id + device_os_type + active_type + card_content_type + recommend_type).encode("utf8")).hexdigest()
    insert_sql = """replace into recommend_strategy_d(
        day_id,device_os_type,active_type,card_content_type,recommend_type,card_click,card_exposure,avg_page_stay,navbar_search,
        highlight_word,self_welfare_card,recommend_welfare_card,recommend_content_card,recommend_special_card,transfer_card,video_consultation,
        partition_day,pid,recommend_ctr,second_jump_ctr,click_ctr,page_pv_20_percent
        ) VALUES('{day_id}','{device_os_type}','{active_type}','{card_content_type}','{recommend_type}',{card_click},{card_exposure},
        {avg_page_stay},{navbar_search},{highlight_word},{self_welfare_card},{recommend_welfare_card},{recommend_content_card},{recommend_special_card},
        {transfer_card},{video_consultation},'{partition_day}','{pid}',{recommend_ctr},{second_jump_ctr},{click_ctr},{page_pv_20_percent});""".format(
        day_id=day_id, device_os_type=device_os_type, active_type=active_type, card_content_type=card_content_type,
        card_click=card_click, recommend_type=recommend_type, card_exposure=card_exposure, avg_page_stay=avg_page_stay,
        navbar_search=navbar_search, self_welfare_card=self_welfare_card, recommend_welfare_card=recommend_welfare_card,
        recommend_content_card=recommend_content_card, recommend_special_card=recommend_special_card,
        page_pv_20_percent=page_pv_20_percent, transfer_card=transfer_card, video_consultation=video_consultation,
        partition_day=partition_day, pid=pid, recommend_ctr=recommend_ctr, second_jump_ctr=second_jump_ctr,
        click_ctr=click_ctr, highlight_word=highlight_word
    )
    print(insert_sql)
    # cursor.execute("set names 'UTF8'")
    res = cursor.execute(insert_sql)
    db.commit()
    print(res)
    # cursor.executemany()
db.close()
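

# --- Hedged sketch, not wired into the job above ---
# The loop issues one string-formatted REPLACE per row. A batched, parameterized
# variant (hinted at by the commented-out cursor.executemany() above) could look
# like the helper below. It assumes the same recommend_strategy_d column order as
# the REPLACE statement in the loop and a pymysql connection like `db`; the
# function name and the row-tuple contract are illustrative, not part of the
# original job, and the helper is never called here.
def batch_replace_rows(connection, rows):
    """rows: iterable of tuples whose values follow _COLUMNS order."""
    _COLUMNS = (
        "day_id", "device_os_type", "active_type", "card_content_type", "recommend_type",
        "card_click", "card_exposure", "avg_page_stay", "navbar_search", "highlight_word",
        "self_welfare_card", "recommend_welfare_card", "recommend_content_card",
        "recommend_special_card", "transfer_card", "video_consultation", "partition_day",
        "pid", "recommend_ctr", "second_jump_ctr", "click_ctr", "page_pv_20_percent",
    )
    sql = "replace into recommend_strategy_d({cols}) VALUES ({marks})".format(
        cols=",".join(_COLUMNS), marks=",".join(["%s"] * len(_COLUMNS)))
    with connection.cursor() as cur:
        # executemany lets the driver escape values and send the rows in batches,
        # avoiding manual quoting and one commit per row.
        cur.executemany(sql, list(rows))
    connection.commit()
# Example (hypothetical): collect row tuples inside the loop, then call
# batch_replace_rows(db, row_tuples) once before db.close().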