Commit 1d6d767c authored by 张彦钊

Per 雅喆's request, update the feature engineering file to add three fields: user, city, and diary.

parent 8d1e7234
@@ -192,7 +192,8 @@ def feature_engineer():
     "u.channel,c.top,cut.time,dl.app_list,feat.level3_ids,doctor.hospital_id," \
     "wiki.tag as tag1,question.tag as tag2,search.tag as tag3,budan.tag as tag4," \
     "ot.tag as tag5,sixin.tag as tag6,cart.tag as tag7,doris.search_tag2,doris.search_tag3," \
-    "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time " \
+    "k.treatment_method,k.price_min,k.price_max,k.treatment_time,k.maintain_time,k.recover_time," \
+    "e.device_id,e.cid_id " \
     "from jerry_test.esmm_train_data_dwell e left join jerry_test.user_feature u on e.device_id = u.device_id " \
     "left join jerry_test.cid_type_top c on e.device_id = c.device_id " \
     "left join jerry_test.cid_time_cut cut on e.cid_id = cut.cid " \
@@ -223,7 +224,7 @@ def feature_engineer():
     "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
     "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
     "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
-    "maintain_time", "recover_time", "search_tag2", "search_tag3")\
+    "maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\
     .rdd.repartition(200).map(
     lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
                app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
@@ -234,7 +235,7 @@ def feature_engineer():
                value_map.get(x[20], 9), value_map.get(x[21], 10),
                value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
                value_map.get(x[25], 14), value_map.get(x[26], 15)],
-               app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map)
+               app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30]
     ))
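Review note: the three new tail elements are positional: x[13] is `ucity_id`, and x[29]/x[30] are the newly selected `cid_id`/`device_id`; downstream they become the `city`, `cid_id`, and `uid` columns. Index-based access like this breaks silently whenever the select list is reordered. A minimal sketch of a name-based alternative (a reviewer suggestion under the same select list, not part of this commit):

```python
# Spark Row objects support access by column name, which survives reordering
# of the select list; only the three key fields are shown here.
rdd = df.rdd.repartition(200).map(
    lambda row: (row["ucity_id"], row["cid_id"], row["device_id"]))
```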
@@ -244,11 +245,12 @@ def feature_engineer():
     train = rdd.map(
         lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
-                   x[10], x[11], x[12], x[13], x[14], x[15]))
+                   x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
     f = time.time()
     spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                       "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-                                      "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list") \
+                                      "tag5_list", "tag6_list", "tag7_list", "ids",
+                                      "search_tag2_list","search_tag3_list","city","cid_id","uid") \
         .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
     h = time.time()
     print("train tfrecord done")
@@ -261,11 +263,12 @@ def feature_engineer():
     test = rdd.filter(lambda x: x[0] == validate_date).map(
         lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
-                   x[10], x[11], x[12], x[13], x[14], x[15]))
+                   x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
     spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                      "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-                                     "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list") \
+                                     "tag5_list", "tag6_list", "tag7_list", "ids",
+                                     "search_tag2_list","search_tag3_list","city","cid_id","uid") \
         .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
     print("va tfrecord done")
@@ -335,30 +338,21 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
     rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
     print(rdd.count())
-    native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5])))\
-        .toDF("city","uid","cid_id")
-    print("native csv")
-    native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
     spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
                           .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
-                                          x[12], x[13], x[14], x[15], x[16], x[17], x[18]))) \
+                                          x[12], x[13], x[14], x[15], x[16], x[17], x[18],x[3],x[4],x[5]))) \
         .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-              "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list") \
+              "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list","city","uid","cid_id") \
         .repartition(1).write.format("tfrecords").save(path=path+"native/", mode="overwrite")
     print("native tfrecord done")
     h = time.time()
     print((h-f)/60)
-    nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[0] == 1).map(lambda x: (x[3], x[4], x[5]))) \
-        .toDF("city", "uid", "cid_id")
-    print("nearby csv")
-    nearby_pre.toPandas().to_csv(local_path + "nearby.csv", header=True)
     spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
                           .map(lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11],
-                                          x[12], x[13], x[14], x[15], x[16], x[17], x[18]))) \
+                                          x[12], x[13], x[14], x[15], x[16], x[17], x[18],x[3],x[4],x[5]))) \
         .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-              "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list") \
+              "tag5_list", "tag6_list", "tag7_list", "ids", "search_tag2_list","search_tag3_list","city","uid","cid_id") \
         .repartition(1).write.format("tfrecords").save(path=path + "nearby/", mode="overwrite")
     print("nearby tfrecord done")