Commit b796b6de authored by litaolemo's avatar litaolemo

update

parent bbe82796
# -*- coding:UTF-8 -*-
# @Time : 2020/9/11 10:59
# @File : portary_article_distribution.py
# @email : litao@igengmei.com
# @author : litao
import hashlib
import json
import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql.functions import lit
# import pytispark.pytispark as pti
def con_sql(sql):
# 从数据库的表里获取数据
db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
db='jerry_prod')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
"jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
"jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
"jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
"jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")
sparkConf.setAppName("search_tractate_ctr")
spark = (SparkSession.builder.config(conf=sparkConf).config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.config("spark.tispark.pd.addresses", "172.16.40.170:2379").enableHiveSupport().getOrCreate())
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")
def user_portrait_scan_info():
return_dict = {}
try:
round = 0
all_count = 0
empty_count = 0
just_projects_count = 0
keys = "doris:user_portrait:tag3:device_id:*"
cur, results = redis_client2.scan(0, keys, 3000)
while cur != 0:
round += 1
print("round: " + str(round))
cur, results = redis_client2.scan(cur, keys, 3000)
for key in results:
key = key
device_id = key.split(":")[-1]
all_count += 1
# print(key)
# if user_portrait_is_empty(device_id):
# print(device_id)
# empty_count += 1
# if user_portrait_just_projects(device_id):
# print(device_id)
# just_projects_count += 1
# user_portrait_get_empty_candidates(device_id)
try:
res_dic = get_user_portrait_tag3_from_redis(device_id)
# print(res_dic)
for data_type in res_dic:
for tag in res_dic[data_type]:
if tag == "眼窝":
print(return_dict.get(tag))
if return_dict.get(tag):
return_dict[tag] = (data_type, return_dict[tag][1] + 1)
else:
return_dict[tag] = (data_type, 1)
except:
continue
# for data_list in res_dic:
# for data in data_list:
return return_dict
except Exception as e:
print(e)
return {}
task_list = []
task_days = 3
for t in range(1, task_days):
day_num = 0 - t
now = (datetime.datetime.now() + datetime.timedelta(days=day_num))
last_30_day_str = (now + datetime.timedelta(days=-30)).strftime("%Y%m%d")
today_str = now.strftime("%Y%m%d")
yesterday_str = (now + datetime.timedelta(days=-1)).strftime("%Y%m%d")
one_week_age_str = (now + datetime.timedelta(days=-7)).strftime("%Y%m%d")
# for active_type in res_dict:
# for device_os_type in res_dict[active_type]:
# partition_date = yesterday_str
# pid = hashlib.md5((partition_date + device_os_type + active_type).encode("utf8")).hexdigest()
# click_num = res_dict[active_type][device_os_type]["click_num"]
# exposure = res_dict[active_type][device_os_type]["exposure"]
# try:
# search_ctr = round(click_num / exposure, 5)
# except:
# search_ctr = 0
# instert_sql = """replace into search_tractate_ctr(
# partition_date,device_os_type,active_type,pid,click_num,exposure,search_ctr) VALUES('{partition_date}','{device_os_type}','{active_type}','{pid}',{click_num},{exposure},{search_ctr});""".format(
# partition_date=partition_date, device_os_type=device_os_type, active_type=active_type, pid=pid, click_num=click_num,
# exposure=exposure, search_ctr=search_ctr
# )
# print(instert_sql)
# # cursor.execute("set names 'UTF8'")
# db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user', passwd='aqpuBLYzEV7tML5RPsN1pntUzFy',
# db='jerry_prod')
# cursor = db.cursor()
# res = cursor.execute(instert_sql)
# db.commit()
# print(res)
# # cursor.executemany()
# db.close()
# -*- coding:UTF-8 -*-
# @Time : 2020/9/11 11:28
# @File : func_from_redis_get_portrait.py
# @email : litao@igengmei.com
# @author : litao
import json
import redis
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN6@172.16.40.133:6379", decode_responses=True)
redis_client2 = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379", decode_responses=True)
redis_client3 = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN12@172.16.40.164:6379", decode_responses=True)
redis_client4 = redis.StrictRedis.from_url("redis://:XfkMCCdWDIU%ls$h@172.16.50.145:6379", decode_responses=True)
def get_user_portrait_tag3_redis_key(device_id):
return "doris:user_portrait:tag3:device_id:" + str(device_id)
def get_user_portrait_tag3_from_redis(device_id, limit_score=0):
def items_gt_score(d):
new_d = dict(sorted(d.items(), key=lambda x: x[1], reverse=True))
res = {tag: float(score) for tag, score in new_d.items() if float(score) >= limit_score}
return list(res.keys())
portrait_key = get_user_portrait_tag3_redis_key(device_id)
if redis_client2.exists(portrait_key):
user_portrait = json.loads(redis_client2.get(portrait_key))
first_demands = items_gt_score(user_portrait.get("first_demands", {})) # 一级诉求
second_demands = items_gt_score(user_portrait.get("second_demands", {})) # 二级诉求
first_solutions = items_gt_score(user_portrait.get("first_solutions", {})) # 一级方式
second_solutions = items_gt_score(user_portrait.get("second_solutions", {})) # 二级方式
first_positions = items_gt_score(user_portrait.get("first_positions", {})) # 一级部位
second_positions = items_gt_score(user_portrait.get("second_positions", {}))
projects = items_gt_score(user_portrait.get("projects", {})) # 项目
anecdote_tags = items_gt_score(user_portrait.get("anecdote_tags", {})) # 八卦
return {
"first_demands": first_demands,
"second_demands": second_demands,
"first_solutions": first_solutions,
"second_solutions": second_solutions,
"first_positions": first_positions,
"second_positions": second_positions,
"projects": projects,
"anecdote_tags": anecdote_tags
}
return {}
def user_portrait_scan_info():
return_dict = {}
try:
round = 0
all_count = 0
empty_count = 0
just_projects_count = 0
keys = "doris:user_portrait:tag3:device_id:*"
cur, results = redis_client2.scan(0, keys, 3000)
while cur != 0:
round += 1
print("round: " + str(round))
cur, results = redis_client2.scan(cur, keys, 3000)
for key in results:
key = key
device_id = key.split(":")[-1]
all_count += 1
# print(key)
# if user_portrait_is_empty(device_id):
# print(device_id)
# empty_count += 1
# if user_portrait_just_projects(device_id):
# print(device_id)
# just_projects_count += 1
# user_portrait_get_empty_candidates(device_id)
try:
res_dic = get_user_portrait_tag3_from_redis(device_id)
yield res_dic
except:
continue
# for data_list in res_dic:
# for data in data_list:
return return_dict
except Exception as e:
print(e)
return {}
if __name__ == "__main__":
scan_res = user_portrait_scan_info()
for res in scan_res:
print(res)
\ No newline at end of file
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment