Commit f837f92e authored by 高雅喆

add stat_device_order_portrait_score_1106_1206

parent 21efd7bb
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import redis
import datetime
from pyspark import SparkConf
import time
from pyspark.sql import SparkSession
import json
import numpy as np
import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql.functions import concat_ws
from tool import *
def get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name, size=None, pay_time=0):
    """Score one order record against the ordering device's service portrait.

    x is a (pay_time, cl_id, order_tag_id) tuple; returns
    (pay_time, cl_id, order_tag_id, order_tag_id_score).
    """
    pay_time = x[0]
    cl_id = x[1]
    order_tag_id = x[2]
    user_df_service = get_user_log(cl_id, all_word_tags, pay_time=pay_time)
    # add derived columns to the log df (days_diff_now, tag_type, tag2)
    if not user_df_service.empty:
        user_df_service["days_diff_now"] = round((int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
        user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1)
        user_df_service = user_df_service[user_df_service['tag_type'].isin(['2', '3'])]
        if not user_df_service.empty:
            user_log_df_tag2_list = user_df_service[user_df_service['tag_type'] == '2']['tag_id'].unique().tolist()
            # roll level-3 tags up to their level-2 parent; level-2 tags stay as-is
            user_df_service["tag2"] = user_df_service.apply(lambda x:
                get_tag2_from_tag3(x.tag_id, all_3tag_2tag, user_log_df_tag2_list)
                if x.tag_type == '3' else x.tag_id, axis=1)
            user_df_service["tag2_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag2"]), axis=1)
            # compute scores and proportions: each score_type picks a time-decay
            # function from tool.py, normalized by the number of tags logged in
            # the same action (get_action_tag_count)
            user_df_service["tag_score"] = user_df_service.apply(
                lambda x: compute_henqiang(x.days_diff_now, exponential=1) / get_action_tag_count(user_df_service, x.time) if x.score_type == "henqiang" else (
                    compute_jiaoqiang(x.days_diff_now, exponential=1) / get_action_tag_count(user_df_service, x.time) if x.score_type == "jiaoqiang" else (
                        compute_ai_scan(x.days_diff_now, exponential=1) / get_action_tag_count(user_df_service, x.time) if x.score_type == "ai_scan" else (
                            compute_ruoyixiang(x.days_diff_now, exponential=1) / get_action_tag_count(user_df_service, x.time) if x.score_type == "ruoyixiang" else
                            compute_validate(x.days_diff_now, exponential=1) / get_action_tag_count(user_df_service, x.time)))), axis=1)
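            # an equivalent, more readable dispatch (a sketch only, not wired in):
            #   decay_fns = {"henqiang": compute_henqiang, "jiaoqiang": compute_jiaoqiang,
            #                "ai_scan": compute_ai_scan, "ruoyixiang": compute_ruoyixiang}
            #   fn = decay_fns.get(x.score_type, compute_validate)
            #   score = fn(x.days_diff_now, exponential=1) / get_action_tag_count(user_df_service, x.time)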
            tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
                {'tag_score': 'sum', 'cl_id': 'first', 'action': 'first'}).reset_index().sort_values(
                by=["tag_score"], ascending=False)
            # weight: each tag2's percentage share of the total score
            tag_score_sum['weight'] = 100 * tag_score_sum['tag_score'] / tag_score_sum['tag_score'].sum()
            # pay_type: 3 = validated order, 2 = paid (alipay callback), 1 = other actions
            tag_score_sum["pay_type"] = tag_score_sum.apply(
                lambda x: 3 if x.action == "api/order/validate" else (
                    2 if x.action == "api/settlement/alipay_callback" else 1
                ), axis=1
            )
            gmkv_tag_score2_sum = tag_score_sum[["tag2", "tag_score"]][:size].to_dict('records')
            gmkv_tag_score2_sum_dict = {i["tag2"]: i["tag_score"] for i in gmkv_tag_score2_sum}
            # score of the ordered tag within the portrait; 0 when the portrait
            # is empty or the tag is absent
            order_tag_id_score = gmkv_tag_score2_sum_dict.get(int(order_tag_id), 0)
            if not gmkv_tag_score2_sum_dict:
                order_tag_id_score = 0
            return pay_time, cl_id, order_tag_id, order_tag_id_score
    else:
        return pay_time, cl_id, order_tag_id, 0
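# example (hypothetical values): for a single order record
#   get_user_service_portrait((1573000000, "some-device-id", "12"), all_word_tags,
#                             all_tag_tag_type, all_3tag_2tag, all_tags_name)
# returns (1573000000, "some-device-id", "12", <score of tag 12 in that portrait>)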
# fetch order time, device id and tag id for devices that placed an order in the past month
def get_device_order_info(start_timestamp):
    sql = "select distinct time, cl_id, tag_id from user_new_tag_log where action='api/settlement/alipay_callback' and time > {} and cl_id != ''".format(
        start_timestamp)
    db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
                                    db='jerry_test', charset='utf8')
    cur_jerry_test = db_jerry_test.cursor()
    cur_jerry_test.execute(sql)
    data = list(cur_jerry_test.fetchall())
    db_jerry_test.close()
    return data
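# each row is a (time, cl_id, tag_id) tuple, matching the x[0], x[1], x[2]
# unpacking in get_user_service_portrait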
# data: orders since 2019-11-06 00:00 UTC+8 (timestamp 1572969600), the start
# of the 1106-1206 stat window
start_timestamp = 1572969600
device_info = get_device_order_info(start_timestamp)
# tags matching search words and their synonyms
all_word_tags = get_all_word_tags()
all_tag_tag_type = get_all_tag_tag_type()
# level-3 tag -> level-2 parent tag mapping
all_3tag_2tag = get_all_3tag_2tag()
# Chinese display names for tag ids
all_tags_name = get_all_tags_name()
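# these lookup dicts are loaded once on the driver and captured by the map
# closure below, so each executor receives a serialized copy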
# Spark pipeline: score each order against its device's portrait
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
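# ship tool.py to the executors: the map closure calls get_user_log, the
# compute_* decay functions and the tag-mapping helpers defined there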
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_info)
result = device_ids_lst_rdd.repartition(100).map(
    lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name, size=None)
).filter(lambda x: x is not None)
print(result.count())
print(result.take(10))
# column names follow the 4-tuple returned by get_user_service_portrait
df = spark.createDataFrame(result).toDF("pay_time", "device_id", "order_tag_id", "order_tag_id_score").na.drop().toPandas()
df.to_csv("~/gyz/log/stat_device_order_portrait_score_1106_1206.csv", index=False)
spark.stop()
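# a quick sanity check one might run on the output afterwards (a sketch;
# column names as written above):
#   stat = pd.read_csv("~/gyz/log/stat_device_order_portrait_score_1106_1206.csv")
#   print("share of orders whose tag appears in the portrait:",
#         (stat["order_tag_id_score"] > 0).mean())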