Commit d84b3ee6 authored by 张彦钊's avatar 张彦钊

change test file

parent 92807d12
......@@ -6,6 +6,7 @@ from pyspark.sql import SparkSession
import datetime
import pandas as pd
import time
from pyspark import StorageLevel
def app_list_func(x,l):
......@@ -138,7 +139,7 @@ def feature_engineer():
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
start = (temp - datetime.timedelta(days=100)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
......@@ -203,7 +204,8 @@ def feature_engineer():
value_map[x[22]], value_map[x[23]], value_map[x[24]], value_map[x[25]], value_map[x[26]]]))
d = time.time()
rdd.persist()
rdd.persist(storageLevel= StorageLevel.MEMORY_AND_DISK)
# TODO 上线后把下面train fliter 删除,因为最近一天的数据也要作为训练集
......@@ -264,7 +266,7 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
df = spark.sql(sql)
df = df.na.fill(dict(zip(features, features)))
c = time.time()
f = time.time()
rdd = df.select("label", "y", "z", "ucity_id", "device_id", "cid_id", "app_list", "level2_ids", "level3_ids",
"tag1", "tag2", "tag3", "tag4", "tag5", "tag6", "tag7",
"ucity_id", "ccity_name", "device_type", "manufacturer", "channel", "top", "time",
......@@ -286,10 +288,8 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
value_map.get(x[29], 299985)
]))
rdd.persist()
d = time.time()
print("rdd")
print((d-c)/60)
rdd.persist(storageLevel= StorageLevel.MEMORY_AND_DISK)
native_pre = spark.createDataFrame(rdd.filter(lambda x:x[0] == 0).map(lambda x:(x[3],x[4],x[5])))\
.toDF("city","uid","cid_id")
print("native csv")
......@@ -298,7 +298,6 @@ def get_predict(date,value_map,app_list_map,leve2_map,leve3_map):
# native_pre.coalesce(1).write.format('com.databricks.spark.csv').save(path+"native/",header = 'true')
# 预测的tfrecord必须写成一个文件,这样可以摆保证顺序
f = time.time()
spark.createDataFrame(rdd.filter(lambda x: x[0] == 0)
.map(lambda x: (x[1],x[2],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16]))) \
.toDF("y","z","app_list", "level2_list", "level3_list","tag1_list", "tag2_list", "tag3_list", "tag4_list",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment