Commit bfa8847f authored by 郭羽

Ranking model optimization

parent 75815fee
......@@ -315,7 +315,7 @@ def addUserFeatures(samples,dataVocab,multiVocab):
return samples
def addSampleLabel(ratingSamples):
ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 5, 1).otherwise(0))
ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 1, 1).otherwise(0))
ratingSamples.show(5, truncate=False)
ratingSamples.printSchema()
return ratingSamples
......@@ -852,8 +852,8 @@ if __name__ == '__main__':
df = ratingSamplesWithLabel.toPandas()
df = pd.DataFrame(df)
posCount = df.loc[df["label"]==0]["label"].count()
negCount = df.loc[df["label"]==1]["label"].count()
posCount = df.loc[df["label"]==1]["label"].count()
negCount = df.loc[df["label"]==0]["label"].count()
print("pos size:"+str(posCount),"neg size:"+str(negCount))
itemDF = get_service_feature_df()
......
import sys
import os
from datetime import date, timedelta
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
......@@ -11,48 +10,24 @@ import redis
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import when,col
from pyspark.sql.functions import when
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, QuantileDiscretizer, MinMaxScaler
from collections import defaultdict
import json
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.configUtils as configUtils
# import utils.connUtils as connUtils
import pandas as pd
import math
# os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
"""
特征工程
"""
ITEM_MULTI_COLUMN_EXTRA_MAP = {"first_demands": 1,
"second_demands": 5,
"first_solutions": 1,
"second_solutions": 5,
"first_positions": 1,
"second_positions": 5,
"tags_v3": 10,
}
USER_MULTI_COLUMN_EXTRA_MAP = {"first_demands": 1,
"second_demands": 3,
"first_solutions": 1,
"second_solutions": 3,
"first_positions": 1,
"second_positions": 3,
"tags_v3": 5,
}
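# Note on the two maps above: the value is how many comma-separated slots of the multi-value column are
# expanded into separate single-value features (e.g. tags_v3 -> item_tags_v3__1 .. item_tags_v3__10 in the split loop below).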
ITEM_NUMBER_COLUMNS = ["lowest_price","smart_rank2","case_count","ordered_user_ids_count"]
ITEM_CATE_COLUMNS = ["service_type","merchant_id","doctor_type","doctor_id","doctor_famous","hospital_id","hospital_city_tag_id","hospital_type","hospital_is_high_quality"]
NUMBER_PRECISION = 2
VERSION = configUtils.SERVICE_VERSION
......@@ -61,174 +36,333 @@ FEATURE_ITEM_KEY = "Strategy:rec:feature:service:" + VERSION + ":item:"
FEATURE_VOCAB_KEY = "Strategy:rec:vocab:service:" + VERSION
FEATURE_COLUMN_KEY = "Strategy:rec:column:service:" + VERSION
TRAIN_FILE_PATH = "service_feature_" + VERSION
ITEM_PREFIX = "item_"
DATA_PATH_TRAIN = "/data/files/service_feature_{}_train.csv".format(VERSION)
def getRedisConn():
pool = redis.ConnectionPool(host="172.16.50.145",password="XfkMCCdWDIU%ls$h",port=6379,db=0)
pool = redis.ConnectionPool(host="172.16.50.145", password="XfkMCCdWDIU%ls$h", port=6379, db=0)
conn = redis.Redis(connection_pool=pool)
# conn = redis.Redis(host="172.16.50.145", port=6379, password="XfkMCCdWDIU%ls$h",db=0)
# conn = redis.Redis(host="172.18.51.10", port=6379,db=0) #test
return conn
def addItemFeatures(samples,itemDF,dataVocab,multiVocab):
itemDF = itemDF.withColumnRenamed("id", "itemid")
# Data filter: drop items without a doctor
itemDF = itemDF.filter(col("doctor_id") != "-1")
def parseTags(tags, i):
tags_arr = tags.split(",")
if len(tags_arr) >= i:
return tags_arr[i - 1]
else:
return "-1"
# itemid
vocabList = collectColumnToVocab(itemDF, "itemid")
dataVocab["itemid"] = vocabList
# Null handling
for c in ITEM_NUMBER_COLUMNS:
print("null count:",c,itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(ITEM_PREFIX+c,when(col(c).isNull(),0).otherwise(col(c)).cast("float")).drop(c)
def numberToBucket(num):
res = 0
if not num:
return str(res)
if num >= 1000:
res = 1000 // 10
else:
res = int(num) // 10
return str(res)
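# Illustrative values (editor's note): numberToBucket(57) == "5", numberToBucket(2500) == "100" (capped), numberToBucket(None) == "0"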
for c in ITEM_CATE_COLUMNS:
print("null count:", c, itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(ITEM_PREFIX+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c)
# Add to the data vocabulary
dataVocab[ITEM_PREFIX+c] = collectColumnToVocab(itemDF,ITEM_PREFIX+c)
# Multi-value (discrete) feature handling
for c, v in ITEM_MULTI_COLUMN_EXTRA_MAP.items():
print("null count:", c, itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
def priceToBucket(num):
res = 0
if not num:
return str(res)
if num >= 100000:
res = 100000 // 1000
else:
res = int(num) // 1000
return str(res)
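# Illustrative values (editor's note): priceToBucket(4999) == "4", priceToBucket(250000) == "100" (capped), priceToBucket(None) == "0"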
multiVocab[c] = collectMutiColumnToVocab(itemDF, c)
for i in range(1, v + 1):
new_c = ITEM_PREFIX + c + "__" + str(i)
itemDF = itemDF.withColumn(new_c, F.split(F.col(c), ",")[i - 1])
itemDF = itemDF.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c)))
dataVocab[new_c] = multiVocab[c]
numberToBucketUdf = F.udf(numberToBucket, StringType())
priceToBucketUdf = F.udf(priceToBucket, StringType())
samples = samples.join(itemDF, on=['itemid'], how='inner')
# Statistical feature processing
print("Computing statistical features...")
staticFeatures = samples.groupBy('itemid').agg(F.count(F.lit(1)).alias('itemRatingCount'),
def addItemStaticFeatures(samples, itemDF, dataVocab):
ctrUdf = F.udf(wilson_ctr, FloatType())
# No over-window for items: an item can stay online indefinitely, so the latest aggregate statistics are enough
print("Computing item statistical features...")
staticFeatures = samples.groupBy('item_id').agg(F.count(F.lit(1)).alias('itemRatingCount'),
F.avg(F.col('rating')).alias('itemRatingAvg'),
F.stddev(F.col('rating')).alias('itemRatingStddev')).fillna(0) \
F.stddev(F.col('rating')).alias('itemRatingStddev'),
F.sum(
when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).alias(
"itemClickCount"),
F.sum(
when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).alias(
"itemExpCount")
).fillna(0) \
.withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float")) \
.withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float"))
# join item rating features
samples = samples.join(staticFeatures, on=['itemid'], how='left')
.withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float")) \
.withColumn('itemCtr',  # wilson_ctr expects (num_pv, num_click): exposures first, then clicks
F.format_number(ctrUdf(F.col("itemExpCount"), F.col("itemClickCount")), NUMBER_PRECISION).cast(
"float"))
staticFeatures.show(20, truncate=False)
staticFeatures = itemDF.join(staticFeatures, on=["item_id"], how='left')
# Bucketize continuous features
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ["itemRatingCount", "itemRatingAvg", "itemClickCount", "itemExpCount"]:
new_col = col + bucket_suffix
staticFeatures = staticFeatures.withColumn(new_col, numberToBucketUdf(F.col(col))) \
.drop(col) \
.withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
dataVocab[new_col] = bucket_vocab
# Std-dev handling
number_suffix = "_number"
for col in ["itemRatingStddev"]:
new_col = col + number_suffix
staticFeatures = staticFeatures.withColumn(new_col,
F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(
col)
for col in ["itemCtr"]:
new_col = col + number_suffix
staticFeatures = staticFeatures.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(
F.col(col))).withColumnRenamed(col, new_col)
print("item size:", staticFeatures.count())
staticFeatures.show(5, truncate=False)
return staticFeatures
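# Editor's summary of the output: one row per item from itemDF, left-joined with the rating aggregates above,
# with count-style features bucketized into *_Bucket strings and the Wilson-smoothed CTR kept as itemCtr_number.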
def addUserStaticsFeatures(samples, dataVocab):
print("user统计特征处理...")
samples = samples \
.withColumn('userRatingCount', F.format_number(
F.sum(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingAvg", F.format_number(
F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingStddev", F.format_number(
F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userClickCount", F.format_number(
F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
"float")) \
.withColumn("userExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
"float")) \
.withColumn("userCtr",
F.format_number(F.col("userClickCount") / (F.col("userExpCount") + 1), NUMBER_PRECISION).cast(
"float")) \
.filter(F.col("userRatingCount") > 1)
print("连续特征处理...")
# todo 分桶比较耗时,可以考虑做非线性转换
# 连续特征处理
pipelineStage = []
# Normalization
# for c in ["itemRatingAvg","itemRatingStddev"]:
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c+"Scale"))
samples.show(20, truncate=False)
# Bucketize continuous features
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ["userRatingCount", "userRatingAvg", "userClickCount", "userExpCount"]:
new_col = col + bucket_suffix
samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col))) \
.drop(col) \
.withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
dataVocab[new_col] = bucket_vocab
# Std-dev handling
number_suffix = "_number"
for col in ["userRatingStddev"]:
new_col = col + number_suffix
samples = samples.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
for col in ["userCtr"]:
new_col = col + number_suffix
samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col,
new_col)
# bucketing
bucketColumns = [ITEM_PREFIX+"case_count", ITEM_PREFIX+"ordered_user_ids_count", ITEM_PREFIX+"lowest_price", "itemRatingCount", "itemRatingStddev","itemRatingAvg"]
for c in bucketColumns:
pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
samples.printSchema()
samples.show(20, truncate=False)
return samples
featurePipeline = Pipeline(stages=pipelineStage)
samples = featurePipeline.fit(samples).transform(samples)
# Cast to string
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket",F.col(c + "Bucket").cast("string")).drop(c)
def addItemFeatures(itemDF, dataVocab, multi_col_vocab):
# multi_col = ['sku_tags', 'sku_show_tags','second_demands', 'second_solutions', 'second_positions']
multi_col = ['tags_v3', 'second_demands', 'second_solutions', 'second_positions']
onehot_col = ['id', 'service_type', 'merchant_id', 'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id',
'hospital_city_tag_id', 'hospital_type', 'hospital_is_high_quality']
dataVocab[c + "Bucket"] = [str(float(i)) for i in range(11)]
for col in onehot_col:
new_c = ITEM_PREFIX + col
dataVocab[new_c] = list(set(itemDF[col].tolist()))
itemDF[new_c] = itemDF[col]
itemDF = itemDF.drop(columns=onehot_col)
for c in multi_col:
multi_col_vocab[c] = list(set(itemDF[c].tolist()))
samples.printSchema()
# samples.show(5, truncate=False)
for i in range(1, 6):
new_c = ITEM_PREFIX + c + "__" + str(i)
itemDF[new_c] = itemDF[c].map(lambda x: parseTags(x, i))
dataVocab[new_c] = multi_col_vocab[c]
# Bucketize continuous features
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ['case_count', 'sales_count']:
new_col = ITEM_PREFIX + col + bucket_suffix
itemDF[new_col] = itemDF[col].map(numberToBucket)
itemDF = itemDF.drop(columns=[col])
dataVocab[new_col] = bucket_vocab
for col in ['sku_price']:
new_col = ITEM_PREFIX + col + bucket_suffix
itemDF[new_col] = itemDF[col].map(priceToBucket)
itemDF = itemDF.drop(columns=[col])
dataVocab[new_col] = bucket_vocab
# Continuous value handling
number_suffix = "_number"
for col in ["discount"]:
new_col = ITEM_PREFIX + col + number_suffix
itemDF[new_col] = itemDF[col]
itemDF = itemDF.drop(columns=[col])
return itemDF
return samples
def extractTags(genres_list):
# Weight tags by their position in the click sequence
genres_dict = defaultdict(int)
for genres in genres_list:
for i, genres in enumerate(genres_list):
for genre in genres.split(','):
genres_dict[genre] += 1
genres_dict[genre] += i
sortedGenres = sorted(genres_dict.items(), key=lambda x: x[1], reverse=True)
return [x[0] for x in sortedGenres]
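# Illustrative behaviour of the position weighting above (editor's example, not in the original):
# extractTags(["a,b", "b,c"]) -> ["b", "c", "a"], since "b" and "c" each score 1 and "a" scores 0.
# Note the element at index 0 contributes a weight of 0; if every click should count, i + 1 may be the intended weight.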
# This Spark SQL version does not support F.reverse
def arrayReverse(arr):
arr.reverse()
return arr
def addUserFeatures(samples,dataVocab,multiVocab):
"""
p —— 概率,即点击的概率,也就是 CTR
n —— 样本总数,即曝光数
z —— 在正态分布里,均值 + z * 标准差会有一定的置信度。例如 z 取 1.96,就有 95% 的置信度。
Wilson区间的含义就是,就是指在一定置信度下,真实的 CTR 范围是多少
"""
def wilson_ctr(num_pv, num_click):
num_pv = float(num_pv)
num_click = float(num_click)
if num_pv * num_click == 0 or num_pv < num_click:
return 0.0
z = 1.96;
n = num_pv;
p = num_click / num_pv;
score = (p + z * z / (2 * n) - z * math.sqrt((p * (1.0 - p) + z * z / (4.0 * n)) / n)) / (1.0 + z * z / n);
return float(score);
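# Illustration (editor's note, not part of the pipeline): wilson_ctr(100, 10) is roughly 0.055, well below the
# raw CTR of 10/100 = 0.10, so items/users with little traffic are shrunk toward zero instead of getting a noisy CTR.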
def addUserFeatures(samples, dataVocab, multiVocab):
dataVocab["userid"] = collectColumnToVocab(samples, "userid")
dataVocab["user_os"] = ["ios","android","-1"]
dataVocab["user_city_id"] = collectColumnToVocab(samples, "user_city_id")
dataVocab["user_os"] = ["ios", "android"]
extractTagsUdf = F.udf(extractTags, ArrayType(StringType()))
arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
samples = samples.withColumnRenamed("cl_id","userid")
ctrUdf = F.udf(wilson_ctr, FloatType())
print("user历史数据处理...")
# user历史记录
samples = samples\
.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('itemid')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) \
.withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory")))
for i in range(1,11):
samples = samples.withColumn("userRatedHistory"+str(i), F.when(F.col("userPositiveHistory")[i-1].isNotNull(),F.col("userPositiveHistory")[i-1]).otherwise("-1"))
dataVocab["userRatedHistory"+str(i)] = dataVocab["itemid"]
samples = samples.drop("userPositiveHistory")
samples = samples.withColumn('userPositiveHistory', F.collect_list(
when(F.col('label') == 1, F.col('item_id')).otherwise(F.lit(None))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)))
# Statistics over the user's historical click scores
print("Computing statistical features...")
samples = samples\
.withColumn('userRatingCount',F.count(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))) \
.withColumn("userRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.filter(F.col("userRatingCount") > 1)
samples = samples.withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory")))
for i in range(1, 11):
samples = samples.withColumn("userRatedHistory" + str(i),
F.when(F.col("userPositiveHistory")[i - 1].isNotNull(),
F.col("userPositiveHistory")[i - 1]).otherwise("-1"))
dataVocab["userRatedHistory" + str(i)] = dataVocab["item_id"]
samples = samples.drop("userPositiveHistory")
# User preferences
for c,v in USER_MULTI_COLUMN_EXTRA_MAP.items():
print("User preference data")
for c, v in multiVocab.items():
new_col = "user" + "__" + c
samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
for i in range(1, v+1):
samples = samples.withColumn(new_col + "__" + str(i),F.when(F.col(new_col)[i - 1].isNotNull(), F.col(new_col)[i - 1]).otherwise("-1"))
dataVocab[new_col + "__" + str(i)] = multiVocab[c]
samples = samples.drop(new_col)
# .drop(c).drop(new_col)
print("连续特征处理...")
pipelineStage = []
# Normalization
# for c in ["userRatingAvg", "userRatingStddev"]:
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c + "Scale"))
# bucketing
bucketColumns = ["userRatingCount","userRatingAvg","userRatingStddev"]
for c in bucketColumns:
pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
featurePipeline = Pipeline(stages=pipelineStage)
samples = featurePipeline.fit(samples).transform(samples)
# Cast to string
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket", F.col(c + "Bucket").cast("string")).drop(c)
samples = samples.withColumn(new_col, extractTagsUdf(
F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(
sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
for i in range(1, 6):
samples = samples.withColumn(new_col + "__" + str(i),
F.when(F.col(new_col)[i - 1].isNotNull(), F.col(new_col)[i - 1]).otherwise(
"-1"))
dataVocab[new_col + "__" + str(i)] = v
samples = samples.drop(new_col).drop(c)
print("user统计特征处理...")
samples = samples \
.withColumn('userRatingCount', F.format_number(
F.sum(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingAvg", F.format_number(
F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingStddev", F.format_number(
F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),
NUMBER_PRECISION).cast("float")) \
.withColumn("userClickCount", F.format_number(
F.sum(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(0))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
"float")) \
.withColumn("userExpCount", F.format_number(F.sum(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(0))).over(
sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1)), NUMBER_PRECISION).cast(
"float")) \
.withColumn("userCtr",
F.format_number(ctrUdf(F.col("userClickCount"), F.col("userExpCount")), NUMBER_PRECISION)) \
.filter(F.col("userRatingCount") > 1)
dataVocab[c + "Bucket"] = [str(float(i)) for i in range(11)]
samples.show(10, truncate=False)
# Bucketize continuous features
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ["userRatingCount", "userRatingAvg", "userClickCount", "userExpCount"]:
new_col = col + bucket_suffix
samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col))) \
.drop(col) \
.withColumn(new_col, F.when(F.col(new_col).isNull(), "0").otherwise(F.col(new_col)))
dataVocab[new_col] = bucket_vocab
# Std-dev handling
number_suffix = "_number"
for col in ["userRatingStddev"]:
new_col = col + number_suffix
samples = samples.withColumn(new_col, F.when(F.col(col).isNull(), 0).otherwise(1 / (F.col(col) + 1))).drop(col)
for col in ["userCtr"]:
new_col = col + number_suffix
samples = samples.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col))).withColumnRenamed(col,
new_col)
samples.printSchema()
# samples.show(5,truncate=False)
samples.show(10, truncate=False)
return samples
def addSampleLabel(ratingSamples):
ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 5, 1).otherwise(0))
ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 1, 1).otherwise(0))
# ratingSamples = ratingSamples.withColumn('label', when(F.col('rating') >= 5, 1).otherwise(0))
ratingSamples.show(5, truncate=False)
ratingSamples.printSchema()
return ratingSamples
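# Editor's note: rating here is page_stay (seconds on the service detail page); getExposureSql forces page_stay
# to 0 for exposure-only rows, so with the >= 1 threshold clicks with at least 1s of stay become positives and
# pure exposures become negatives.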
def samplesNegAndUnion(samplesPos,samplesNeg):
def samplesNegAndUnion(samplesPos, samplesNeg):
# Positive:negative samples 1:4
pos_count = samplesPos.count()
neg_count = samplesNeg.count()
......@@ -241,28 +375,30 @@ def samplesNegAndUnion(samplesPos,samplesNeg):
print("dataSize:{}".format(str(dataSize)))
return samples
def splitAndSaveTrainingTestSamplesByTimeStamp(samples,splitTimestamp, file_path):
def splitAndSaveTrainingTestSamplesByTimeStamp(samples, splitTimestamp, file_path):
samples = samples.withColumn("timestampLong", F.col("timestamp").cast(LongType()))
# quantile = smallSamples.stat.approxQuantile("timestampLong", [0.8], 0.05)
# splitTimestamp = quantile[0]
train = samples.where(F.col("timestampLong") <= splitTimestamp).drop("timestampLong")
test = samples.where(F.col("timestampLong") > splitTimestamp).drop("timestampLong")
print("split train size:{},test size:{}".format(str(train.count()),str(test.count())))
print("split train size:{},test size:{}".format(str(train.count()), str(test.count())))
trainingSavePath = file_path + '_train'
testSavePath = file_path + '_test'
train.write.option("header", "true").option("delimiter", "|").mode('overwrite').csv(trainingSavePath)
test.write.option("header", "true").option("delimiter", "|").mode('overwrite').csv(testSavePath)
def collectColumnToVocab(samples,column):
def collectColumnToVocab(samples, column):
datas = samples.select(column).distinct().collect()
vocabSet = set()
for d in datas:
if d[column]:
vocabSet.add(str(d[column]))
vocabSet.add("-1") # 空值的默认
return list(vocabSet)
def collectMutiColumnToVocab(samples,column):
def collectMutiColumnToVocab(samples, column):
datas = samples.select(column).distinct().collect()
tagSet = set()
for d in datas:
......@@ -273,72 +409,30 @@ def collectMutiColumnToVocab(samples,column):
tagSet.add("-1") # 空值默认
return list(tagSet)
def getDataVocab(samples,model_columns):
dataVocab = {}
multiVocab = {}
# Multi-value features
for c in ITEM_MULTI_COLUMN_EXTRA_MAP.keys():
print(c)
multiVocab[c] = collectMutiColumnToVocab(samples,c)
samples = samples.drop(c)
# ID features and categorical features
for c in ["userid"]:
print(c)
dataVocab[c] = collectColumnToVocab(samples,c)
for c in model_columns:
# Check whether the column name ends with "Bucket"
if c.endswith("Bucket"):
datas = samples.select(c).distinct().collect()
vocabSet = set()
for d in datas:
if d[c]:
vocabSet.add(str(d[c]))
vocabSet.add("-1")# 空值的默认
dataVocab[c] = list(vocabSet)
# elif c.count("userRatedHistory") > 0:
# dataVocab[c] = dataVocab["itemid"]
else:
# Check whether it is a multi-value discrete column
for cc, v in multiVocab.items():
if c.count(cc) > 0:
dataVocab[c] = v
return dataVocab
def dataVocabToRedis(dataVocab):
conn = getRedisConn()
conn.set(FEATURE_VOCAB_KEY,dataVocab)
conn.expire(FEATURE_VOCAB_KEY,60 * 60 * 24 * 7)
conn.set(FEATURE_VOCAB_KEY, dataVocab)
conn.expire(FEATURE_VOCAB_KEY, 60 * 60 * 24 * 7)
def featureColumnsToRedis(columns):
conn = getRedisConn()
conn.set(FEATURE_COLUMN_KEY, json.dumps(columns))
conn.expire(FEATURE_COLUMN_KEY, 60 * 60 * 24 * 7)
def featureToRedis(key,datas):
def featureToRedis(key, datas):
conn = getRedisConn()
for k,v in datas.items():
newKey = key+k
conn.set(newKey,v)
for k, v in datas.items():
newKey = key + k
conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7)
def collectFeaturesToDict(samples,columns,prefix):
idCol = prefix+"id"
timestampCol = idCol+"_timestamp"
# Take each user's latest record by timestamp
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
resDatas = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
resDatas = resDatas.select(*columns).distinct().collect()
print(prefix,len(resDatas))
return {d[idCol]:json.dumps(d.asDict(),ensure_ascii=False) for d in resDatas}
def featuresToRedis(samples,columns,prefix,redisKey):
idCol = prefix+"id"
timestampCol = idCol+"_timestamp"
def userFeaturesToRedis(samples, columns, prefix, redisKey):
idCol = prefix + "id"
timestampCol = idCol + "_timestamp"
def toRedis(datas):
conn = getRedisConn()
......@@ -349,19 +443,34 @@ def featuresToRedis(samples,columns,prefix,redisKey):
conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7)
# Take each user's latest record by timestamp
prefixSamples = samples.groupBy(idCol).agg(F.max("timestamp").alias(timestampCol))
resDatas = samples.join(prefixSamples, on=[idCol], how='left').where(F.col("timestamp") == F.col(timestampCol))
resDatas = prefixSamples.join(samples, on=[idCol], how='inner').where(F.col("timestamp") == F.col(timestampCol))
resDatas = resDatas.select(*columns).distinct()
resDatas.show(10, truncate=False)
print(prefix, resDatas.count())
resDatas.repartition(8).foreachPartition(toRedis)
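# Editor's note: only each user's most recent sample (max timestamp) is kept and written as JSON with a 7-day TTL;
# the exact Redis key construction lives in the elided part of this diff.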
def itemFeaturesToRedis(itemStaticDF, redisKey):
idCol = "item_id"
def toRedis(datas):
conn = getRedisConn()
for d in datas:
k = d[idCol]
v = json.dumps(d.asDict(), ensure_ascii=False)
newKey = redisKey + k
conn.set(newKey, v)
conn.expire(newKey, 60 * 60 * 24 * 7)
itemStaticDF.repartition(8).foreachPartition(toRedis)
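# Editor's note: item features are written one key per item as FEATURE_ITEM_KEY + item_id with the row as JSON and
# a 7-day TTL, i.e. something like "Strategy:rec:feature:service:<VERSION>:item:<item_id>".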
"""
数据加载
"""
CONTENT_TYPE = "service"
SERVICE_HOSTS = [
{'host': "172.16.52.33", 'port': 9200},
......@@ -375,21 +484,24 @@ ES_INDEX_TEST = "gm_test-service-read"
ACTION_REG = r"""^\\d+$"""
def getEsConn_test():
host_config = [{'host': '172.18.52.14', 'port': 9200}, {'host': '172.18.52.133', 'port': 9200},
{'host': '172.18.52.7', 'port': 9200}]
return Elasticsearch(host_config, http_auth=('elastic', 'gm_test'), timeout=3600)
def getEsConn():
return Elasticsearch(SERVICE_HOSTS, http_auth=('elastic', 'gengmei!@#'), timeout=3600)
def getClickSql(start, end):
sql = """
SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.card_id,t1.time_stamp,t1.page_stay,t1.cl_type as os
SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.card_id,t1.time_stamp,t1.page_stay,t1.cl_type as os,t1.city_id as user_city_id
FROM
(
select partition_date,cl_id,business_id as card_id,time_stamp,page_stay,cl_type
select partition_date,city_id,cl_id,business_id as card_id,time_stamp,page_stay,cl_type
from online.bl_hdfs_maidian_updates
where action = 'page_view'
AND partition_date>='{startDay}' and partition_date<='{endDay}'
......@@ -399,7 +511,7 @@ def getClickSql(start, end):
AND cl_id != ''
AND business_id is not null
AND business_id != ''
group by partition_date,cl_id,business_id,time_stamp,page_stay,cl_type
group by partition_date,city_id,cl_id,business_id,time_stamp,page_stay,cl_type
) AS t1
join
( -- channel, new vs. old users
......@@ -427,29 +539,31 @@ def getClickSql(start, end):
)t3
on t3.device_id=t2.device_id
WHERE t3.device_id is null
""".format(startDay=start,endDay=end)
""".format(startDay=start, endDay=end)
print(sql)
return sql
def getExposureSql(start, end):
sql = """
SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay,cl_type as os
SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay,cl_type as os,t1.city_id as user_city_id
from
( -- new homepage card exposures
SELECT partition_date,cl_id,card_id,time_stamp,cl_type
SELECT partition_date,city_id,cl_type,cl_id,card_id,max(time_stamp) as time_stamp
FROM online.ml_community_precise_exposure_detail
where partition_date>='{startDay}' and partition_date<='{endDay}'
and action in ('page_precise_exposure','home_choiceness_card_exposure')
and cl_id IS NOT NULL
and card_id IS NOT NULL
and is_exposure='1'
and page_name='home'
and tab_name='精选'
--and page_name='home'
--and tab_name='精选'
--and page_name in ('home','search_result_more')
and ((page_name='home' and tab_name='精选') or (page_name='category' and tab_name = '商品'))
and card_type in ('card','video')
and card_content_type in ('service')
and (get_json_object(exposure_card,'$.in_page_pos') is null or get_json_object(exposure_card,'$.in_page_pos') != 'seckill')
group by partition_date,cl_id,card_id,time_stamp,cl_type
group by partition_date,city_id,cl_type,cl_id,card_id,app_session_id
) t1
join
......@@ -478,10 +592,11 @@ def getExposureSql(start, end):
)t3
on t3.device_id=t2.device_id
WHERE t3.device_id is null
""".format(startDay=start,endDay=end)
""".format(startDay=start, endDay=end)
print(sql)
return sql
def getClickSql2(start, end):
sql = """
SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.business_id card_id,t1.time_stamp time_stamp,t1.page_stay as page_stay
......@@ -526,6 +641,7 @@ def getClickSql2(start, end):
print(sql)
return sql
def getExposureSql2(start, end):
sql = """
SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay
......@@ -612,6 +728,7 @@ def getExposureSql2(start, end):
print(sql)
return sql
def connectDoris(spark, table):
return spark.read \
.format("jdbc") \
......@@ -622,6 +739,7 @@ def connectDoris(spark, table):
.option("password", "o5gbA27hXHHm") \
.load()
def get_spark(appName):
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
......@@ -641,14 +759,15 @@ def get_spark(appName):
.getOrCreate())
return spark
def init_es_query():
q = {
"_source": {
"includes":[]
"includes": []
},
"query": {
"bool": {
"must": [],
"must": [{"term": {"is_online": True}}],
"must_not": [],
"should": []
}
......@@ -656,44 +775,55 @@ def init_es_query():
}
return q
def parseSource(_source):
id = str(_source.setdefault("id",-1))
smart_rank2 = _source.setdefault("smart_rank2",0.0)
case_count = _source.setdefault("case_count",0)
service_type = str(_source.setdefault("service_type",-1))
first_demands = ','.join(_source.setdefault("first_demands",[]))
second_demands = ','.join(_source.setdefault("second_demands",[]))
first_solutions = ','.join(_source.setdefault("first_solutions",[]))
second_solutions = ','.join(_source.setdefault("second_solutions",[]))
first_positions = ','.join(_source.setdefault("first_positions",[]))
second_positions = ','.join(_source.setdefault("second_positions",[]))
tags_v3 = ','.join(_source.setdefault("tags_v3",[]))
ordered_user_ids_count = len(_source.setdefault("ordered_user_ids",[]))
lowest_price_arr = _source.setdefault("lowest_price",[])
lowest_price = lowest_price_arr[0].setdefault("price",0.0) if len(lowest_price_arr) > 0 else 0.0
#merchant_id
merchant_id = _source.setdefault("merchant_id","-1")
id = str(_source.setdefault("id", -1))
discount = _source.setdefault("discount", 0)
case_count = _source.setdefault("case_count", 0)
sales_count = _source.setdefault("sales_count", 0)
service_type = str(_source.setdefault("service_type", -1))
second_demands = ','.join(_source.setdefault("second_demands", ["-1"]))
second_solutions = ','.join(_source.setdefault("second_solutions", ["-1"]))
second_positions = ','.join(_source.setdefault("second_positions", ["-1"]))
tags_v3 = ','.join(_source.setdefault("tags_v3", ["-1"]))
# sku
sku_list = _source.setdefault("sku_list", [])
sku_tags_list = []
sku_show_tags_list = []
sku_price_list = []
for sku in sku_list:
sku_tags_list += sku.setdefault("sku_tags", [])
# sku_tags_list += sku.setdefault("sku_tags_id",[])
sku_show_tags_list.append(sku.setdefault("show_project_type_name", ""))
price = sku.setdefault("price", 0.0)
if price > 0:
sku_price_list.append(price)
# sku_tags = ",".join([str(i) for i in sku_tags_list]) if len(sku_tags_list) > 0 else "-1"
# sku_show_tags = ",".join(sku_show_tags_list) if len(sku_show_tags_list) > 0 else "-1"
sku_price = min(sku_price_list) if len(sku_price_list) > 0 else 0.0
# merchant_id
merchant_id = str(_source.setdefault("merchant_id", "-1"))
# doctor_type id famous_doctor
doctor = _source.setdefault("doctor",{})
doctor_type = doctor.setdefault("doctor_type","-1")
doctor_id = doctor.setdefault("id","-1")
doctor_famous = str(int(doctor.setdefault("famous_doctor",False)))
doctor = _source.setdefault("doctor", {})
doctor_type = str(doctor.setdefault("doctor_type", "-1"))
doctor_id = str(doctor.setdefault("id", "-1"))
doctor_famous = str(int(doctor.setdefault("famous_doctor", False)))
# hospital id city_tag_id hospital_type is_high_quality
hospital = doctor.setdefault("hospital", {})
hospital_id = hospital.setdefault("id", "-1")
hospital_id = str(hospital.setdefault("id", "-1"))
hospital_city_tag_id = str(hospital.setdefault("city_tag_id", -1))
hospital_type = hospital.setdefault("hospital_type", "-1")
hospital_type = str(hospital.setdefault("hospital_type", "-1"))
hospital_is_high_quality = str(int(hospital.setdefault("is_high_quality", False)))
data = [id,
lowest_price,
smart_rank2,
discount,
case_count,
sales_count,
service_type,
ordered_user_ids_count,
merchant_id,
doctor_type,
doctor_id,
......@@ -702,20 +832,21 @@ def parseSource(_source):
hospital_city_tag_id,
hospital_type,
hospital_is_high_quality,
first_demands,
second_demands,
first_solutions,
second_solutions,
first_positions,
second_positions,
tags_v3
tags_v3,
# sku_show_tags,
sku_price
]
return data
# Fetch item features from ES
def get_service_feature_df(spark):
es_columns = ["id", "lowest_price", "smart_rank2", "doctor", "case_count", "service_type", "first_demands", "second_demands", "first_solutions", "second_solutions", "first_positions", "second_positions", "tags_v3","ordered_user_ids"]
def get_service_feature_df():
es_columns = ["id", "discount", "sales_count", "doctor", "case_count", "service_type", "merchant_id",
"second_demands", "second_solutions", "second_positions", "sku_list", "tags_v3"]
query = init_es_query()
query["_source"]["includes"] = es_columns
print(json.dumps(query), flush=True)
......@@ -727,27 +858,16 @@ def get_service_feature_df(spark):
_source = res['_source']
data = parseSource(_source)
datas.append(data)
print("item size:",len(datas))
print("item size:", len(datas))
dataRDD = spark.sparkContext.parallelize(datas)
itemColumns = ['id', 'lowest_price', 'smart_rank2', 'case_count', 'service_type', 'ordered_user_ids_count','merchant_id',
itemColumns = ['id', 'discount', 'case_count', 'sales_count', 'service_type', 'merchant_id',
'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type',
'hospital_is_high_quality', 'first_demands', 'second_demands', 'first_solutions',
'second_solutions', 'first_positions', 'second_positions', 'tags_v3']
df = dataRDD.toDF(schema=itemColumns)
'hospital_is_high_quality', 'second_demands', 'second_solutions', 'second_positions',
'tags_v3', 'sku_price']
# 'sku_tags','sku_show_tags','sku_price']
df = pd.DataFrame(datas, columns=itemColumns)
return df
# Fetch user portraits from MySQL
def get_user_portrait(spark):
return spark.read \
.format("jdbc") \
.option("driver", "com.mysql.jdbc.Driver") \
.option("url", "jdbc:mysql://172.16.50.175:3306/doris_olap") \
.option("dbtable", "user_tag3_portrait") \
.option("user", "doris") \
.option("password", "o5gbA27hXHHm") \
.load()
def addDays(n, format="%Y%m%d"):
return (date.today() + timedelta(days=n)).strftime(format)
......@@ -756,43 +876,55 @@ def addDays(n, format="%Y%m%d"):
if __name__ == '__main__':
start = time.time()
# Input args
trainDays = int(sys.argv[1])
print('trainDays:{}'.format(trainDays),flush=True)
print('trainDays:{}'.format(trainDays), flush=True)
endDay = addDays(0)
startDay = addDays(-int(trainDays))
print("train_data start:{} end:{}".format(startDay,endDay))
print("train_data start:{} end:{}".format(startDay, endDay))
spark = get_spark("service_feature_csv_export")
spark.sparkContext.setLogLevel("ERROR")
itemDF = get_service_feature_df()
print(itemDF.columns)
print(itemDF.head(10))
# Behavior data
clickSql = getClickSql(startDay,endDay)
expSql = getExposureSql(startDay,endDay)
clickSql = getClickSql(startDay, endDay)
expSql = getExposureSql(startDay, endDay)
clickDF = spark.sql(clickSql)
expDF = spark.sql(expSql)
# ratingDF = samplesNegAndUnion(clickDF,expDF)
ratingDF = clickDF.union(expDF)
ratingDF = ratingDF.withColumnRenamed("time_stamp", "timestamp")\
.withColumnRenamed("device_id", "userid")\
.withColumnRenamed("card_id", "itemid")\
.withColumnRenamed("page_stay", "rating")\
.withColumnRenamed("os", "user_os")
ratingDF = ratingDF.withColumnRenamed("time_stamp", "timestamp") \
.withColumnRenamed("device_id", "userid") \
.withColumnRenamed("card_id", "item_id") \
.withColumnRenamed("page_stay", "rating") \
.withColumnRenamed("os", "user_os") \
.withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id"))) \
.withColumn("timestamp", F.col("timestamp").cast("long"))
print(ratingDF.columns)
print(ratingDF.show(10, truncate=False))
itemDF = get_service_feature_df(spark)
print(itemDF.columns)
print(itemDF.show(10, truncate=False))
print("添加label...")
ratingSamplesWithLabel = addSampleLabel(ratingDF)
posCount = ratingSamplesWithLabel.filter(F.col("label")==1).count()
negCount = ratingSamplesWithLabel.filter(F.col("label")==0).count()
print("pos size:"+str(posCount),"neg size:"+str(negCount))
df = ratingSamplesWithLabel.toPandas()
df = pd.DataFrame(df)
posCount = df.loc[df["label"] == 1]["label"].count()
negCount = df.loc[df["label"] == 0]["label"].count()
print("pos size:" + str(posCount), "neg size:" + str(negCount))
itemDF = get_service_feature_df()
print(itemDF.columns)
print(itemDF.head(10))
# itemDF.to_csv("/tmp/service_{}.csv".format(endDay))
# df.to_csv("/tmp/service_train_{}.csv".format(endDay))
# Data vocabulary
dataVocab = {}
......@@ -800,22 +932,39 @@ if __name__ == '__main__':
print("处理item特征...")
timestmp1 = int(round(time.time()))
samplesWithItemFeatures = addItemFeatures(ratingSamplesWithLabel, itemDF, dataVocab,multiVocab)
itemDF = addItemFeatures(itemDF, dataVocab, multiVocab)
timestmp2 = int(round(time.time()))
print("处理item特征, 耗时s:{}".format(timestmp2 - timestmp1))
print("multiVocab:")
print(multiVocab.keys())
for k, v in multiVocab.items():
print(k, len(v))
print("dataVocab:")
for k, v in dataVocab.items():
print(k, len(v))
itemDF_spark = spark.createDataFrame(itemDF)
itemDF_spark.printSchema()
itemDF_spark.show(10, truncate=False)
# Item statistical features
itemStaticDF = addItemStaticFeatures(ratingSamplesWithLabel, itemDF_spark, dataVocab)
# Statistical data processing
# ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
samples = ratingSamplesWithLabel.join(itemStaticDF, on=['item_id'], how='inner')
print("处理user特征...")
samplesWithUserFeatures = addUserFeatures(samplesWithItemFeatures,dataVocab,multiVocab)
samplesWithUserFeatures = addUserFeatures(samples, dataVocab, multiVocab)
timestmp3 = int(round(time.time()))
print("处理user特征, 耗时s:{}".format(timestmp3 - timestmp2))
#
# user columns
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
print("collect feature for user:{}".format(str(user_columns)))
# item columns
item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
item_columns = [c for c in itemStaticDF.columns if c.startswith("item")]
print("collect feature for item:{}".format(str(item_columns)))
# model columns
print("model columns to redis...")
......@@ -828,33 +977,27 @@ if __name__ == '__main__':
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
# Write item features to redis
itemFeaturesToRedis(itemStaticDF, FEATURE_ITEM_KEY)
timestmp6 = int(round(time.time()))
print("item feature to redis took {}s".format(timestmp6 - timestmp3))
"""特征数据存入redis======================================"""
# user特征数据存入redis
featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
userFeaturesToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp3))
# userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
# featureToRedis(FEATURE_USER_KEY, userDatas)
# itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
# featureToRedis(FEATURE_ITEM_KEY, itemDatas)
# item特征数据存入redis
# todo 添加最近一个月有行为的item,待优化:扩大item范围
featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp6))
"""训练数据保存 ======================================"""
timestmp3 = int(round(time.time()))
train_columns = model_columns + ["label", "timestamp", "rating"]
trainSamples = samplesWithUserFeatures.select(*train_columns)
print("write to hdfs start...")
splitTimestamp = int(time.mktime(time.strptime(addDays(0), "%Y%m%d")))
splitAndSaveTrainingTestSamplesByTimeStamp(trainSamples, splitTimestamp, TRAIN_FILE_PATH)
print("write to hdfs success...")
train_df = trainSamples.toPandas()
train_df = pd.DataFrame(train_df)
train_df.to_csv(DATA_PATH_TRAIN, sep="|")
timestmp4 = int(round(time.time()))
print("数据写入hdfs 耗时s:{}".format(timestmp4 - timestmp3))
print("训练数据写入success 耗时s:{}".format(timestmp4 - timestmp3))
print("总耗时m:{}".format((timestmp4 - start)/60))
print("总耗时m:{}".format((timestmp4 - start) / 60))
spark.stop()
\ No newline at end of file