Commit 487f24de authored by 高雅喆's avatar 高雅喆

dict periodic update smart rank score

parent 14285cc4
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import redis
import datetime
from pyspark import SparkConf
import time
from pyspark.sql import SparkSession
# ctr变化(更新用户的所有美购smart rank分_
def update_device_smart_rank(device_id, result_all_dict, service_detail_view_count_30_dict, result_smart_rank_score_dict):
device_meigou_ctr_key = 'device_meigou_ctr:device_id:' + str(device_id)
REDIS_URL = 'redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'
cli_ins = redis.StrictRedis.from_url(REDIS_URL)
if cli_ins.exists(device_meigou_ctr_key):
ts_device_meigou_ctr = cli_ins.hgetall(device_meigou_ctr_key)
device_meigou_smart_rank = dict()
for i in ts_device_meigou_ctr:
ts_ctr = float(ts_device_meigou_ctr[i])
service_id = str(i,encoding="utf-8")
meigou_smart_rank_score = get_meigou_smart_rank(service_id, result_all_dict, service_detail_view_count_30_dict, ts_ctr, result_smart_rank_score_dict)
device_meigou_smart_rank.update({service_id: meigou_smart_rank_score})
device_meigou_smart_rank_key = 'device_meigou_smart_rank:device_id:' + str(device_id)
cli_ins.hmset(device_meigou_smart_rank_key, device_meigou_smart_rank)
cli_ins.expire(device_meigou_smart_rank_key, time=24 * 60 * 60)
return meigou_smart_rank_score
return "periodic update fail"
# cpc变化(更新每个用户cpc发生变化的美购smart rank分)
def update_all_device_meigou_smart_rank(service_id, result_all_dict, service_detail_view_count_30_dict, result_smart_rank_score_dict):
#cpc价格
db_artemis_prod = pymysql.connect(host="172.16.30.137", port=3306, user="artemis", password="gaehau9shai9Ae",
db="artemis_prod", cursorclass=pymysql.cursors.DictCursor)
cur_artemis_prod = db_artemis_prod.cursor()
sql_cpc = "select service_id,is_promote,click_price from cpc_promote where service_id={}".format(service_id)
cur_artemis_prod.execute(sql_cpc)
result_cpc = cur_artemis_prod.fetchall()
if len(result_cpc) > 0:
if result_cpc[0]["is_promote"]:
click_price = result_cpc[0]["click_price"]
else:
click_price = 0
else:
click_price = 0
db_artemis_prod.close()
Today_active_device_key = 'Today_active_device:device_id:' + str(datetime.datetime.now().strftime('%Y-%m-%d'))
REDIS_URL = 'redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'
cli_ins = redis.StrictRedis.from_url(REDIS_URL)
if cli_ins.exists(Today_active_device_key):
Today_active_device = cli_ins.smembers(Today_active_device_key)
for i in Today_active_device:
device_meigou_smart_rank_key = 'device_meigou_smart_rank:device_id:' + str(i, encoding='utf-8')
device_meigou_ctr_key = 'device_meigou_ctr:device_id:' + str(i, encoding='utf-8')
if cli_ins.exists(device_meigou_ctr_key):
ts_device_meigou_ctr = cli_ins.hget(device_meigou_ctr_key, service_id)
if not ts_device_meigou_ctr:
ts_device_meigou_ctr = 1.0
else:
ts_device_meigou_ctr = 1.0
score = get_meigou_smart_rank(service_id, result_all_dict, service_detail_view_count_30_dict, float(ts_device_meigou_ctr), result_smart_rank_score_dict, click_price)
if cli_ins.exists(device_meigou_smart_rank_key):
cli_ins.hmset(device_meigou_smart_rank_key, {service_id : score})
# 获取美购的smart rank分
def get_meigou_smart_rank(service_id, result_all_dict, service_detail_view_count_30_dict, meigou_ctr, result_smart_rank_score_dict, table_cpc_price=-1):
if service_id in result_all_dict:
consult_value = result_all_dict[service_id]["consult_value"]
if table_cpc_price == -1:
click_price = result_all_dict[service_id]["click_price"]
else:
click_price = table_cpc_price
service_detail_view_count_30 = service_detail_view_count_30_dict[service_id].get("service_detail_view_count_30", 0)
if click_price == 0 and service_detail_view_count_30 <= 500:
ctr_value = meigou_ctr
else:
return float('%.4g' % result_smart_rank_score_dict[service_id]["new_smart_rank"])
discount_value = result_all_dict[service_id]["discount_value"]
cpt_value = result_all_dict[service_id]["cpt_value"]
org_value = discount_value + 0.5 * cpt_value + click_price
else:
if service_id in result_smart_rank_score_dict:
return float('%.4g' % result_smart_rank_score_dict[service_id]["new_smart_rank"])
else:
consult_value = 0.001
ctr_value = 0.1
discount_value = 0.001
cpt_value = 0.001
click_price = 0
org_value = discount_value + 0.5 * cpt_value + click_price
meigou_smart_rank_score = consult_value * ctr_value * org_value
return float('%.4g' % meigou_smart_rank_score)
def send_email(app,id,e):
# 第三方 SMTP 服务
mail_host = 'smtp.exmail.qq.com' # 设置服务器
mail_user = "gaoyazhe@igengmei.com" # 用户名
mail_pass = "VCrKTui99a7ALhiK" # 口令
sender = 'gaoyazhe@igengmei.com'
receivers = ['gaoyazhe@igengmei.com'] # 接收邮件,可设置为你的QQ邮箱或者其他邮箱
e = str(e)
msg = MIMEMultipart()
part = MIMEText('app_id:'+id+':fail', 'plain', 'utf-8')
msg.attach(part)
msg['From'] = formataddr(["gaoyazhe", sender])
# 括号里的对应收件人邮箱昵称、收件人邮箱账号
msg['To'] = ";".join(receivers)
# message['Cc'] = ";".join(cc_reciver)
msg['Subject'] = 'spark streaming:app_name:'+app
with open('error.txt','w') as f:
f.write(e)
f.close()
part = MIMEApplication(open('error.txt', 'r').read())
part.add_header('Content-Disposition', 'attachment', filename="error.txt")
msg.attach(part)
try:
smtpObj = smtplib.SMTP_SSL(mail_host, 465)
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receivers, msg.as_string())
except smtplib.SMTPException:
print('error')
if __name__ == '__main__':
try:
start = time.time()
db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work", password="BJQaT9VzDcuPBqkd",
db="zhengxing", cursorclass=pymysql.cursors.DictCursor)
cur_zhengxing = db_zhengxing.cursor()
sql = "select service_id,service_detail_view_count_30 from statistic_service_smart_rank_v3 where stat_date=(select max(stat_date) from statistic_service_smart_rank_v3)"
cur_zhengxing.execute(sql)
result = cur_zhengxing.fetchall()
service_detail_view_count_30_dict = dict()
for i in result:
service_detail_view_count_30_dict.update({str(i["service_id"]): i})
# meigou smart_rank所有因子
sql_smart_rank = "select service_id,discount_value,cpt_value,click_price,consult_value,ctr_value from api_smart_rank_factor"
cur_zhengxing.execute(sql_smart_rank)
result_all = cur_zhengxing.fetchall()
result_all_dict = dict()
for i in result_all:
result_all_dict.update({str(i["service_id"]): i})
# smart_rank_score
sql_smart_rank_score = "select service_id, new_smart_rank from api_smart_rank"
cur_zhengxing.execute(sql_smart_rank_score)
result_smart_rank_score = cur_zhengxing.fetchall()
result_smart_rank_score_dict = dict()
for i in result_smart_rank_score:
result_smart_rank_score_dict.update({str(i["service_id"]): i})
db_zhengxing.close()
# rdd
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
REDIS_URL = 'redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'
cli_ins = redis.StrictRedis.from_url(REDIS_URL)
gray_level_device_ids = "doris:ctr_estimate:device_id_list"
if cli_ins.exists(gray_level_device_ids):
device_ids = cli_ins.smembers(gray_level_device_ids)
# device_ids = [b"9C5E7C73-380C-4623-8F48-A64C8034E315" for i in range(1000)]
device_ids_rdd = spark.parallelize(device_ids)
result = device_ids_rdd.repartition(10).map(
lambda x: update_device_smart_rank(str(x, encoding='utf-8'), result_all_dict,
service_detail_view_count_30_dict, result_smart_rank_score_dict))
result.foreach(print)
print(time.time() - start)
except Exception as e:
print(e)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment