Commit dc6b471e authored by 郭羽's avatar 郭羽

service model 优化

parent 87a8b395
......@@ -14,3 +14,5 @@ if [ -n "${spark_mode}" ]; then
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 8g --executor-memory 2g --executor-cores 1 --num-executors 8 --conf spark.pyspark.python=${python_os} --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
fi
#/opt/spark/bin/spark-submit --master local[4] --deploy-mode client --driver-memory 8g --executor-memory 2g --executor-cores 1 --num-executors 4 --conf spark.pyspark.python=/srv/envs/serviceRec/bin/python --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar ${pythonFile} $day_count
......@@ -73,85 +73,118 @@ def getRedisConn():
# conn = redis.Redis(host="172.18.51.10", port=6379,db=0) #test
return conn
# NOTE(review): this region is a diff-flattened scrape of the PRE-commit
# addItemFeatures (the commit replaces it with the pandas version below).
# Original indentation is lost and pre-/post-change lines may be interleaved;
# re-verify against the real source file before relying on structure.
# Purpose: join item-side features onto the rating samples and fill the
# feature vocabularies (dataVocab / multiVocab) used for model-input encoding.
def addItemFeatures(samples,itemDF,dataVocab,multiVocab):
# Rename the item primary key to match the sample join key.
itemDF = itemDF.withColumnRenamed("id", "itemid")
# Data filter: drop items with no doctor attached.
itemDF = itemDF.filter(col("doctor_id") != "-1")
# itemid vocabulary
vocabList = collectColumnToVocab(itemDF, "itemid")
dataVocab["itemid"] = vocabList
# Null handling: numeric item columns -> null becomes 0, cast to float,
# column renamed with ITEM_PREFIX.
for c in ITEM_NUMBER_COLUMNS:
print("null count:",c,itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(ITEM_PREFIX+c,when(col(c).isNull(),0).otherwise(col(c)).cast("float")).drop(c)
# Categorical item columns: null becomes the "-1" sentinel.
for c in ITEM_CATE_COLUMNS:
print("null count:", c, itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(ITEM_PREFIX+c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c))).drop(c)
# Add the cleaned column's distinct values to the vocabulary.
dataVocab[ITEM_PREFIX+c] = collectColumnToVocab(itemDF,ITEM_PREFIX+c)
# Multi-valued (comma-separated) categorical columns: split the first v
# values into indexed columns ITEM_PREFIX+c+"__i" sharing one vocabulary.
for c, v in ITEM_MULTI_COLUMN_EXTRA_MAP.items():
print("null count:", c, itemDF.filter(col(c).isNull()).count())
itemDF = itemDF.withColumn(c, F.when(F.col(c).isNull(), "-1").otherwise(F.col(c)))
multiVocab[c] = collectMutiColumnToVocab(itemDF, c)
for i in range(1, v + 1):
new_c = ITEM_PREFIX + c + "__" + str(i)
itemDF = itemDF.withColumn(new_c, F.split(F.col(c), ",")[i - 1])
# split() yields null past the end of the list; replace with "-1".
itemDF = itemDF.withColumn(new_c, F.when(F.col(new_c).isNull(), "-1").otherwise(F.col(new_c)))
dataVocab[new_c] = multiVocab[c]
samples = samples.join(itemDF, on=['itemid'], how='inner')
# Per-item aggregate (statistics) features.
print("统计特征处理...")
staticFeatures = samples.groupBy('itemid').agg(F.count(F.lit(1)).alias('itemRatingCount'),
F.avg(F.col('rating')).alias('itemRatingAvg'),
F.stddev(F.col('rating')).alias('itemRatingStddev')).fillna(0) \
.withColumn('itemRatingStddev', F.format_number(F.col('itemRatingStddev'), NUMBER_PRECISION).cast("float")) \
.withColumn('itemRatingAvg', F.format_number(F.col('itemRatingAvg'), NUMBER_PRECISION).cast("float"))
# join item rating features
samples = samples.join(staticFeatures, on=['itemid'], how='left')
print("连续特征处理...")
# TODO: bucketing is slow here; consider a non-linear transform instead.
# Continuous feature handling.
pipelineStage = []
# Normalization
# for c in ["itemRatingAvg","itemRatingStddev"]:
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c+"Scale"))
# bucketing: quantile-discretize each continuous column into 10 buckets.
bucketColumns = [ITEM_PREFIX+"case_count", ITEM_PREFIX+"ordered_user_ids_count", ITEM_PREFIX+"lowest_price", "itemRatingCount", "itemRatingStddev","itemRatingAvg"]
for c in bucketColumns:
pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
def parseTags(tags, i):
    """Return the i-th (1-based) tag from a comma-separated tag string.

    Parameters:
        tags: comma-separated tag string; may be None/empty for items with
              no tags (the ES parsing elsewhere in this file fills missing
              fields with "-1" sentinels, but nulls can still leak through).
        i:    1-based position to extract.

    Returns:
        The i-th tag, or the "-1" sentinel when tags is missing/empty or
        holds fewer than i entries.  The sentinel matches the null handling
        used for all other categorical features in this pipeline.
    """
    # Guard: the original crashed with AttributeError on None and returned
    # "" for an empty string; both now map to the standard "-1" sentinel.
    if not tags:
        return "-1"
    tags_arr = tags.split(",")
    if len(tags_arr) >= i:
        return tags_arr[i - 1]
    return "-1"
def numberToBucket(num):
    """Map a non-negative count to a width-10 bucket label.

    Values of 1000 or more all fall into the top bucket "100"; anything
    below is truncated to an int and divided by 10, giving labels "0".."99".
    """
    capped = 1000 if num >= 1000 else int(num)
    return str(capped // 10)
def priceToBucket(num):
    """Map a price to a width-1000 bucket label.

    Prices of 100000 or more all fall into the top bucket "100"; anything
    below is truncated to an int and divided by 1000, giving "0".."99".
    """
    capped = 100000 if num >= 100000 else int(num)
    return str(capped // 1000)
# Both bucketizers return *string* labels ("0".."100"), and the bucket
# vocabularies are built as strings, so the UDFs must be declared with
# StringType.  Declaring FloatType (as before) makes Spark fail to convert
# the str return value and silently produce null for every row.
numberToBucketUdf = F.udf(numberToBucket, StringType())
priceToBucketUdf = F.udf(priceToBucket, StringType())
# NOTE(review): diff-flattened scrape -- indentation is lost and stray lines
# from the removed version remain (the Pipeline lines reference an undefined
# pipelineStage; the trailing loop references an undefined bucketColumns and
# sits after a return).  Also the first withColumn's parentheses are
# misplaced (format_number closed before NUMBER_PRECISION) -- confirm
# against the real source.
# Purpose: windowed per-user and per-item rolling statistics (count/avg/
# stddev/click/exposure/CTR over the previous 100 events), then bucketize
# the counts and transform the stddev/CTR continuous columns.
def addStaticsFeatures(samples,dataVocab):
print("user统计特征处理...")
samples = samples \
.withColumn('userRatingCount',F.format_number(F.count(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))), NUMBER_PRECISION).cast("float") \
.withColumn("userRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("userClickCount", F.format_number(F.count(F.collect_list(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))),NUMBER_PRECISION).cast("float")) \
.withColumn("userExpCount", F.format_number(F.count(F.collect_list(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))),NUMBER_PRECISION).cast("float")) \
.withColumn("userCtr", F.format_number(F.col("userClickCount")/(F.col("userExpCount")+1),NUMBER_PRECISION).cast("float")) \
.filter(F.col("userRatingCount") > 1)
featurePipeline = Pipeline(stages=pipelineStage)
samples = featurePipeline.fit(samples).transform(samples)
print("item统计特征处理...")
samples = samples \
.withColumn('itemRatingCount',F.format_number(F.count(F.lit(1)).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1))), NUMBER_PRECISION).cast("float") \
.withColumn("itemRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)), NUMBER_PRECISION).cast("float")) \
.withColumn("itemRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('item_id').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("itemClickCount", F.format_number(F.count(F.collect_list(when(F.col('label') == 1, F.lit(1)).otherwise(F.lit(None))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1))),NUMBER_PRECISION).cast("float")) \
.withColumn("itemExpCount", F.format_number(F.count(F.collect_list(when(F.col('label') == 0, F.lit(1)).otherwise(F.lit(None))).over(sql.Window.partitionBy("item_id").orderBy(F.col("timestamp")).rowsBetween(-100, -1))),NUMBER_PRECISION).cast("float")) \
.withColumn("itemCtr", F.format_number(F.col("itemClickCount")/(F.col("itemExpCount")+1),NUMBER_PRECISION).cast("float")) \
# Bucketize the continuous count/avg columns into 0..100 string labels.
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ["userRatingCount","userRatingAvg","userClickCount","userExpCount","itemRatingCount","itemRatingAvg","itemClickCount","itemExpCount"]:
new_col = col + bucket_suffix
samples = samples.withColumn(new_col, numberToBucketUdf(F.col(col))).drop(col)
dataVocab[new_col] = bucket_vocab
# Stddev handling: map stddev to 1/(stddev+1) so it stays in (0, 1].
number_suffix = "_number"
for col in ["userRatingStddev","itemRatingStddev"]:
new_col = col + number_suffix
samples = samples.withColumn(new_col,1/(F.col(col)+1)).drop(col)
for col in ["userCtr", "itemCtr"]:
new_col = col + number_suffix
samples = samples.withColumnRenamed(col, new_col)
# Cast the bucketed columns to string (residue of the removed version).
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket",F.col(c + "Bucket").cast("string")).drop(c)
samples.printSchema()
return samples
dataVocab[c + "Bucket"] = [str(float(i)) for i in range(11)]
# NOTE(review): diff-flattened scrape of the POST-commit addItemFeatures.
# It mixes pandas accessors (itemDF[col].tolist(), .map) with Spark calls
# (itemDF.withColumn/.drop/.show) on the same object, and contains residue
# of the removed version (samples.printSchema() / return samples where
# `samples` is undefined) -- confirm against the real source file.
# Purpose: build one-hot and multi-value vocabularies from the ES item
# frame, expand multi-value tag columns into 5 indexed columns, bucketize
# counts/price, and rename discount to a continuous feature column.
def addItemFeatures(itemDF,dataVocab,multi_col_vocab):
# multi_col = ['sku_tags', 'sku_show_tags','second_demands', 'second_solutions', 'second_positions']
multi_col = ['tags_v3','second_demands', 'second_solutions', 'second_positions']
onehot_col = ['id','service_type', 'merchant_id','doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type','hospital_is_high_quality']
# One-hot columns: vocabulary is the set of distinct raw values.
for col in onehot_col:
new_c = ITEM_PREFIX + col
dataVocab[new_c] = list(set(itemDF[col].tolist()))
samples.printSchema()
# samples.show(5, truncate=False)
# Multi-value columns: collect distinct comma-joined strings, then split
# the first 5 entries of each into indexed columns sharing one vocabulary.
for c in multi_col:
multi_col_vocab[c] = list(set(itemDF[c].tolist()))
return samples
for i in range(1, 6):
new_c = ITEM_PREFIX + c + "__" + str(i)
itemDF[new_c] = itemDF[c].map(lambda x:parseTags(x,i))
dataVocab[new_c] = multi_col_vocab[c]
# Bucketize continuous count columns into 0..100 string labels.
bucket_vocab = [str(i) for i in range(101)]
bucket_suffix = "_Bucket"
for col in ['case_count', 'sales_count']:
new_col = ITEM_PREFIX + col + bucket_suffix
itemDF = itemDF.withColumn(new_col, numberToBucketUdf(F.col(col))).drop(col)
dataVocab[new_col] = bucket_vocab
for col in ['sku_price']:
new_col = ITEM_PREFIX + col + bucket_suffix
itemDF = itemDF.withColumn(new_col, priceToBucketUdf(F.col(col))).drop(col)
dataVocab[new_col] = bucket_vocab
# Continuous columns kept as-is, renamed with the _number suffix.
number_suffix = "_number"
for col in ["discount"]:
new_col = ITEM_PREFIX + col + number_suffix
itemDF = itemDF.withColumnRenamed(col, new_col)
itemDF.show(10, truncate=False)
return itemDF
def extractTags(genres_list):
    """Rank the tags seen in a user's click history.

    The scraped diff interleaved the pre-change loop (`+= 1` frequency
    count) with the post-change loop (`+= i` position weighting); this is
    the reconstructed post-change version.

    Parameters:
        genres_list: sequence of comma-separated tag strings, one per
                     historical click, ordered by the caller.

    Returns:
        Tags sorted by descending score, where a tag's score is the sum of
        the 0-based positions at which it appears -- later (higher-index)
        clicks weigh more.

    NOTE(review): the element at index 0 contributes weight 0, so a tag
    seen only in the first click is kept but can never outrank others;
    confirm whether `i + 1` was intended.
    """
    genres_dict = defaultdict(int)
    for i, genres in enumerate(genres_list):
        for genre in genres.split(','):
            genres_dict[genre] += i
    ranked = sorted(genres_dict.items(), key=lambda kv: kv[1], reverse=True)
    return [tag for tag, _ in ranked]
......@@ -162,64 +195,32 @@ def arrayReverse(arr):
# NOTE(review): diff-flattened scrape -- several pre-/post-change line pairs
# are interleaved (both user_os vocab lines, both history withColumn chains,
# itemid vs item_id keys, USER_MULTI_COLUMN_EXTRA_MAP vs multiVocab loops,
# range(1, v+1) vs range(1, 6), and the removed QuantileDiscretizer
# pipeline).  Indentation is lost; confirm against the real source file.
# Purpose: user-side features -- last-10 positive-click history columns,
# rolling rating statistics, and per-category tag-preference columns.
def addUserFeatures(samples,dataVocab,multiVocab):
dataVocab["userid"] = collectColumnToVocab(samples, "userid")
dataVocab["user_os"] = ["ios","android","-1"]
dataVocab["user_os"] = ["ios","android"]
extractTagsUdf = F.udf(extractTags, ArrayType(StringType()))
arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
samples = samples.withColumnRenamed("cl_id","userid")
# arrayReverseUdf = F.udf(arrayReverse, ArrayType(StringType()))
print("user历史数据处理...")
# User history: item ids of the previous (up to 100) positive clicks,
# reversed so the most recent click comes first.
samples = samples\
.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('itemid')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) \
.withColumn("userPositiveHistory", arrayReverseUdf(F.col("userPositiveHistory")))
.withColumn('userPositiveHistory',F.collect_list(when(F.col('label') == 1, F.col('item_id')).otherwise(F.lit(None))).over(sql.Window.partitionBy("userid").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) \
.withColumn("userPositiveHistory", F.reverse(F.col("userPositiveHistory")))
# Expose the 10 most recent positive items as fixed columns, "-1" padded.
for i in range(1,11):
samples = samples.withColumn("userRatedHistory"+str(i), F.when(F.col("userPositiveHistory")[i-1].isNotNull(),F.col("userPositiveHistory")[i-1]).otherwise("-1"))
dataVocab["userRatedHistory"+str(i)] = dataVocab["itemid"]
dataVocab["userRatedHistory"+str(i)] = dataVocab["item_id"]
samples = samples.drop("userPositiveHistory")
# Rolling statistics over the user's previous 100 ratings.
print("统计特征处理...")
samples = samples\
.withColumn('userRatingCount',F.count(F.lit(1)).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))) \
.withColumn("userRatingAvg", F.format_number(F.avg(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.withColumn("userRatingStddev", F.format_number(F.stddev(F.col("rating")).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1)),NUMBER_PRECISION).cast("float")) \
.filter(F.col("userRatingCount") > 1)
# User preference: rank tags from the user's clicked items per category
# and expose the top entries as indexed columns, "-1" padded.
for c,v in USER_MULTI_COLUMN_EXTRA_MAP.items():
for c,v in multiVocab.items():
new_col = "user" + "__" + c
samples = samples.withColumn(new_col, extractTagsUdf(F.collect_list(when(F.col('label') == 1, F.col(c)).otherwise(F.lit(None))).over(sql.Window.partitionBy('userid').orderBy('timestamp').rowsBetween(-100, -1))))
for i in range(1, v+1):
for i in range(1, 6):
samples = samples.withColumn(new_col + "__" + str(i),F.when(F.col(new_col)[i - 1].isNotNull(), F.col(new_col)[i - 1]).otherwise("-1"))
dataVocab[new_col + "__" + str(i)] = multiVocab[c]
dataVocab[new_col + "__" + str(i)] = v
samples = samples.drop(new_col)
# .drop(c).drop(new_col)
print("连续特征处理...")
pipelineStage = []
# Normalization
# for c in ["userRatingAvg", "userRatingStddev"]:
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c + "Scale"))
# bucketing (residue of the removed QuantileDiscretizer pipeline)
bucketColumns = ["userRatingCount","userRatingAvg","userRatingStddev"]
for c in bucketColumns:
pipelineStage.append(QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "Bucket"))
featurePipeline = Pipeline(stages=pipelineStage)
samples = featurePipeline.fit(samples).transform(samples)
# Cast bucket columns to string for vocabulary lookup.
for c in bucketColumns:
samples = samples.withColumn(c + "Bucket", F.col(c + "Bucket").cast("string")).drop(c)
dataVocab[c + "Bucket"] = [str(float(i)) for i in range(11)]
samples.printSchema()
# samples.show(5,truncate=False)
samples.show(10,truncate=False)
return samples
......@@ -255,6 +256,8 @@ def splitAndSaveTrainingTestSamplesByTimeStamp(samples,splitTimestamp, file_path
test.write.option("header", "true").option("delimiter", "|").mode('overwrite').csv(testSavePath)
def collectColumnToVocab(samples,column):
pd.DataFrame()[""].tolist()
list(set(samples[column].tolist()))
datas = samples.select(column).distinct().collect()
vocabSet = set()
for d in datas:
......@@ -300,7 +303,7 @@ def getDataVocab(samples,model_columns):
vocabSet.add("-1")# 空值的默认
dataVocab[c] = list(vocabSet)
# elif c.count("userRatedHistory") > 0:
# dataVocab[c] = dataVocab["itemid"]
# dataVocab[c] = dataVocab["item_id"]
else:
# 判断是否多值离散列
for cc, v in multiVocab.items():
......@@ -660,6 +663,7 @@ def init_es_query():
def parseSource(_source):
id = str(_source.setdefault("id",-1))
discount = _source.setdefault("discount",0)
case_count = _source.setdefault("case_count",0)
sales_count = _source.setdefault("sales_count",0)
service_type = str(_source.setdefault("service_type",-1))
......@@ -678,7 +682,7 @@ def parseSource(_source):
sku_price_list.append(sku.setdefault("price",0.0))
sku_tags = ",".join([str(i) for i in sku_tags_list]) if len(sku_tags_list) > 0 else "-1"
sku_show_tags = ",".join(sku_show_tags_list) if len(sku_show_tags_list) > 0 else "-1"
# sku_show_tags = ",".join(sku_show_tags_list) if len(sku_show_tags_list) > 0 else "-1"
sku_price = min(sku_price_list)
#merchant_id
......@@ -697,6 +701,7 @@ def parseSource(_source):
hospital_is_high_quality = str(int(hospital.setdefault("is_high_quality", False)))
data = [id,
discount,
case_count,
sales_count,
service_type,
......@@ -712,7 +717,7 @@ def parseSource(_source):
second_solutions,
second_positions,
sku_tags,
sku_show_tags,
# sku_show_tags,
sku_price
]
......@@ -720,7 +725,7 @@ def parseSource(_source):
# es中获取特征
def get_service_feature_df():
es_columns = ["id", "sales_count", "doctor", "case_count", "service_type","merchant_id","second_demands", "second_solutions", "second_positions", "sku_list"]
es_columns = ["id","discount", "sales_count", "doctor", "case_count", "service_type","merchant_id","second_demands", "second_solutions", "second_positions", "sku_list"]
query = init_es_query()
query["_source"]["includes"] = es_columns
print(json.dumps(query), flush=True)
......@@ -734,10 +739,11 @@ def get_service_feature_df():
datas.append(data)
print("item size:",len(datas))
itemColumns = ['id', 'case_count', 'sales_count', 'service_type','merchant_id',
itemColumns = ['id','discount', 'case_count', 'sales_count', 'service_type','merchant_id',
'doctor_type', 'doctor_id', 'doctor_famous', 'hospital_id', 'hospital_city_tag_id', 'hospital_type',
'hospital_is_high_quality', 'second_demands','second_solutions', 'second_positions',
'sku_tags','sku_show_tags','sku_price']
'tags_v3','sku_price']
# 'sku_tags','sku_show_tags','sku_price']
df = pd.DataFrame(datas,columns=itemColumns)
return df
......@@ -769,7 +775,7 @@ if __name__ == '__main__':
ratingDF = clickDF.union(expDF)
ratingDF = ratingDF.withColumnRenamed("time_stamp", "timestamp")\
.withColumnRenamed("device_id", "userid")\
.withColumnRenamed("card_id", "itemid")\
.withColumnRenamed("card_id", "item_id")\
.withColumnRenamed("page_stay", "rating")\
.withColumnRenamed("os", "user_os")
......@@ -785,44 +791,48 @@ if __name__ == '__main__':
negCount = df.loc[df["label"]==1]["label"].count()
print("pos size:"+str(posCount),"neg size:"+str(negCount))
itemDF = get_service_feature_df(spark)
#统计数据处理
ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel)
itemDF = get_service_feature_df()
print(itemDF.columns)
print(itemDF.show(10, truncate=False))
itemDF.to_csv("/tmp/service_{}.csv".format(endDay))
df.to_csv("/tmp/service_train_{}.csv".format(endDay))
# itemDF.to_csv("/tmp/service_{}.csv".format(endDay))
# df.to_csv("/tmp/service_train_{}.csv".format(endDay))
# 数据字典
dataVocab = {}
multiVocab = {}
# # 数据字典
# dataVocab = {}
# multiVocab = {}
#
# print("处理item特征...")
# timestmp1 = int(round(time.time()))
# samplesWithItemFeatures = addItemFeatures(ratingSamplesWithLabel, itemDF, dataVocab,multiVocab)
# timestmp2 = int(round(time.time()))
# print("处理item特征, 耗时s:{}".format(timestmp2 - timestmp1))
# print("multiVocab:")
# print(multiVocab.keys())
#
# print("处理user特征...")
# samplesWithUserFeatures = addUserFeatures(samplesWithItemFeatures,dataVocab,multiVocab)
# timestmp3 = int(round(time.time()))
# print("处理user特征, 耗时s:{}".format(timestmp3 - timestmp2))
#
# # user columns
# user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
# print("collect feature for user:{}".format(str(user_columns)))
# # item columns
# item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
# print("collect feature for item:{}".format(str(item_columns)))
# # model columns
# print("model columns to redis...")
# model_columns = user_columns + item_columns
# featureColumnsToRedis(model_columns)
print("处理item特征...")
timestmp1 = int(round(time.time()))
itemDF = addItemFeatures(itemDF, dataVocab,multiVocab)
timestmp2 = int(round(time.time()))
print("处理item特征, 耗时s:{}".format(timestmp2 - timestmp1))
print("multiVocab:")
print(multiVocab.keys())
samples = ratingSamplesWithLabel.join(itemDF, on=['item_id'], how='inner')
print("处理user特征...")
samplesWithUserFeatures = addUserFeatures(samples,dataVocab,multiVocab)
timestmp3 = int(round(time.time()))
print("处理user特征, 耗时s:{}".format(timestmp3 - timestmp2))
#
# print("数据字典save...")
# print("dataVocab:", str(dataVocab.keys()))
# user columns
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
print("collect feature for user:{}".format(str(user_columns)))
# item columns
item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
print("collect feature for item:{}".format(str(item_columns)))
# model columns
print("model columns to redis...")
model_columns = user_columns + item_columns
featureColumnsToRedis(model_columns)
print("数据字典save...")
print("dataVocab:", str(dataVocab.keys()))
# vocab_path = "../vocab/{}_vocab.json".format(VERSION)
# dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
# open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment