Commit f3f46616 authored by 宋柯's avatar 宋柯

模型调试

parent a6862152
import sys import sys
import os import os
from datetime import date, timedelta from datetime import date, timedelta
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan from elasticsearch.helpers import scan
import time import time
import redis import redis
from pyspark import SparkContext, SparkConf from pyspark import SparkContext, SparkConf
...@@ -14,7 +11,6 @@ import pyspark.sql as sql ...@@ -14,7 +11,6 @@ import pyspark.sql as sql
from pyspark.sql.functions import when from pyspark.sql.functions import when
from pyspark.sql.types import * from pyspark.sql.types import *
from pyspark.sql import functions as F from pyspark.sql import functions as F
from collections import defaultdict from collections import defaultdict
import json import json
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))) sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
...@@ -999,63 +995,34 @@ if __name__ == '__main__': ...@@ -999,63 +995,34 @@ if __name__ == '__main__':
# | -- ITEM_NUMERIC_sku_price: double(nullable=true) # | -- ITEM_NUMERIC_sku_price: double(nullable=true)
# #
fields = [field.name for field in samples.schema.fields] fields = [field.name for field in samples.schema.fields]
multi_categoty_fields = []
categoty_fields = []
fields_na_value_dict = {} fields_na_value_dict = {}
for field in fields: for field in fields:
if field.startswith(ITEM_PREFIX + CATEGORY_PREFIX) or field.startswith(USER_PREFIX + CATEGORY_PREFIX): if field.startswith(ITEM_PREFIX + CATEGORY_PREFIX) or field.startswith(USER_PREFIX + CATEGORY_PREFIX):
fields_na_value_dict[field] = '-1' fields_na_value_dict[field] = '-1'
categoty_fields.append(field)
elif field.startswith(ITEM_PREFIX + MULTI_CATEGORY_PREFIX) or field.startswith(USER_PREFIX + MULTI_CATEGORY_PREFIX): elif field.startswith(ITEM_PREFIX + MULTI_CATEGORY_PREFIX) or field.startswith(USER_PREFIX + MULTI_CATEGORY_PREFIX):
fields_na_value_dict[field] = ['-1'] fields_na_value_dict[field] = '-1'
multi_categoty_fields.append(field)
elif field.startswith(ITEM_PREFIX + NUMERIC_PREFIX) or field.startswith(USER_PREFIX + NUMERIC_PREFIX): elif field.startswith(ITEM_PREFIX + NUMERIC_PREFIX) or field.startswith(USER_PREFIX + NUMERIC_PREFIX):
fields_na_value_dict[field] = 0 fields_na_value_dict[field] = 0
samples.na.fill(fields_na_value_dict) samples = samples.na.fill(fields_na_value_dict).coalesce(1)
samples.cache()
samples.printSchema() samples.printSchema()
samples.show(20, False) samples.show(20, False)
sys.exit()
# user columns write_time_start = time.time()
user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")] for categoty_field in categoty_fields:
print("collect feature for user:{}".format(str(user_columns))) output_file = "file:///home/gmuser/" + categoty_field + "_vocab"
# item columns samples.select(categoty_field).where(F.col(categoty_field) != '-1').distinct().write.mode("overwrite").options(header="false").csv(output_file)
item_columns = [c for c in itemStaticDF.columns if c.startswith("item")] for multi_categoty_field in multi_categoty_fields:
print("collect feature for item:{}".format(str(item_columns))) output_file = "file:///home/gmuser/" + multi_categoty_field + "_vocab"
# model columns samples.selectExpr("explode(split({multi_categoty_field},','))".format(multi_categoty_field = multi_categoty_field)).where(F.col(multi_categoty_field) != '-1').distinct().write.mode("overwrite").options(header="false").csv(output_file)
print("model columns to redis...") print("训练数据写入 耗时s:{}".format(time.time() - write_time_start))
model_columns = user_columns + item_columns
featureColumnsToRedis(model_columns) print("总耗时:{} mins".format((time.time() - start)/60))
#特征字典
dataVocab = {}
multiVocab = {}
print("数据字典save...")
print("dataVocab:", str(dataVocab.keys()))
vocab_path = "../vocab/{}_vocab.json".format(VERSION)
dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
# item特征数据存入redis
itemFeaturesToRedis(itemStaticDF, FEATURE_ITEM_KEY)
timestmp6 = int(round(time.time()))
print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp3))
"""特征数据存入redis======================================"""
# user特征数据存入redis
userFeaturesToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
timestmp5 = int(round(time.time()))
print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp6))
"""训练数据保存 ======================================"""
timestmp3 = int(round(time.time()))
train_columns = model_columns + ["label", "timestamp", "rating"]
trainSamples = samplesWithUserFeatures.select(*train_columns)
train_df = trainSamples.toPandas()
train_df = pd.DataFrame(train_df)
train_df.to_csv(DATA_PATH_TRAIN,sep="|")
timestmp4 = int(round(time.time()))
print("训练数据写入success 耗时s:{}".format(timestmp4 - timestmp3))
print("总耗时m:{}".format((timestmp4 - start)/60))
spark.stop() spark.stop()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment