From 55a246309c0c02811dcdb9aa689549c01303e688 Mon Sep 17 00:00:00 2001
From: litaolemo <593516104@qq.com>
Date: Wed, 23 Sep 2020 16:15:12 +0800
Subject: [PATCH] update

---
 output/out_put_diary_0923.py | 221 +++++++++++++++++++++++++++++++++++
 1 file changed, 221 insertions(+)
 create mode 100644 output/out_put_diary_0923.py

diff --git a/output/out_put_diary_0923.py b/output/out_put_diary_0923.py
new file mode 100644
index 0000000..590879e
--- /dev/null
+++ b/output/out_put_diary_0923.py
@@ -0,0 +1,221 @@
+# -*- coding:UTF-8 -*-
+# @Time : 2020/9/23 16:10
+# @File : out_put_diary_0923.py
+# @email : litao@igengmei.com
+# @author : litao
+
+
+import hashlib
+import json
+
+import pymysql
+import xlwt, datetime
+import redis
+from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info, get_user_portrait_tag3_from_redis
+from meta_base_code.utils.func_from_es_get_article import get_es_article_num, get_user_post_from_mysql
+# from pyhive import hive
+from maintenance.func_send_email_with_file import send_file_email
+from typing import Dict, List
+from elasticsearch_7 import Elasticsearch
+from elasticsearch_7.helpers import scan
+import sys
+import time
+from pyspark import SparkConf
+from pyspark.sql import SparkSession, DataFrame
+
+
+# from pyspark.sql.functions import lit
+# import pytispark.pytispark as pti
+startTime = time.time()
+sparkConf = SparkConf()
+sparkConf.set("spark.sql.crossJoin.enabled", True)
+sparkConf.set("spark.debug.maxToStringFields", "100")
+sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
+sparkConf.set("spark.tispark.plan.allow_index_read", True)
+sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
+sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
+sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
+sparkConf.set("mapreduce.map.output.compress", False)
+sparkConf.set("prod.gold.jdbcuri",
+              "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
+sparkConf.set("prod.mimas.jdbcuri",
+              "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
+sparkConf.set("prod.gaia.jdbcuri",
+              "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
+sparkConf.set("prod.tidb.jdbcuri",
+              "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
+sparkConf.set("prod.jerry.jdbcuri",
+              "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
+sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
+sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
+sparkConf.set("prod.tidb.database", "jerry_prod")
+sparkConf.setAppName("test")
+
+spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
+         .config("spark.tispark.pd.addresses", "172.16.40.170:2379").enableHiveSupport().getOrCreate())
+
+spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
+spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
+spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
+spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
+spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'") + +task_list = [] +task_days = 2 + + +def con_sql(sql): + # 从数æ®åº“çš„è¡¨é‡ŒèŽ·å–æ•°æ® + + db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy', + db='jerry_prod') + cursor = db.cursor() + cursor.execute(sql) + result = cursor.fetchall() + db.close() + return result + + +second_demands_zero_dict = { + # "answer":{}, + # "tractate":{}, + "diary":{}, +} +project_zero_dict = { + # "answer":{}, + # "tractate":{}, + "diary":{}, +} +t = 1 +day_num = 0 - t +now = (datetime.datetime.now() + datetime.timedelta(days=day_num)) +last_30_day_str = (now + datetime.timedelta(days=-31)).strftime("%Y%m%d") +today_str = now.strftime("%Y%m%d") +today_str_format = now.strftime("%Y-%m-%d") +yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d") +yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d") +one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d") +sql = """select first_device from online.ml_user_history_detail where partition_date = {today_str} and last_active_date >= {last_30_day_str} +""".format(today_str=today_str,last_30_day_str=last_30_day_str) +print(sql) +new_urser_device_id_df = spark.sql(sql) +new_urser_device_id_df.createOrReplaceTempView("device_id_view") +new_urser_device_id_df.show(1) +sql_res = new_urser_device_id_df.collect() + +bulk_dict = { + 0: [0, 0, 0], + 10: [0, 0, 0], + 50: [0, 0, 0], + 100: [0, 0, 0], + 200: [0, 0, 0], + 500: [0, 0, 0], + 1000: [0, 0, 0], +} + + +task_list = [] +sql = """ +select card_id from strategy_content_exposure_index where card_content_type="diary" and preciseexposure_num>=50 and ctr>=0.05 and avg_page_stay>=20 and create_day="2020-09-17"; + """ +second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list,tags_v3_card_id_list,second_demands_tractate_dict,tags_v3_tractate_dict=get_user_post_from_mysql(sql) +# second_demands_count_dict= {'地包天': 4, '近视': 20, '洗眉': 16, '阴é“缩紧': 338, 'é¿å•': 1, '缩窄下巴': 1, '翘ç«': 2, '腿形矫æ£': 7, '脱手脚毛': 80, '注射物å–出': 1, '脱æ•': 2, '脱腋毛': 6, '祛纹身': 8, 'å¡«å……é¢éƒ¨': 7, '生ç§å¯†æ¯›å‘': 339, '祛红血ä¸': 2, '自体脂肪修å¤': 3, '除螨': 1, '生眉毛': 10, '网红脸': 1, '缩胸': 11, '阴茎美化': 11, '耳洞': 3, '生头å‘': 85, 'AI测试': 3, '凸嘴': 3, '缩çŸä¸‹å·´': 6, '纹身': 11, 'ç”Ÿç«æ¯›': 2, 'å‡ä½“å–出': 1, 'é¼»å”矫æ£': 1, '下颌缘æå‡': 2, '奥美定': 1, '皮肤病': 6, '性快感': 3, '眼线': 10, 'é¢è‚Œæ”¾æ¾': 3, '洗眼线': 2, 'äº§åŽæ¢å¤': 1, '祛腋è‡': 20, '脱背毛': 79, '脱毛å‘': 2, '鼻尖延长': 12, 'è…¿éƒ¨åŠ é•¿': 2, 'æ´é¢': 2, 'é¼»ä¸éš”å»¶é•¿': 3, '唇è…裂': 2, '脱唇毛': 3, 'å¡«å……å§èš•': 3, '丰眼çª': 3, '脱å‘际线': 5, '脸型': 241, '脱ç§å¯†æ¯›å‘': 1, '缩鼻背': 5, '生å‘际线': 4, '脱腿毛': 4, '生胡须': 1, '鼻部缩çŸ': 2, 'å¥åº·è°ƒç†': 4} + +# tags_v3_count_dict = {'牙齿': 3, '缩下巴': 3, '洗眉': 12, '玻尿酸': 1, '超声溶脂': 2, '注射物å–出': 1, 'ç”Ÿé•¿å› å': 3, 'G点注射': 3, 'æ£éª¨æœ¯': 1, '真皮填充å§èš•': 2, '激光祛皱': 4, '胶原蛋白填充é¢éƒ¨': 3, '脱ç§å¯†æ¯›å‘': 1, '瘦身': 3, '玻尿酸丰唇ç ': 1, 'åŠé£žç§’激光术': 1, '乳牙': 1, '产åŽä¿®å¤': 1, '视力检查': 2, '干细胞疗法': 2, '童颜针': 4, 'å‡†åˆ†åæ¿€å…‰æœ¯': 1, 'ä¸åŒ»æŒ‰æ‘©': 1, '祛黑头': 1, '祛黑眼圈': 4, '自体软骨垫鼻尖': 14, '洗眼线': 2, '耳部矫æ£': 2, '新手精选': 1, 'æ¤ç§å¯†æ¯›å‘': 338, '玻尿酸填充å§èš•': 1, '阴茎增大': 6, '乳房缩å°': 1, 'æ¤èƒ¡é¡»': 1, 'åŸºå› æ£€æµ‹': 1, 'é¼»å”矫æ£': 1, '䏋巴剿ލ': 10, '激光脱毛': 2, 'PRP生å‘': 80, '玻尿酸祛皱': 1, '注射祛腋è‡': 14, 'é¢è†œ': 4, 'åŠæ°¸ä¹…纹眉1': 2, '毛囊检测': 4, '激光脱腋毛': 2, '翘ç«': 2, '下巴截骨术': 2, '激光脱唇毛': 1, '激光祛腋è‡': 1, '全飞秒激光术': 1, 'æ‹”ç½': 1, '美臀': 1, '激光祛纹身': 7, 'å–·ç ‚æ´—ç‰™': 1, 'åŠæ°¸ä¹…纹å‘际线': 7, '自体脂肪é¢éƒ¨å¡«å……': 2, '祛斑': 1, 
+print(second_demands_count_dict, tags_v3_count_dict)
+time.sleep(10)
+
+
+# For every active device, read its user portrait from Redis, tally how often each tag occurs,
+# and bucket the device by how many qualified diary posts its tags map to.
+second_demands_tag_count = {}
+projects_demands_tag_count = {}
+total_tag_count = {}
+total_tag_count_pro = {}
+temp_null_count = 0
+for redis_count, spark_res in enumerate(sql_res):
+    # if redis_count >= 50: break
+    second_demands = []
+    projects = []
+    total_answer_content_num = 0
+    total_tractate_content_num = 0
+    total_diary_content_num = 0
+    # print(sql_res)
+    try:
+        res = get_user_portrait_tag3_from_redis(spark_res.first_device)
+    except:
+        # Skip devices whose portrait cannot be read from Redis.
+        continue
+    if res.get("second_demands"):
+        second_demands = res.get("second_demands")
+        # print(count_res)
+        for tag in second_demands:
+            if tag in second_demands_tag_count:
+                second_demands_tag_count[tag] += 1
+            else:
+                second_demands_tag_count[tag] = 1
+            if tag in second_demands_count_dict:
+                total_tractate_content_num += second_demands_count_dict[tag]
+    if res.get("projects"):
+        projects = res.get("projects")
+        # print(count_res)
+        for tag in projects:
+            if tag in projects_demands_tag_count:
+                projects_demands_tag_count[tag] += 1
+            else:
+                projects_demands_tag_count[tag] = 1
+
+            if tag in tags_v3_count_dict:
+                total_tractate_content_num += tags_v3_count_dict[tag]
+    # print(total_answer_content_num, total_tractate_content_num, total_diary_content_num)
+    tmp_count_num = 0
+
+    # Devices in the lowest bucket (fewer than 10 matching posts) also get their tags recorded.
+    if 0 <= total_tractate_content_num < 10:
+        bulk_dict[0][1] += 1
+        if not second_demands and not projects:
+            temp_null_count += 1
+        if second_demands:
+            for tag in second_demands:
+                if tag in total_tag_count:
+                    total_tag_count[tag] += 1
+                else:
+                    total_tag_count[tag] = 1
+        if projects:
+            for tag in projects:
+                if tag in total_tag_count_pro:
+                    total_tag_count_pro[tag] += 1
+                else:
+                    total_tag_count_pro[tag] = 1
+    elif 10 <= total_tractate_content_num < 50:
+        bulk_dict[10][1] += 1
+    elif 50 <= total_tractate_content_num < 100:
+        bulk_dict[50][1] += 1
+    elif 100 <= total_tractate_content_num < 200:
+        bulk_dict[100][1] += 1
+    elif 200 <= total_tractate_content_num < 500:
+        bulk_dict[200][1] += 1
+    elif 500 <= total_tractate_content_num < 1000:
+        bulk_dict[500][1] += 1
+    else:
+        bulk_dict[1000][1] += 1
+
+    # if redis_count % 5000 == 0:
+    #     print(redis_count, bulk_dict)
+    #     print(temp_null_count)
+    #     print(second_demands_tag_count, projects_demands_tag_count)
+print("total_tag_count", total_tag_count)
+print("total_tag_count_pro", total_tag_count_pro)
+print("bulk_dict", bulk_dict)
+print("temp_null_count", temp_null_count)
+print("second_demands_tag_count", second_demands_tag_count)
+print("projects_demands_tag_count", projects_demands_tag_count)
+with open("log.log", "w", encoding='utf-8') as f:
+    f.write(str(total_tag_count))
+    f.write(str(total_tag_count_pro))
+    f.write(str(second_demands_tag_count))
+    f.write(str(projects_demands_tag_count))
+
+
+
-- 
2.18.0