Commit c1f6542c authored by 高雅喆

test_df

parent b3da204d
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ast
import time

from pyspark import SparkConf
from pyspark.sql import SparkSession

from tool import *
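# NOTE: the star import above is assumed to supply get_user_log,
# get_tag2_from_tag3, get_action_tag_count, the compute_* recency-score
# helpers, and the lookup tables (all_word_tags, all_tag_tag_type,
# all_3tag_2tag, all_tags_name) used below; none of them are defined
# in this file.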
def get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag,
                              all_tags_name, size=None, pay_time=0):
    cl_id = x[0]
    search_info = x[1]
    user_df_service = get_user_log(cl_id, all_word_tags, pay_time=pay_time)
    portrait_result = []
    # Add derived columns (days_diff_now, tag_type, tag2)
    if not user_df_service.empty:
        user_df_service["days_diff_now"] = round(
            (int(time.time()) - user_df_service["time"].astype(float)) / (24 * 60 * 60))
        user_df_service["tag_type"] = user_df_service.apply(
            lambda r: all_tag_tag_type.get(r["tag_id"]), axis=1)
        user_df_service = user_df_service[user_df_service["tag_type"].isin(["2", "3"])]
        if not user_df_service.empty:
            user_log_df_tag2_list = user_df_service[
                user_df_service["tag_type"] == "2"]["tag_id"].unique().tolist()
            # Roll level-3 tags up to a level-2 parent; level-2 tags map to themselves
            user_df_service["tag2"] = user_df_service.apply(
                lambda r: get_tag2_from_tag3(r.tag_id, all_3tag_2tag, user_log_df_tag2_list)
                if r.tag_type == "3" else r.tag_id, axis=1)
            user_df_service["tag2_type"] = user_df_service.apply(
                lambda r: all_tag_tag_type.get(r["tag2"]), axis=1)
            # Score each row by action recency, normalized by the number of tags
            # the same action touched; unknown score types fall back to validate
            score_fn_by_type = {
                "henqiang": compute_henqiang,
                "jiaoqiang": compute_jiaoqiang,
                "ai_scan": compute_ai_scan,
                "ruoyixiang": compute_ruoyixiang,
            }
            user_df_service["tag_score"] = user_df_service.apply(
                lambda r: score_fn_by_type.get(r.score_type, compute_validate)(
                    r.days_diff_now, exponential=0)
                / get_action_tag_count(user_df_service, r.time), axis=1)
            tag_score_sum = user_df_service.groupby(by=["tag2", "tag2_type"]).agg(
                {"tag_score": "sum", "cl_id": "first", "action": "first"}) \
                .reset_index().sort_values(by=["tag_score"], ascending=False)
            tag_score_sum["weight"] = 100 * tag_score_sum["tag_score"] / tag_score_sum["tag_score"].sum()
            # Rank actions: order validation (3) > alipay settlement callback (2) > other (1)
            tag_score_sum["pay_type"] = tag_score_sum.apply(
                lambda r: 3 if r.action == "api/order/validate" else (
                    2 if r.action == "api/settlement/alipay_callback" else 1), axis=1)
            # size=None keeps every tag; a positive size keeps only the top-N
            gmkv_tag_score2_sum = tag_score_sum[["tag2", "tag_score"]][:size].to_dict("records")
            gmkv_tag_score2_sum_dict = {i["tag2"]: i["tag_score"] for i in gmkv_tag_score2_sum}
            gmkv_tag_score3_sum_dict = {all_tags_name[i]: gmkv_tag_score2_sum_dict[i]
                                        for i in gmkv_tag_score2_sum_dict}
            portrait_result = [i[0] for i in sorted(
                gmkv_tag_score3_sum_dict.items(), key=lambda kv: kv[1], reverse=True)]
    return [cl_id, search_info, portrait_result]
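# Illustrative usage (names and values below are made up for this sketch):
#   x = ["device_001", ["search_word_a"]]          # [device_id, search_words]
#   get_user_service_portrait(x, all_word_tags, all_tag_tag_type,
#                             all_3tag_2tag, all_tags_name, pay_time=pay_time)
#   -> ["device_001", ["search_word_a"], ["tag_name_b", "tag_name_a"]]
#      # tag names sorted by descending portrait score ([] if no valid log)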
# data: devices with search history
device_info = []
with open("/home/gmuser/gyz/log/have_search_device_20191105.csv", "r") as f:
    for line in f:
        data = line.strip().split("=", 1)
        device = data[0]
        # the second field is expected to be a Python literal (a list of
        # search words), so literal_eval is a safer drop-in for eval here
        search_words = ast.literal_eval(data[1])
        device_info.append([device, search_words])
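# Assumed input layout (the real file is not shown in this commit) -- one
# device per line, a device id and a Python list literal separated by "=":
#   device_001=['search_word_a', 'search_word_b']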
pay_time = 1572883200  # 2019-11-05 00:00:00 (UTC+8), unix seconds
# rdd
sparkConf = SparkConf() \
    .set("spark.hive.mapred.supports.subdirectories", "true") \
    .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
    .set("spark.tispark.plan.allow_index_double_read", "false") \
    .set("spark.tispark.plan.allow_index_read", "true") \
    .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
    .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
    .set("spark.io.compression.codec", "lzf") \
    .set("spark.driver.maxResultSize", "8g") \
    .set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# ship tool.py to the executors so the star-imported helpers resolve there
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_info)
result = device_ids_lst_rdd.repartition(100).map(
    lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type,
                                        all_3tag_2tag, all_tags_name,
                                        size=None, pay_time=pay_time))
# result.foreach(print)
result_df = result.toDF()
# Spark DataFrames have no to_csv; collect to pandas before writing locally
result_df.toPandas().to_csv("~/test_df.csv")
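# For result sets too large to collect on the driver, an alternative sketch
# would write from the executors with Spark's own writer (path illustrative;
# the array-typed columns would first need to be joined into strings, e.g.
# with pyspark.sql.functions.concat_ws):
#   result_df.write.mode("overwrite").csv("hdfs:///tmp/test_df")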
spark.stop()