Commit ac957eee authored by 张彦钊's avatar 张彦钊

加上if判断rdd为空

parent 7fe1994d
...@@ -214,69 +214,69 @@ def feature_engineer(): ...@@ -214,69 +214,69 @@ def feature_engineer():
"left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \ "left join jerry_test.search_doris doris on e.device_id = doris.device_id and e.stat_date = doris.get_date " \
"where e.stat_date >= '{}'".format(start) "where e.stat_date >= '{}'".format(start)
df = spark.sql(sql) # df = spark.sql(sql)
#
df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer", # df = df.drop_duplicates(["ucity_id", "level2_ids", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids", # "channel", "top", "time", "stat_date", "app_list", "hospital_id", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"]) # "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7"])
#
df = df.na.fill(dict(zip(features, features))) # df = df.na.fill(dict(zip(features, features)))
#
rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids", # rdd = df.select("stat_date", "y", "z", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7", # "tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time", # "ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
"hospital_id", "treatment_method", "price_min", "price_max", "treatment_time", # "hospital_id", "treatment_method", "price_min", "price_max", "treatment_time",
"maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\ # "maintain_time", "recover_time", "search_tag2", "search_tag3","cid_id","device_id")\
.rdd.repartition(200).map( # .rdd.repartition(200).map(
lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map), # lambda x: (x[0], float(x[1]), float(x[2]), app_list_func(x[3], app_list_map), app_list_func(x[4], leve2_map),
app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map), # app_list_func(x[5], leve3_map), app_list_func(x[6], leve2_map), app_list_func(x[7], leve2_map),
app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map), # app_list_func(x[8], leve2_map), app_list_func(x[9], leve2_map), app_list_func(x[10], leve2_map),
app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map), # app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
[value_map.get(x[0], 1), value_map.get(x[13], 2), value_map.get(x[14], 3), value_map.get(x[15], 4), # [value_map.get(x[0], 1), value_map.get(x[13], 2), value_map.get(x[14], 3), value_map.get(x[15], 4),
value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8), # value_map.get(x[16], 5), value_map.get(x[17], 6), value_map.get(x[18], 7), value_map.get(x[19], 8),
value_map.get(x[20], 9), value_map.get(x[21], 10), # value_map.get(x[20], 9), value_map.get(x[21], 10),
value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13), # value_map.get(x[22], 11), value_map.get(x[23], 12), value_map.get(x[24], 13),
value_map.get(x[25], 14), value_map.get(x[26], 15)], # value_map.get(x[25], 14), value_map.get(x[26], 15)],
app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30] # app_list_func(x[27], leve2_map), app_list_func(x[28], leve3_map),x[13],x[29],x[30]
)) # ))
#
#
rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER) # rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
#
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集 # # TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
#
train = rdd.map( # train = rdd.map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], # lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18])) # x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
f = time.time() # f = time.time()
spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list", # spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list", # "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids", # "tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \ # "search_tag2_list","search_tag3_list","city","cid_id","uid") \
.repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite") # .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
h = time.time() # h = time.time()
print("train tfrecord done") # print("train tfrecord done")
print((h - f) / 60) # print((h - f) / 60)
#
print("训练集样本总量:") # print("训练集样本总量:")
print(rdd.count()) # print(rdd.count())
#
get_pre_number() # get_pre_number()
#
test = rdd.filter(lambda x: x[0] == validate_date).map( # test = rdd.filter(lambda x: x[0] == validate_date).map(
lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], # lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18])) # x[10], x[11], x[12], x[13], x[14], x[15],x[16],x[17],x[18]))
#
spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list", # spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
"tag1_list", "tag2_list", "tag3_list", "tag4_list", # "tag1_list", "tag2_list", "tag3_list", "tag4_list",
"tag5_list", "tag6_list", "tag7_list", "ids", # "tag5_list", "tag6_list", "tag7_list", "ids",
"search_tag2_list","search_tag3_list","city","cid_id","uid") \ # "search_tag2_list","search_tag3_list","city","cid_id","uid") \
.repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite") # .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
#
print("va tfrecord done") # print("va tfrecord done")
print("删除视频特征") # print("删除视频特征")
#
rdd.unpersist() # rdd.unpersist()
return validate_date, value_map, app_list_map, leve2_map, leve3_map return validate_date, value_map, app_list_map, leve2_map, leve3_map
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment