From f837f92efa8a09ada2e077eadbb5574d742d24c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=AB=98=E9=9B=85=E5=96=86?= <gaoyazhe@igengmei.com>
Date: Fri, 6 Dec 2019 10:58:41 +0800
Subject: [PATCH] add stat_device_order_portrait_score_1106_1206

---
 .../stat_device_order_portrait_score.py       | 133 ++++++++++++++++++
 1 file changed, 133 insertions(+)
 create mode 100644 eda/smart_rank/stat_device_order_portrait_score.py

diff --git a/eda/smart_rank/stat_device_order_portrait_score.py b/eda/smart_rank/stat_device_order_portrait_score.py
new file mode 100644
index 00000000..f5cecc69
--- /dev/null
+++ b/eda/smart_rank/stat_device_order_portrait_score.py
@@ -0,0 +1,133 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import time
+
+import pymysql
+from pyspark import SparkConf
+from pyspark.sql import SparkSession
+
+from tool import *
+
+
+def get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name, size=None):
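+    """Score one paid order against its device's service portrait.
+
+    ``x`` is a (pay_time, cl_id, tag_id) row from user_new_tag_log. Returns
+    (pay_time, cl_id, order_tag_id, order_tag_id_score), where the score is
+    the ordered tag2's summed time-decay score in the portrait (0 if absent),
+    or None when the device log has no tag_type 2/3 rows.
+    """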
+    pay_time, cl_id, order_tag_id = x
+    user_df_service = get_user_log(cl_id, all_word_tags, pay_time=pay_time)
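+    # Per-event log rows; columns used below: time, cl_id, tag_id, score_type, action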
+
+    # Add derived columns (days_diff_now, tag_type, tag2)
+    if not user_df_service.empty:
+        user_df_service["days_diff_now"] = round((int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
+        user_df_service["tag_type"] = user_df_service.apply(lambda x: all_tag_tag_type.get(x["tag_id"]), axis=1)
+        user_df_service = user_df_service[user_df_service['tag_type'].isin(['2','3'])]
+        if not user_df_service.empty:
+            user_log_df_tag2_list = user_df_service[user_df_service["tag_type"] == "2"]["tag_id"].unique().tolist()
+            # Roll level-3 tags up to a level-2 parent; level-2 tags pass through
+            user_df_service["tag2"] = user_df_service.apply(
+                lambda row: get_tag2_from_tag3(row.tag_id, all_3tag_2tag, user_log_df_tag2_list)
+                if row.tag_type == "3" else row.tag_id, axis=1)
+            user_df_service["tag2_type"] = user_df_service["tag2"].map(all_tag_tag_type.get)
+            # Compute each row's score: time-decay value for its score_type,
+            # normalized by get_action_tag_count at the row's timestamp
+            score_fns = {
+                "henqiang": compute_henqiang,
+                "jiaoqiang": compute_jiaoqiang,
+                "ai_scan": compute_ai_scan,
+                "ruoyixiang": compute_ruoyixiang,
+            }
+            user_df_service["tag_score"] = user_df_service.apply(
+                lambda row: score_fns.get(row.score_type, compute_validate)(row.days_diff_now, exponential=1)
+                / get_action_tag_count(user_df_service, row.time), axis=1)
+            tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
+                {"tag_score": "sum", "cl_id": "first", "action": "first"}
+            ).reset_index().sort_values(by=["tag_score"], ascending=False)
+            tag_score_sum['weight'] = 100 * tag_score_sum['tag_score'] / tag_score_sum['tag_score'].sum()
+            tag_score_sum["pay_type"] = tag_score_sum.apply(
+                lambda x: 3 if x.action == "api/order/validate" else (
+                    2 if x.action == "api/settlement/alipay_callback" else 1
+                ), axis=1
+            )
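+            # weight and pay_type are derived here but not used further in
+            # this script; only tag_score feeds the returned score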
+            gmkv_tag_score2_sum = tag_score_sum[["tag2", "tag_score"]][:size].to_dict("records")
+            gmkv_tag_score2_sum_dict = {i["tag2"]: i["tag_score"] for i in gmkv_tag_score2_sum}
+            # .get() already defaults to 0 when the ordered tag is absent from the portrait
+            order_tag_id_score = gmkv_tag_score2_sum_dict.get(int(order_tag_id), 0)
+            return pay_time, cl_id, order_tag_id, order_tag_id_score
+        # Device has logs, but none with tag_type 2/3; these None results are
+        # filtered out by the caller
+        return None
+    else:
+        return pay_time, cl_id, order_tag_id, 0
+
+
+# Pay time, device id and ordered tag id for orders placed in roughly the last month
+def get_device_order_info(start_timestamp):
+    sql = "select distinct time, cl_id, tag_id from user_new_tag_log " \
+          "where action='api/settlement/alipay_callback' and time > {} and cl_id != ''".format(start_timestamp)
+    db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
+                                    db='jerry_test', charset='utf8')
+    cur_jerry_test = db_jerry_test.cursor()
+    cur_jerry_test.execute(sql)
+    data = list(cur_jerry_test.fetchall())
+    cur_jerry_test.close()
+    db_jerry_test.close()
+    return data
+
+# Window start: 2019-11-06 00:00 (UTC+8), matching the 1106_1206 range in the
+# output filename
+start_timestamp = 1572969600
+device_info = get_device_order_info(start_timestamp)
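+# Each device_info row is a (time, cl_id, tag_id) tuple, e.g.
+# ('1573000000', 'some-device-id', '25')  -- illustrative values only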
+
+
+# Tag ids for search terms and their synonyms
+all_word_tags = get_all_word_tags()
+all_tag_tag_type = get_all_tag_tag_type()
+
+# Map each level-3 tag to its level-2 tag
+all_3tag_2tag = get_all_3tag_2tag()
+
+# Chinese display name for each tag id
+all_tags_name = get_all_tags_name()
+
+# Spark setup and scoring job
+sparkConf = SparkConf() \
+    .set("spark.hive.mapred.supports.subdirectories", "true") \
+    .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
+    .set("spark.tispark.plan.allow_index_double_read", "false") \
+    .set("spark.tispark.plan.allow_index_read", "true") \
+    .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
+    .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
+    .set("spark.io.compression.codec", "lzf") \
+    .set("spark.driver.maxResultSize", "8g") \
+    .set("spark.sql.avro.compression.codec", "snappy")
+
+
+spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
+spark.sparkContext.setLogLevel("WARN")
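+# Ship tool.py to the executors: the map lambda below calls its helpers
+# (get_user_log, the compute_* scorers, ...) on the workers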
+spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
+device_ids_lst_rdd = spark.sparkContext.parallelize(device_info)
+result = device_ids_lst_rdd.repartition(100).map(
+    lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type,
+                                        all_3tag_2tag, all_tags_name, size=None)
+).filter(lambda x: x is not None)
+print(result.count())
+print(result.take(10))
+df = spark.createDataFrame(result).na.drop().toDF(
+    "pay_time", "cl_id", "order_tag_id", "order_tag_id_score").toPandas()
+df.to_csv("~/gyz/log/stat_device_order_portrait_score_1106_1206.csv", index=False)
+spark.stop()
-- 
2.18.0