Commit 791aa476 authored by 高雅喆's avatar 高雅喆

pyspark argsparse test

parent 8dd00094
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import redis
import datetime
from pyspark import SparkConf
import time
from pyspark.sql import SparkSession
import json
import numpy as np
import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql.functions import concat_ws
from tool import *
import argparse
if __name__ == '__main__':
# cmd参数
parser = argparse.ArgumentParser(description='画像匹配度的统计')
my_yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
parser.add_argument("-o", "--order_date", type=str, dest="order_date", default=my_yesterday, help="统计的行为日期")
parser.add_argument("-log1", "--log1_file", type=str, dest="portrait_stat_log_path",
default="portrait_stat.log", help="画像统计的日志地址")
parser.add_argument("-log2", "--log2_file", type=str, dest="debug_portrait_stat_log_path",
default="debug_portrait_stat.log", help="画像统计的日志地址")
parser.add_argument("-t", "--top", type=int, dest="portrait_top_n", default=3, help="选取画像的前n个tag去统计匹配度")
parser.add_argument("-c", "--coincide", type=int, dest="coincide_n", default=1, help="选取n个tag重合个数作为判断是否匹配的阈值")
parser.add_argument("-v", "--version", type=int, dest="version", default=1, help="选取翔宇(0),英赫(1)版本进行统计")
parser.add_argument("-e", "--exponential", type=int, dest="exponential", default=0, help="是否采用指数衰减")
parser.add_argument("-n", "--normalization_size", type=int, dest="normalization_size", default=7,
help="天数差归一化的区间")
parser.add_argument("-d", "--decay_days", type=int, dest="decay_days", default=180, help="分数衰减的天数")
parser.add_argument("-a", "--action_type", dest="action_type", nargs='+', help="计算匹配度的行为")
parser.add_argument("-s", "--save_tidb", type=int, dest="save_tidb", default=1, help="统计结果是否存tidb")
args = parser.parse_args()
order_date = args.order_date
order_date_tomorrow = str(datetime.datetime.strptime(order_date, '%Y-%m-%d') + datetime.timedelta(days=1))
portrait_stat_log_path = args.portrait_stat_log_path
debug_portrait_stat_log_path = args.debug_portrait_stat_log_path
cmd_portrait_top_n = args.portrait_top_n
cmd_coincide_n = args.coincide_n
version = args.version
exponential = args.exponential
normalization_size = args.normalization_size
decay_days = args.decay_days
action_type = args.action_type
save_tidb = args.save_tidb
# 获取最近30天内的用户设备id
sql_device_ids = "select distinct cl_id from user_new_tag_log " \
"where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
mysql_results = get_data_by_mysql('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'jerry_test', sql_device_ids)
device_ids_lst = [i[0] for i in mysql_results]
print(device_ids_lstp[:10])
# 获取搜索词及其近义词对应的tag
all_word_tags = get_all_word_tags()
all_tag_tag_type = get_all_tag_tag_type()
# 3级tag对应的2级tag
all_3tag_2tag = get_all_3tag_2tag()
# rdd
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(action_type)
print("="*100)
result = device_ids_lst_rdd.repartition(100).map(
lambda x: args_test(x))
result.collect()
...@@ -252,3 +252,7 @@ def exponential_decay(days_diff, decay_days=180, normalization_size=7): ...@@ -252,3 +252,7 @@ def exponential_decay(days_diff, decay_days=180, normalization_size=7):
# 天数差归一化到[0, normalization_size] # 天数差归一化到[0, normalization_size]
a = (normalization_size - 0) * (days_diff - min(x)) / (max(x) - min(x)) a = (normalization_size - 0) * (days_diff - min(x)) / (max(x) - min(x))
return a return a
def args_test(x):
return "gyz add" + str(x)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment