Commit dbdda60b authored by 张彦钊

change test file

parent 1caff903
@@ -151,7 +151,7 @@ def feature_engineer():
     validate_date = con_sql(db, sql)[0].values.tolist()[0]
     print("validate_date:" + validate_date)
     temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
-    start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
+    start = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")
     print(start)
     db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
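Note: the only change in this hunk widens the training window, taking the start date 100 days (previously 3) before the validation date. A minimal standalone check of the computation, with an illustrative validate_date (the job actually reads it from the DB):

import datetime

validate_date = "2019-03-14"  # illustrative value only
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")
print(start)  # 2018-12-04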
@@ -214,7 +214,11 @@ def feature_engineer():
         app_list_func(x[11], leve2_map), app_list_func(x[12], leve2_map),
         [value_map[x[0]], value_map[x[13]], value_map[x[14]], value_map[x[15]], value_map[x[16]],
          value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
-         value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
+         value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))\
+        .zipWithIndex().map(lambda x:(x[0][0],x[0][1],x[0][2],x[0][3],x[0][4],x[0][5],x[0][6],x[0][7],x[0][8],
+                                      x[0][9],x[0][10],x[0][11],x[0][12],x[0][13],
+                                      x[1]))
     rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
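Note: the added .zipWithIndex() pairs every record with a sequential long index, and the following map flattens that index into the tuple; it is this index that later surfaces as the extra "number" column. A minimal sketch of the pattern on a toy RDD (names and values are illustrative, not the repo's data):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rows = sc.parallelize([("a", 1.0), ("b", 2.0), ("c", 3.0)])
indexed = rows.zipWithIndex().map(lambda x: (x[0][0], x[0][1], x[1]))
print(indexed.collect())  # [('a', 1.0, 0), ('b', 2.0, 1), ('c', 3.0, 2)]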
@@ -226,8 +230,8 @@ def feature_engineer():
     f = time.time()
     spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                       "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-                                      "tag5_list", "tag6_list", "tag7_list", "ids") \
-        .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
+                                      "tag5_list", "tag6_list", "tag7_list", "ids","number") \
+        .repartition(1).write.format("tfrecords").save(path=path + "test_tr/", mode="overwrite")
     h = time.time()
     print("train tfrecord done")
     print((h - f) / 60)
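Note: the training set now carries the new "number" column and is written to test_tr/ instead of tr/. The "tfrecords" format is not built into Spark; the sketch below assumes, as the original job does, that the spark-tensorflow-connector package is on the classpath, and uses illustrative column names and an illustrative /tmp path:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tfrecords_sketch").getOrCreate()
df = spark.createDataFrame([(0.0, 1.0, 0), (1.0, 0.5, 1)], ["y", "z", "number"])
df.repartition(1).write.format("tfrecords").save(path="/tmp/test_tr/", mode="overwrite")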
@@ -237,16 +241,16 @@ def feature_engineer():
     get_pre_number()
-    test = rdd.filter(lambda x: x[0] == validate_date).map(
-        lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
-                   x[10], x[11], x[12], x[13]))
-
-    spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
-                                     "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-                                     "tag5_list", "tag6_list", "tag7_list", "ids") \
-        .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
-
-    print("va tfrecord done")
+    # test = rdd.filter(lambda x: x[0] == validate_date).map(
+    #     lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
+    #                x[10], x[11], x[12], x[13]))
+    #
+    # spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
+    #                                  "tag1_list", "tag2_list", "tag3_list", "tag4_list",
+    #                                  "tag5_list", "tag6_list", "tag7_list", "ids","number") \
+    #     .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
+    #
+    # print("va tfrecord done")
     rdd.unpersist()
@@ -273,7 +277,8 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
           "left join jerry_test.order_tag ot on e.device_id = ot.device_id " \
           "left join jerry_test.sixin_tag sixin on e.device_id = sixin.device_id " \
           "left join jerry_test.cart_tag cart on e.device_id = cart.device_id " \
-          "left join jerry_test.knowledge k on feat.level2 = k.level2_id"
+          "left join jerry_test.knowledge k on feat.level2 = k.level2_id" \
+          " where e.label = 1 limit 60000"
     features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
                 "channel", "top", "time", "hospital_id",
@@ -302,22 +307,26 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
         value_map.get(x[25], 11), value_map.get(x[26], 12),
         value_map.get(x[27], 13), value_map.get(x[28], 14),
         value_map.get(x[29], 15)
-        ]))
+        ]))\
+        .zipWithIndex().map(lambda x:(x[0][0],x[0][1],x[0][2],x[0][3],x[0][4],x[0][5],x[0][6],x[0][7],x[0][8],
+                                      x[0][9],x[0][10],x[0][11],x[0][12],x[0][13],x[0][14],x[0][15],x[0][16],
+                                      x[1]))
     rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
-    native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5])))\
-        .toDF("city","uid","cid_id")
-    print("native csv")
-    native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
-    spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
-                          .map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16]))) \
-        .toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list",
-              "tag5_list", "tag6_list", "tag7_list", "ids").repartition(100).write.format("tfrecords") \
-        .save(path=path+"native/", mode="overwrite")
-    print("native tfrecord done")
-    h = time.time()
-    print((h-f)/60)
+    # native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5])))\
+    #     .toDF("city","uid","cid_id")
+    # print("native csv")
+    # native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
+    # spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
+    #                       .map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16]))) \
+    #     .toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list",
+    #           "tag5_list", "tag6_list", "tag7_list", "ids","number").repartition(100).write.format("tfrecords") \
+    #     .save(path=path+"native/", mode="overwrite")
+    # print("native tfrecord done")
+    # h = time.time()
+    # print((h-f)/60)
     nearby_pre = spark.createDataFrame(rdd.filter(lambda x: x[0] == 1).map(lambda x: (x[3], x[4], x[5]))) \
         .toDF("city", "uid", "cid_id")
@@ -328,8 +337,8 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
         .map(
         lambda x: (x[1], x[2], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13], x[14], x[15], x[16]))) \
         .toDF("y", "z", "app_list", "level2_list", "level3_list", "tag1_list", "tag2_list", "tag3_list", "tag4_list",
-              "tag5_list", "tag6_list", "tag7_list", "ids").repartition(100).write.format("tfrecords") \
-        .save(path=path + "nearby/", mode="overwrite")
+              "tag5_list", "tag6_list", "tag7_list", "ids","number").repartition(100).write.format("tfrecords") \
+        .save(path=path + "test_nearby/", mode="overwrite")
     print("nearby tfrecord done")