from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import datetime
import json
import time

import numpy as np
import pandas as pd
import pymysql
import redis
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, lit

from tool import *

if __name__ == '__main__':
    # Command-line arguments
    parser = argparse.ArgumentParser(description='Portrait match-rate statistics')
    my_yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
    parser.add_argument("-o", "--order_date", type=str, dest="order_date", default=my_yesterday,
                        help="date of the actions to aggregate")
    parser.add_argument("-log1", "--log1_file", type=str, dest="portrait_stat_log_path",
                        default="portrait_stat.log", help="path of the portrait-stat log")
    parser.add_argument("-log2", "--log2_file", type=str, dest="debug_portrait_stat_log_path",
                        default="debug_portrait_stat.log", help="path of the debug portrait-stat log")
    parser.add_argument("-t", "--top", type=int, dest="portrait_top_n", default=3,
                        help="use the portrait's top-n tags when computing the match rate")
    parser.add_argument("-c", "--coincide", type=int, dest="coincide_n", default=1,
                        help="number of overlapping tags required to count as a match")
    parser.add_argument("-v", "--version", type=int, dest="version", default=1,
                        help="use Xiangyu's (0) or Yinghe's (1) version of the statistic")
    parser.add_argument("-e", "--exponential", type=int, dest="exponential", default=0,
                        help="whether to apply exponential decay")
    parser.add_argument("-n", "--normalization_size", type=int, dest="normalization_size", default=7,
                        help="window (in days) used to normalize the day difference")
    parser.add_argument("-d", "--decay_days", type=int, dest="decay_days", default=180,
                        help="number of days over which the score decays")
    parser.add_argument("-a", "--action_type", dest="action_type", nargs='+',
                        help="action types used to compute the match rate")
    parser.add_argument("-s", "--save_tidb", type=int, dest="save_tidb", default=1,
                        help="whether to save the results to TiDB")
    args = parser.parse_args()

    order_date = args.order_date
    order_date_tomorrow = str(datetime.datetime.strptime(order_date, '%Y-%m-%d')
                              + datetime.timedelta(days=1))
    portrait_stat_log_path = args.portrait_stat_log_path
    debug_portrait_stat_log_path = args.debug_portrait_stat_log_path
    cmd_portrait_top_n = args.portrait_top_n
    cmd_coincide_n = args.coincide_n
    version = args.version
    exponential = args.exponential
    normalization_size = args.normalization_size
    decay_days = args.decay_days
    action_type = args.action_type
    save_tidb = args.save_tidb

    # Fetch device ids of users active within the last 30 days
    sql_device_ids = "select distinct cl_id from user_new_tag_log " \
                     "where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
    mysql_results = get_data_by_mysql('172.16.40.158', 4000, 'st_user',
                                      'aqpuBLYzEV7tML5RPsN1pntUzFy', 'jerry_test',
                                      sql_device_ids)
    device_ids_lst = [i["cl_id"] for i in mysql_results]
    print(device_ids_lst[:10])

    # Tags for search words and their synonyms
    all_word_tags = get_all_word_tags()
    all_tag_tag_type = get_all_tag_tag_type()
    # Level-2 tag for each level-3 tag
    all_3tag_2tag = get_all_3tag_2tag()
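    # The -e/-n/-d flags above configure time decay of tag scores, but the
    # actual formula lives in tool.py (not shown here). A minimal sketch of
    # what such a decay might look like, assuming decay_days acts as the
    # time constant (hypothetical helper, for illustration only):
    def example_decay_weight(days_diff, decay_days=180, exponential=True,
                             normalization_size=7):
        """Hypothetical: down-weight an action by how many days old it is."""
        if exponential:
            # exp(-t / tau): the weight halves roughly every
            # decay_days * ln(2) ~= 0.69 * decay_days days
            return float(np.exp(-float(days_diff) / decay_days))
        # non-exponential fallback: linear drop-off, with the day difference
        # normalized by the normalization_size window (assumption)
        return max(0.0, 1.0 - float(days_diff) / (decay_days * normalization_size))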
    # Spark session (TiSpark-enabled)
    sparkConf = SparkConf() \
        .set("spark.hive.mapred.supports.subdirectories", "true") \
        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
        .set("spark.tispark.plan.allow_index_double_read", "false") \
        .set("spark.tispark.plan.allow_index_read", "true") \
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
        .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
        .set("spark.io.compression.codec", "lzf") \
        .set("spark.driver.maxResultSize", "8g") \
        .set("spark.sql.avro.compression.codec", "snappy")
    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    # Ship tool.py to the executors so its functions resolve inside map()
    spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")

    # NOTE: despite its name, this RDD holds the action types, not device ids
    device_ids_lst_rdd = spark.sparkContext.parallelize(action_type)
    print("=" * 100)
    print(action_type)
    print(type(device_ids_lst_rdd))
    print(device_ids_lst_rdd)
    print("=" * 100)
    result = device_ids_lst_rdd.repartition(100).map(lambda x: args_test(x))
    print(result)
    print(result.collect())
    # foreach() runs print on the executors and returns None on the driver,
    # so this line prints None locally; executor output lands in worker logs
    print(result.foreach(print))
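    # The --save_tidb flag suggests the final statistics are persisted to
    # TiDB. One way to do that is Spark's JDBC writer (TiDB speaks the MySQL
    # protocol); a minimal sketch, reusing the host/credentials from the
    # get_data_by_mysql call above and assuming args_test returns a
    # string-serializable result -- the table name portrait_stat_result is
    # hypothetical, the real sink is not shown in this snippet:
    if save_tidb:
        stats_df = spark.createDataFrame(
            [(order_date, str(r)) for r in result.collect()],
            ["order_date", "stat"])
        stats_df.write.format("jdbc") \
            .option("url", "jdbc:mysql://172.16.40.158:4000/jerry_test") \
            .option("dbtable", "portrait_stat_result") \
            .option("user", "st_user") \
            .option("password", "aqpuBLYzEV7tML5RPsN1pntUzFy") \
            .mode("append") \
            .save()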