Commit ca6ccf95 authored by Your Name

dist test

parent 573e0080
@@ -11,6 +11,7 @@ from pyspark import StorageLevel
from pyspark.sql import Row
import os
import sys
from sqlalchemy import create_engine
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    print('Parsing', filenames)
@@ -198,6 +199,11 @@ def main(te_file):
def trans(x):
    return str(x)[2:-1] if str(x)[0] == 'b' else x
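# e.g. trans("b'beijing'") -> "beijing" (strips the b'...' repr of byte strings);
# anything whose str() does not start with 'b' is returned unchanged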
def set_join(lst):
    l = lst.unique().tolist()
    r = [str(i) for i in l]
    r = r[:500]
    return ','.join(r)
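# e.g. set_join(pd.Series(["12", "7", "12"])) -> "12,7": drops duplicates (keeping first-seen order),
# stringifies, caps the queue at 500 ids and comma-joins them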
if __name__ == "__main__":
@@ -219,8 +225,8 @@ if __name__ == "__main__":
spark.sparkContext.setLogLevel("WARN")
path = "hdfs://172.16.32.4:8020/strategy/esmm/"
df = spark.read.format("tfrecords").load(path+"test_native/part-r-00000")
df.show()
# df = spark.read.format("tfrecords").load(path+"test_native/part-r-00000")
# df.show()
# te_files = []
# for i in range(0,10):
@@ -273,22 +279,34 @@ if __name__ == "__main__":
# for i in range(10,100):
# te_files.append([path + "test_nearby/part-r-000" + str(i)])
#
# te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_nearby/part-r-00000"]
#
# rdd_te_files = spark.sparkContext.parallelize(te_files)
# print("-"*100)
# indices = rdd_te_files.repartition(1).map(lambda x: main(x))
# # print(indices.take(1))
# print("-" * 100)
#
# te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(
# lambda l: Row(sample_id=l.split(":")[0],uid=l.split(":")[1],city=l.split(":")[2],cid_id=l.split(":")[3],ctcvr=l.split(":")[4])))
#
#nearby data
te_files = ["hdfs://172.16.32.4:8020/strategy/esmm/test_nearby/part-r-00000"]
rdd_te_files = spark.sparkContext.parallelize(te_files)
print("-"*100)
indices = rdd_te_files.repartition(1).map(lambda x: main(x))
# print(indices.take(1))
print("-" * 100)
te_result_dataframe = spark.createDataFrame(indices.flatMap(lambda x: x.split(";")).map(
    lambda l: Row(sample_id=l.split(":")[0], uid=l.split(":")[1], city=l.split(":")[2], cid_id=l.split(":")[3], ctcvr=l.split(":")[4])))
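# each element returned by main() is assumed to be a ";"-separated batch of
# "sample_id:uid:city:cid_id:ctcvr" strings, hence the split(";") / split(":") parsing above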
# print("nearby rdd data")
# te_result_dataframe.show()
# nearby_data = te_result_dataframe.toPandas()
# print("nearby pd data")
nearby_data = te_result_dataframe.toPandas()
print("nearby pd data")
nearby_data["cid_id1"] = nearby_data["cid_id"].apply(trans)
nearby_data["city1"] = nearby_data["city"].apply(trans)
nearby_data["uid1"] = nearby_data["uid"].apply(trans)
print(nearby_data.head())
df3 = nearby_data.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="ctcvr", ascending=False)) \
    .reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
df3.columns = ["device_id", "city_id", "nearby_queue"]
print("nearby_device_count", df3.shape)
# print(nearby_data.head())
# print(nearby_data.dtypes)
# print("elem type")
@@ -296,17 +314,68 @@ if __name__ == "__main__":
# print(type(nearby_data["cid_id"][0]))
#native data
native_data = spark.read.parquet(path+"native_result/")
print("native rdd data")
native_data.show()
# print("native rdd data")
# native_data.show()
native_data_pd = native_data.toPandas()
print("native pd data")
print(native_data_pd.head())
# print(native_data_pd.head())
native_data_pd["cid_id1"] = native_data_pd["cid_id"].apply(trans)
native_data_pd["city1"] = native_data_pd["city"].apply(trans)
native_data_pd["uid1"] = native_data_pd["uid"].apply(trans)
print(native_data_pd.head())
print(native_data_pd.dtypes)
df4 = native_data_pd.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="ctcvr", ascending=False)) \
    .reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
df4.columns = ["device_id", "city_id", "native_queue"]
print("native_device_count", df4.shape)
# print(native_data_pd.dtypes)
# union
df_all = pd.merge(df3, df4, on=['device_id', 'city_id'], how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str)
df_all["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
print("union_device_count", df_all.shape)
host = '172.16.40.158'
port = 4000
user = 'root'
password = '3SYz54LS9#^9sBvC'
db = 'jerry_test'
charset = 'utf8'
# delete the existing rows for these device/city keys before re-inserting
df_merge = df_all['device_id'] + df_all['city_id']
to_delete = list(df_merge.values)
total = len(to_delete)
df_merge_str = [str(to_delete[:int(total / 5)]).strip('[]')]
for i in range(2, 6):
    start = int(total * (i - 1) / 5)
    end = int(total * i / 5)
    tmp = str(to_delete[start:end]).strip('[]')
    df_merge_str.append(tmp)
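# the concatenated device_id+city_id keys are split into 5 roughly equal chunks, presumably to keep
# each "... in (...)" delete statement below the MySQL/TiDB statement-size limit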
try:
    for i in df_merge_str:
        delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
        con = pymysql.connect(host=host, port=port, user=user, passwd=password, db=db)
        cur = con.cursor()
        cur.execute(delete_str)
        con.commit()
        print("delete done")
        con.close()
    engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s" % (user, password, host, port, db))
    df_all.to_sql('esmm_device_diary_queue', con=engine, if_exists='append', index=False, chunksize=8000)
    print("insert done")
except Exception as e:
    print(e)
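# net effect: stale rows for the affected (device_id, city_id) keys are deleted first and the fresh
# queues are appended, i.e. a manual delete-then-insert refresh of esmm_device_diary_queue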
print("耗时(秒):")
print((time.time()-b))