1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import redis
import datetime
from pyspark import SparkConf
import time
from pyspark.sql import SparkSession
import json
import zstd
def zstd_compress(x):
if x:
return zstd.compress(bytes(json.dumps(x), encoding='utf-8'), 22)
def zstd_decompress(x):
if x:
result = zstd.decompress(x)
return json.loads(result)
# ctr变化(更新用户的所有美购smart rank分
def update_device_smart_rank(device_id, result_all_dict, service_detail_view_count_30_dict, result_smart_rank_score_dict):
device_meigou_ctr_key = 'device_meigou_ctr:device_id:' + str(device_id)
gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=2, socket_timeout=2000)
if gm_kv_cli.exists(device_meigou_ctr_key):
ts_device_meigou_ctr = gm_kv_cli.hgetall(device_meigou_ctr_key)
device_meigou_smart_rank = dict()
for i in ts_device_meigou_ctr:
ts_ctr = float(ts_device_meigou_ctr[i])
service_id = str(i,encoding="utf-8")
meigou_smart_rank_score = get_meigou_smart_rank(service_id, result_all_dict, service_detail_view_count_30_dict, ts_ctr, result_smart_rank_score_dict)
device_meigou_smart_rank.update({service_id: meigou_smart_rank_score})
device_meigou_smart_rank_key = 'device_meigou_smart_rank_zstd:device_id:' + str(device_id)
REDIS_URL = 'redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'
cli_ins = redis.StrictRedis.from_url(REDIS_URL)
cli_ins.set(device_meigou_smart_rank_key, zstd_compress(device_meigou_smart_rank))
cli_ins.expire(device_meigou_smart_rank_key, time=24 * 60 * 60)
return meigou_smart_rank_score
return "periodic update fail"
# 获取美购的smart rank分
def get_meigou_smart_rank(service_id, result_all_dict, service_detail_view_count_30_dict, meigou_ctr, result_smart_rank_score_dict, table_cpc_price=-1):
if service_id in result_all_dict:
consult_value = result_all_dict[service_id]["consult_value"]
if table_cpc_price == -1:
click_price = result_all_dict[service_id]["click_price"]
else:
click_price = table_cpc_price
service_detail_view_count_30 = service_detail_view_count_30_dict[service_id].get("service_detail_view_count_30", 0)
if click_price == 0 and service_detail_view_count_30 <= 500:
ctr_value = meigou_ctr
else:
return float('%.4g' % result_smart_rank_score_dict[service_id]["new_smart_rank"])
discount_value = result_all_dict[service_id]["discount_value"]
cpt_value = result_all_dict[service_id]["cpt_value"]
org_value = discount_value + 0.5 * cpt_value + click_price
else:
if service_id in result_smart_rank_score_dict:
return float('%.4g' % result_smart_rank_score_dict[service_id]["new_smart_rank"])
else:
consult_value = 0.001
ctr_value = 0.1
discount_value = 0.001
cpt_value = 0.001
click_price = 0
org_value = discount_value + 0.5 * cpt_value + click_price
meigou_smart_rank_score = consult_value * ctr_value * org_value
return float('%.4g' % meigou_smart_rank_score)
if __name__ == '__main__':
try:
start = time.time()
db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work", password="BJQaT9VzDcuPBqkd",
db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
cur_zhengxing = db_zhengxing.cursor()
sql = "select service_id,service_detail_view_count_30 from statistic_service_smart_rank_v3 where stat_date=(select max(stat_date) from statistic_service_smart_rank_v3)"
cur_zhengxing.execute(sql)
result = cur_zhengxing.fetchall()
service_detail_view_count_30_dict = dict()
for i in result:
service_detail_view_count_30_dict.update({str(i["service_id"]): i})
# meigou smart_rank所有因子
sql_smart_rank = "select service_id,discount_value,cpt_value,click_price,consult_value,ctr_value from api_smart_rank_factor"
cur_zhengxing.execute(sql_smart_rank)
result_all = cur_zhengxing.fetchall()
result_all_dict = dict()
for i in result_all:
result_all_dict.update({str(i["service_id"]): i})
# smart_rank_score
sql_smart_rank_score = "select service_id, new_smart_rank from api_smart_rank"
cur_zhengxing.execute(sql_smart_rank_score)
result_smart_rank_score = cur_zhengxing.fetchall()
result_smart_rank_score_dict = dict()
for i in result_smart_rank_score:
result_smart_rank_score_dict.update({str(i["service_id"]): i})
db_zhengxing.close()
# rdd
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
REDIS_URL = 'redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'
cli_ins = redis.StrictRedis.from_url(REDIS_URL)
gray_level_device_ids = "doris:ctr_estimate:device_id_list"
if cli_ins.exists(gray_level_device_ids):
device_ids = cli_ins.smembers(gray_level_device_ids)
# device_ids = [b"9C5E7C73-380C-4623-8F48-A64C8034E315" for i in range(1000)]
device_ids_rdd = spark.sparkContext.parallelize(device_ids)
result = device_ids_rdd.repartition(40).map(
lambda x: update_device_smart_rank(str(x, encoding='utf-8'), result_all_dict,
service_detail_view_count_30_dict, result_smart_rank_score_dict))
result.collect()
print(time.time() - start)
except Exception as e:
print(e)