Commit 3e6ab5c5 authored by 王志伟
parents 7a4644e0 ec4eb794
@@ -204,7 +204,7 @@ def feature_engineer():
                               value_map[x[17]], value_map[x[18]], value_map[x[19]], value_map[x[20]], value_map[x[21]],
                               value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
-    rdd.persist(storageLevel= StorageLevel.MEMORY_AND_DISK)
+    rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
     # TODO: remove the train filter below after going live, because the most recent day's data should also be used as training data
@@ -215,11 +215,14 @@ def feature_engineer():
     spark.createDataFrame(train).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                       "tag1_list", "tag2_list", "tag3_list", "tag4_list",
                                       "tag5_list", "tag6_list", "tag7_list", "ids") \
-        .write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
+        .repartition(1).write.format("tfrecords").save(path=path + "tr/", mode="overwrite")
     h = time.time()
     print("train tfrecord done")
     print((h - f) / 60)
+    print("total number of samples:")
+    print(rdd.count())
     test = rdd.filter(lambda x: x[0] == validate_date).map(
         lambda x: (x[1], x[2], x[3], x[4], x[5], x[6], x[7], x[8], x[9],
                    x[10], x[11], x[12], x[13]))
@@ -227,7 +230,7 @@ def feature_engineer():
     spark.createDataFrame(test).toDF("y", "z", "app_list", "level2_list", "level3_list",
                                      "tag1_list", "tag2_list", "tag3_list", "tag4_list",
                                      "tag5_list", "tag6_list", "tag7_list", "ids") \
-        .write.format("tfrecords").save(path=path + "va/", mode="overwrite")
+        .repartition(1).write.format("tfrecords").save(path=path + "va/", mode="overwrite")
     print("va tfrecord done")
@@ -263,6 +266,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
                 "treatment_method", "price_min", "price_max", "treatment_time", "maintain_time", "recover_time"]
     df = spark.sql(sql)
+    df = df.drop_duplicates(["ucity_id", "device_id", "cid_id"])
     df = df.na.fill(dict(zip(features, features)))
     f = time.time()
     rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
@@ -286,16 +290,12 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
                               value_map.get(x[29], 15)
                               ]))
-    rdd.persist(storageLevel= StorageLevel.MEMORY_AND_DISK)
+    rdd.persist(storageLevel= StorageLevel.MEMORY_ONLY_SER)
     native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5])))\
         .toDF("city","uid","cid_id")
     print("native csv")
     native_pre.toPandas().to_csv(local_path+"native.csv", header=True)
-    # TODO: change the csv output to the write shown below
-    # native_pre.coalesce(1).write.format('com.databricks.spark.csv').save(path+"native/",header = 'true')
-    # the prediction tfrecords must be written as a single file so that the record order is guaranteed
     spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
                           .map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16]))) \
         .toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list",
@@ -309,8 +309,6 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
         .toDF("city", "uid", "cid_id")
     print("nearby csv")
     native_pre.toPandas().to_csv(local_path + "nearby.csv", header=True)
-    # TODO: change the csv output to the write shown below
-    # nearby_pre.coalesce(1).write.format('com.databricks.spark.csv').save(path+"nearby/",header = 'true')
     spark.createDataFrame(rdd.filter(lambda x: x[0] == 1)
                           .map(
...
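The removed TODO comments above already name the intended replacement: write the prediction candidate lists as a single CSV file straight from Spark instead of collecting them through pandas. A minimal sketch of that approach, assuming the native_pre DataFrame and path variable from the script above (on Spark 2.x the built-in format("csv") can stand in for the com.databricks.spark.csv package):

# Sketch only, not part of the commit: single-file CSV write from Spark.
# coalesce(1) collapses the output to one part file, which also fixes a single record order.
native_pre.coalesce(1) \
    .write.format('com.databricks.spark.csv') \
    .option('header', 'true') \
    .mode('overwrite') \
    .save(path + "native/")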
@@ -7,6 +7,7 @@ from pyspark.sql import SparkSession
 import datetime
 import pandas as pd
 import subprocess
+import tensorflow as tf


 def app_list_func(x,l):
@@ -142,7 +143,22 @@ def get_filename(dir_in):
         x.append(t)
     return x


+def get_hdfs(dir_in):
+    # list the HDFS directory, keep only the part-* files, and return fully qualified URIs
+    pre_path = "hdfs://172.16.32.4:8020"
+    args = "hdfs dfs -ls " + dir_in + " | awk '{print $8}'"
+    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+    s_output, s_err = proc.communicate()
+    all_dart_dirs = s_output.split()
+    a = []
+    for i in all_dart_dirs:
+        b = str(i).split("/")[4]
+        if b[:4] == "part":
+            # str(i) on a bytes item looks like "b'/path/...'", so [2:-1] strips the b'...' wrapper
+            tmp = pre_path + str(i)[2:-1]
+            a.append(tmp)
+    return a


 if __name__ == '__main__':
+    print("hello")
     # sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
     #     .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
     #     .set("spark.tispark.plan.allow_index_double_read", "false") \
@@ -161,22 +177,17 @@ if __name__ == '__main__':
     #
     # validate_date, value_map, app_list_map = feature()
     # get_predict(validate_date, value_map, app_list_map)
     #
-    # [path + "tr/part-r-00000"]
+    #
     # spark = SparkSession.builder.getOrCreate()
     #
     # b = [("a", 1), ("a", 1), ("b", 3), ("a", 2)]
     # rdd = spark.sparkContext.parallelize(b)
     # df = spark.createDataFrame(rdd).toDF("id", "n")
     # df.show()
     # df.createOrReplaceTempView("df")
     # t = spark.sql("select id from df").map()
-    import glob
-    import random
-    tr_files = glob.glob("/home/gmuser/test/*")
-    random.shuffle(tr_files)
-    print("tr_files:", tr_files)
...
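The new get_hdfs helper shells out to hdfs dfs -ls, keeps only the part-* entries, and prefixes them with the namenode address, so it returns fully qualified URIs for the part files written by the repartition(1) tfrecord saves above. Together with the added tensorflow import, a plausible consumer is a TFRecord input pipeline; the sketch below only illustrates that assumption (the function name and batch size are invented here, and reading hdfs:// paths requires a TensorFlow build with HDFS support):

# Illustrative sketch, not part of the commit.
import tensorflow as tf

def tfrecord_dataset_sketch(dir_in, batch_size=1024):
    files = get_hdfs(dir_in)  # e.g. ["hdfs://172.16.32.4:8020/.../tr/part-r-00000", ...]
    dataset = tf.data.TFRecordDataset(files)  # serialized tf.Example records, still unparsed
    return dataset.batch(batch_size)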