Commit 12327a44 authored by 郭羽

embedding: comment out redis.close

parent b02c8710
@@ -705,6 +705,7 @@ def parseSource(_source):
second_demands = ','.join(_source.setdefault("second_demands",["-1"]))
second_solutions = ','.join(_source.setdefault("second_solutions",["-1"]))
second_positions = ','.join(_source.setdefault("second_positions",["-1"]))
+ tags_v3 = ','.join(_source.setdefault("tags_v3", ["-1"]))
# sku
sku_list = _source.setdefault("sku_list",[])
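parseSource leans on dict.setdefault to read a multi-valued field and, when it is absent from the ES _source, backfill a ["-1"] placeholder in one step. A minimal sketch of that behavior (field values are illustrative):

    _source = {"second_demands": ["10", "12"]}                                # tags_v3 missing
    second_demands = ','.join(_source.setdefault("second_demands", ["-1"]))   # -> "10,12"
    tags_v3 = ','.join(_source.setdefault("tags_v3", ["-1"]))                 # -> "-1"
    # side effect: _source now also contains "tags_v3": ["-1"]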
@@ -719,7 +720,7 @@ def parseSource(_source):
if price > 0:
sku_price_list.append(price)
sku_tags = ",".join([str(i) for i in sku_tags_list]) if len(sku_tags_list) > 0 else "-1"
# sku_tags = ",".join([str(i) for i in sku_tags_list]) if len(sku_tags_list) > 0 else "-1"
# sku_show_tags = ",".join(sku_show_tags_list) if len(sku_show_tags_list) > 0 else "-1"
sku_price = min(sku_price_list) if len(sku_price_list) > 0 else 0.0
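The price fallback keeps only positive SKU prices and defaults to 0.0 when none survive; the same guard in isolation (prices are illustrative):

    sku_price_list = [p for p in (0.0, 199.0, 99.0) if p > 0]
    sku_price = min(sku_price_list) if len(sku_price_list) > 0 else 0.0   # -> 99.0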
@@ -754,7 +755,7 @@ def parseSource(_source):
second_demands,
second_solutions,
second_positions,
- sku_tags,
+ tags_v3,
# sku_show_tags,
sku_price
]
@@ -763,7 +764,7 @@ def parseSource(_source):
# fetch features from ES
def get_service_feature_df():
es_columns = ["id","discount", "sales_count", "doctor", "case_count", "service_type","merchant_id","second_demands", "second_solutions", "second_positions", "sku_list"]
es_columns = ["id","discount", "sales_count", "doctor", "case_count", "service_type","merchant_id","second_demands", "second_solutions", "second_positions", "sku_list","tags_v3"]
query = init_es_query()
query["_source"]["includes"] = es_columns
print(json.dumps(query), flush=True)
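init_es_query is defined elsewhere in this script; assuming it returns a standard Elasticsearch search body, the includes assignment above amounts to restricting the returned _source to the feature columns, roughly:

    # hypothetical scaffold; the real init_es_query() is not shown in this diff
    query = {
        "query": {"match_all": {}},
        "_source": {"includes": []},
    }
    query["_source"]["includes"] = es_columns   # only fetch the feature fields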
@@ -803,113 +804,118 @@ if __name__ == '__main__':
spark = get_spark("service_feature_csv_export")
spark.sparkContext.setLogLevel("ERROR")
# behavior data
clickSql = getClickSql(startDay,endDay)
expSql = getExposureSql(startDay,endDay)
clickDF = spark.sql(clickSql)
expDF = spark.sql(expSql)
# ratingDF = samplesNegAndUnion(clickDF,expDF)
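# note: DataFrame.union matches columns by position, not by name, so clickDF
# and expDF must already share the same column order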
ratingDF = clickDF.union(expDF)
ratingDF = ratingDF.withColumnRenamed("time_stamp", "timestamp")\
.withColumnRenamed("device_id", "userid")\
.withColumnRenamed("card_id", "item_id")\
.withColumnRenamed("page_stay", "rating")\
.withColumnRenamed("os", "user_os")\
.withColumn("user_city_id", F.when(F.col("user_city_id").isNull(), "-1").otherwise(F.col("user_city_id")))\
.withColumn("timestamp",F.col("timestamp").cast("long"))
print(ratingDF.columns)
ratingDF.show(10, truncate=False)
print("添加label...")
ratingSamplesWithLabel = addSampleLabel(ratingDF)
df = ratingSamplesWithLabel.toPandas()
df = pd.DataFrame(df)
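# note: label==0 is counted as the positive class below; this relies on
# addSampleLabel's labeling convention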
posCount = df.loc[df["label"]==0]["label"].count()
negCount = df.loc[df["label"]==1]["label"].count()
print("pos size:"+str(posCount),"neg size:"+str(negCount))
itemDF = get_service_feature_df()
print(itemDF.columns)
print(itemDF.head(10))
# itemDF.to_csv("/tmp/service_{}.csv".format(endDay))
# df.to_csv("/tmp/service_train_{}.csv".format(endDay))
# data vocabulary
dataVocab = {}
multiVocab = {}
print("处理item特征...")
timestmp1 = int(round(time.time()))
itemDF = addItemFeatures(itemDF, dataVocab,multiVocab)
timestmp2 = int(round(time.time()))
print("处理item特征, 耗时s:{}".format(timestmp2 - timestmp1))
print("multiVocab:")
for k,v in multiVocab.items():
print(k,len(v))
print("dataVocab:")
for k, v in dataVocab.items():
print(k, len(v))
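# The two vocabs are plain dicts; judging from the len(v) prints and the later
# json.dumps, each maps a feature column name to its list of distinct values.
# An inferred, illustrative shape (keys and values are examples, not real data):
#
#     dataVocab = {
#         "user_os": ["ios", "android"],
#         "item_service_type": ["-1", "1", "2"],
#     }
#     for k, v in dataVocab.items():
#         print(k, len(v))   # column name and vocabulary size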
itemDF_spark = spark.createDataFrame(itemDF)
itemDF_spark.printSchema()
itemDF_spark.show(10, truncate=False)
# item statistical features
itemStaticDF = addItemStaticFeatures(ratingSamplesWithLabel,itemDF_spark,dataVocab)
# statistics processing
# ratingSamplesWithLabel = addStaticsFeatures(ratingSamplesWithLabel,dataVocab)
samples = ratingSamplesWithLabel.join(itemStaticDF, on=['item_id'], how='inner')
print("处理user特征...")
samplesWithUserFeatures = addUserFeatures(samples,dataVocab,multiVocab)
timestmp3 = int(round(time.time()))
print("处理user特征, 耗时s:{}".format(timestmp3 - timestmp2))
# user columns
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
print("collect feature for user:{}".format(str(user_columns)))
# item columns
item_columns = [c for c in itemStaticDF.columns if c.startswith("item")]
print("collect feature for item:{}".format(str(item_columns)))
# model columns
print("model columns to redis...")
model_columns = user_columns + item_columns
featureColumnsToRedis(model_columns)
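# featureColumnsToRedis is defined earlier in this script and not shown in this
# diff; a plausible minimal shape, assuming a redis-py client and an arbitrary
# key name (both are assumptions, not the actual implementation):
def feature_columns_to_redis_sketch(columns, key="service:model:columns"):
    import json, redis   # local imports keep this sketch self-contained
    conn = redis.Redis(host="localhost", port=6379)
    conn.set(key, json.dumps(columns))   # persist ordered column list for serving
    conn.close()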
print("数据字典save...")
print("dataVocab:", str(dataVocab.keys()))
vocab_path = "../vocab/{}_vocab.json".format(VERSION)
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
# write item features to redis
itemFeaturesToRedis(itemStaticDF, FEATURE_ITEM_KEY)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp3))
"""特征数据存入redis======================================"""
# write user features to redis
userFeaturesToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp6))
"""训练数据保存 ======================================"""
timestmp3 = int(round(time.time()))
train_columns = model_columns + ["label", "timestamp", "rating"]
trainSamples = samplesWithUserFeatures.select(*train_columns)
train_df = trainSamples.toPandas()
train_df = pd.DataFrame(train_df)
train_df.to_csv(DATA_PATH_TRAIN,sep="|")
timestmp4 = int(round(time.time()))
print("训练数据写入success 耗时s:{}".format(timestmp4 - timestmp3))
print("总耗时m:{}".format((timestmp4 - start)/60))
- spark.stop()
\ No newline at end of file
+ # spark.stop()
\ No newline at end of file