1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# -*- coding:UTF-8 -*-
# @Time : 2020/9/23 16:10
# @File : out_put_diary_0923.py
# @email : litao@igengmei.com
# @author : litao
import hashlib
import json
import pymysql
import xlwt, datetime
import redis
from meta_base_code.utils.func_from_redis_get_portrait import user_portrait_scan_info,get_user_portrait_tag3_from_redis
from meta_base_code.utils.func_from_es_get_article import get_es_article_num, get_user_post_from_mysql, \
get_diary_from_mysql
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti
startTime = time.time()
# sparkConf = SparkConf()
# sparkConf.set("spark.sql.crossJoin.enabled", True)
# sparkConf.set("spark.debug.maxToStringFields", "100")
# sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
# sparkConf.set("spark.tispark.plan.allow_index_read", True)
# sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
# sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
# sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
# sparkConf.set("mapreduce.map.output.compress", False)
# sparkConf.set("prod.gold.jdbcuri",
# "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
# sparkConf.set("prod.mimas.jdbcuri",
# "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
# sparkConf.set("prod.gaia.jdbcuri",
# "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
# sparkConf.set("prod.tidb.jdbcuri",
# "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
# sparkConf.set("prod.jerry.jdbcuri",
# "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
# sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
# sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
# sparkConf.set("prod.tidb.database", "jerry_prod")
# sparkConf.setAppName("test")
#
# spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
# .config("spark.tispark.pd.addresses", "172.16.40.170:2379").enableHiveSupport().getOrCreate())
#
# spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
# spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
# spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
# spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
# spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
task_list = []
task_days = 2
def con_sql(sql):
# 从数据库的表里获取数据
db = pymysql.connect(host='172.16.50.175', port=3306, user='doris', passwd='o5gbA27hXHHm',
db='doris_olap')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
second_demands_zero_dict = {
# "answer":{},
# "tractate":{},
"diary":{},
}
project_zero_dict = {
# "answer":{},
# "tractate":{},
"diary":{},
}
t = 1
day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
last_30_day_str = (now + datetime.timedelta(days=-31)).strftime("%Y%m%d")
today_str = now.strftime("%Y%m%d")
today_str_format = now.strftime("%Y-%m-%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
yesterday_str_format = (now + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
# sql = """select first_device from online.ml_user_history_detail where partition_date = {today_str} and last_active_date >= {last_30_day_str}
# """.format(today_str=today_str,last_30_day_str=last_30_day_str)
# print(sql)
# new_urser_device_id_df = spark.sql(sql)
# new_urser_device_id_df.createOrReplaceTempView("device_id_view")
# new_urser_device_id_df.show(1)
# sql_res = new_urser_device_id_df.collect()
bulk_dict = {
0: [0, 0, 0],
10: [0, 0, 0],
50: [0, 0, 0],
100: [0, 0, 0],
200: [0, 0, 0],
500: [0, 0, 0],
1000: [0, 0, 0],
}
task_list = []
sql = """
select card_id from strategy_content_exposure_index where card_content_type="diary" and preciseexposure_num>=50 and ctr>=0.05 and avg_page_stay>=20 and create_day="2020-09-17";
"""
second_demands_count_dict, tags_v3_count_dict, second_demands_card_id_list,tags_v3_card_id_list,second_demands_tractate_dict,tags_v3_tractate_dict=get_diary_from_mysql(sql,id_type='diary')
# second_demands_count_dict= {'地包天': 4, '近视': 20, '洗眉': 16, '阴道缩紧': 338, '避孕': 1, '缩窄下巴': 1, '翘睫': 2, '腿形矫正': 7, '脱手脚毛': 80, '注射物取出': 1, '脱敏': 2, '脱腋毛': 6, '祛纹身': 8, '填充面部': 7, '生私密毛发': 339, '祛红血丝': 2, '自体脂肪修复': 3, '除螨': 1, '生眉毛': 10, '网红脸': 1, '缩胸': 11, '阴茎美化': 11, '耳洞': 3, '生头发': 85, 'AI测试': 3, '凸嘴': 3, '缩短下巴': 6, '纹身': 11, '生睫毛': 2, '假体取出': 1, '鼻孔矫正': 1, '下颌缘提升': 2, '奥美定': 1, '皮肤病': 6, '性快感': 3, '眼线': 10, '颏肌放松': 3, '洗眼线': 2, '产后恢复': 1, '祛腋臭': 20, '脱背毛': 79, '脱毛发': 2, '鼻尖延长': 12, '腿部加长': 2, '洁面': 2, '鼻中隔延长': 3, '唇腭裂': 2, '脱唇毛': 3, '填充卧蚕': 3, '丰眼窝': 3, '脱发际线': 5, '脸型': 241, '脱私密毛发': 1, '缩鼻背': 5, '生发际线': 4, '脱腿毛': 4, '生胡须': 1, '鼻部缩短': 2, '健康调理': 4}
# tags_v3_count_dict = {'牙齿': 3, '缩下巴': 3, '洗眉': 12, '玻尿酸': 1, '超声溶脂': 2, '注射物取出': 1, '生长因子': 3, 'G点注射': 3, '正骨术': 1, '真皮填充卧蚕': 2, '激光祛皱': 4, '胶原蛋白填充面部': 3, '脱私密毛发': 1, '瘦身': 3, '玻尿酸丰唇珠': 1, '半飞秒激光术': 1, '乳牙': 1, '产后修复': 1, '视力检查': 2, '干细胞疗法': 2, '童颜针': 4, '准分子激光术': 1, '中医按摩': 1, '祛黑头': 1, '祛黑眼圈': 4, '自体软骨垫鼻尖': 14, '洗眼线': 2, '耳部矫正': 2, '新手精选': 1, '植私密毛发': 338, '玻尿酸填充卧蚕': 1, '阴茎增大': 6, '乳房缩小': 1, '植胡须': 1, '基因检测': 1, '鼻孔矫正': 1, '下巴前推': 10, '激光脱毛': 2, 'PRP生发': 80, '玻尿酸祛皱': 1, '注射祛腋臭': 14, '面膜': 4, '半永久纹眉1': 2, '毛囊检测': 4, '激光脱腋毛': 2, '翘睫': 2, '下巴截骨术': 2, '激光脱唇毛': 1, '激光祛腋臭': 1, '全飞秒激光术': 1, '拔罐': 1, '美臀': 1, '激光祛纹身': 7, '喷砂洗牙': 1, '半永久纹发际线': 7, '自体脂肪面部填充': 2, '祛斑': 1, '私密紧致': 1, '激光脱手脚毛': 80, '腿形矫正': 7, '激光脱背毛': 80, '埋线缩鼻翼': 2, '半永久': 4, '填充面部': 2, '生私密毛发': 1, '洗唇线': 2, '自体脂肪': 1, '半永久纹眼线': 10, '胶原蛋白注射': 2, '肉毒素治疗多汗': 1, '黑脸娃娃': 6, '玻尿酸丰眼窝': 2, '微笑唇': 1, '打耳洞': 3, '睫毛增长': 2, '双眼皮': 8, '全飞秒': 3, '自体脂肪私密紧致': 337, '吸脂': 2, '皮肤病': 6, '口腔溃疡': 1, '激光洗眉': 5, '药物脱毛': 5, '断骨增高': 2, '额头缩小': 4, '肤质检测': 1, '激光近视矫正': 4, '自体脂肪填充修复': 3, '肉毒素颏肌放松': 1, '祛眼袋': 1, '晶体植入': 3, '阴茎延长': 5, '激光脱发际线': 1, '唇珠唇弓': 2, '包皮手术': 5, '唇腭裂': 4, '乳头缩小': 338, '臀部整形': 1, '植眉': 10, '阴茎增粗': 9, '抗衰紧致': 1, '缩鼻背': 2, '手术祛腋臭': 7, '射频祛眼袋': 4, '上眼睑祛脂': 3, '鼻部硅胶假体取出': 1, '激光脱腿毛': 3, '发质护理': 6, '抗衰': 1}
print("second_demands_count_dict",second_demands_count_dict)
print("second_demands_count_dict",tags_v3_count_dict)
time.sleep(10)
#
# second_demands_tag_count = {}
# projects_demands_tag_count = {}
# total_tag_count = {}
# total_tag_count_pro = {}
# temp_null_count = 0
# for redis_count,spark_res in enumerate(sql_res):
# # if redis_count >= 50:break
# second_demands = []
# projects = []
# total_answer_content_num = 0
# total_tractate_content_num = 0
# total_diary_content_num = 0
# # print(sql_res)
# try:
# res = get_user_portrait_tag3_from_redis(spark_res.first_device)
# except:
# continue
# if res.get("second_demands"):
# second_demands = res.get("second_demands")
# # print(count_res)
# for tag in second_demands:
# if tag in second_demands_tag_count:
# second_demands_tag_count[tag] += 1
# else:
# second_demands_tag_count[tag] = 1
# if tag in second_demands_count_dict:
# total_tractate_content_num += second_demands_count_dict[tag]
# if res.get("projects"):
# projects = res.get("projects")
# # print(count_res)
# for tag in projects:
# if tag in projects_demands_tag_count:
# projects_demands_tag_count[tag] += 1
# else:
# projects_demands_tag_count[tag] = 1
#
# if tag in tags_v3_count_dict:
# total_tractate_content_num += tags_v3_count_dict[tag]
# # print(total_answer_content_num, total_tractate_content_num, total_diary_content_num)
# tmp_count_num = 0
#
#
# if 0 <= total_tractate_content_num < 10:
# bulk_dict[0][1] += 1
# if not second_demands and not projects:
# temp_null_count += 1
# if second_demands:
# for tag in second_demands:
# if tag in total_tag_count:
# total_tag_count[tag] += 1
# else:
# total_tag_count[tag] = 1
# if projects:
# for tag in projects:
# if tag in total_tag_count_pro:
# total_tag_count_pro[tag] += 1
# else:
# total_tag_count_pro[tag] = 1
# elif 10 <= total_tractate_content_num < 50:
# bulk_dict[10][1] += 1
# elif 50 <= total_tractate_content_num < 100:
# bulk_dict[50][1] += 1
# elif 100 <= total_tractate_content_num < 200:
# bulk_dict[100][1] += 1
# elif 200 <= total_tractate_content_num < 500:
# bulk_dict[200][1] += 1
# elif 500 <= total_tractate_content_num < 1000:
# bulk_dict[500][1] += 1
# else:
# bulk_dict[1000][1] += 1
#
# # if redis_count % 5000 == 0:
# # print(redis_count,bulk_dict)
# # print(temp_null_count)
# # print(second_demands_tag_count,projects_demands_tag_count)
# print("total_tag_count" , total_tag_count)
# print("total_tag_count_pro" ,total_tag_count_pro)
# print("bulk_dict", bulk_dict)
# print("temp_null_count", temp_null_count)
# #
# print("second_demands_tag_count",second_demands_tag_count)
# print("projects_demands_tag_count",projects_demands_tag_count)
# with open("log.log","w",encoding='utf-8') as f:
# f.write(str(total_tag_count))
# f.write(str(total_tag_count_pro))
# f.write(str(second_demands_tag_count))
# f.write(str(projects_demands_tag_count))
#
#
#