Commit 459fd91c authored by 张彦钊's avatar 张彦钊

Merge branch 'zhao' into 'master'

nearby queue 改成取top500日记,写入数据库增加chunksize参数

See merge request !15
parents b1df4af7 edb20693
......@@ -18,18 +18,10 @@ def con_sql(sql):
db.close()
return result
def nearby_set_join(lst):
# return ','.join([str(i) for i in list(lst)])
return ','.join([str(i) for i in lst.unique().tolist()])
def native_set_join(lst):
l = lst.unique().tolist()
d = len(l)
if d > 500:
d = 500
r = [str(i) for i in l]
r =r[:d]
def set_join(lst):
r = [str(i) for i in lst.unique().tolist()]
r =r[:500]
return ','.join(r)
def main():
......@@ -40,7 +32,7 @@ def main():
df1 = pd.read_csv("/home/gmuser/esmm_data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df3.columns = ["device_id","city_id","native_queue"]
print("native_device_count",df3.shape)
......@@ -51,7 +43,7 @@ def main():
df1 = pd.read_csv("/home/gmuser/esmm_data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
......@@ -83,7 +75,7 @@ def main():
cur = con.cursor()
cur.execute(delete_str)
con.commit()
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False)
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False,chunksize=8000)
except Exception as e:
print(e)
......
import pandas as pd
import pymysql
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def cut_map(x):
if 0 < x <= 5:
return 2
elif 5 < x <= 10:
return 3
elif 10 < x <= 15:
return 4
elif 15 < x <= 20:
return 5
elif 20 < x <= 40:
return 6
else:
return 7
def cut():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select cid_id,time from cid_time"
df = con_sql(db, sql)
df = df.rename(columns={0: "cid", 1: "time"})
print(df.shape)
part_1 = df.loc[df["time"] == 0]
part_2 = df.loc[df["time"] != 0]
part_1["time"] = 1
part_2["time"] = part_2["time"].map(cut_map)
merge = part_1.append(part_2)
print(merge.shape)
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
pd.io.sql.to_sql(merge, "cid_time_cut", yconnect, schema='jerry_test', if_exists='replace', index=False)
if __name__ == "__main__":
cut()
import pandas as pd
import pymysql
import datetime
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
def multi():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
sql = "select diary_id,level2_ids from diary_feat"
df = con_sql(db, sql).dropna()
print(df.shape)
df = df.rename(columns={0: "cid", 1: "level"})
df["l1"] = "lost"
df["l2"] = "lost"
df["l3"] = "lost"
for i in list(df["level"].unique()):
l = [int(j) for j in i.split(";")]
l = sorted(l)
if len(l) >= 3:
df.loc[df["level"] == i, ["l1"]] = l[0]
df.loc[df["level"] == i, ["l2"]] = l[1]
df.loc[df["level"] == i, ["l3"]] = l[2]
elif len(l) == 2:
df.loc[df["level"] == i, ["l1"]] = l[0]
df.loc[df["level"] == i, ["l2"]] = l[1]
elif len(l) == 1:
df.loc[df["level"] == i, ["l1"]] = l[0]
df = df.drop("level",axis=1)
print(df.head())
# a = list(df["l1"].unique())
# b = list(df["l2"].unique())
# c = list(df["l3"].unique())
# print(len(a))
# print(a)
# print(len(b))
# print(b)
# print(len(c))
# print(c)
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
n = 200000
for i in range(0, df.shape[0], n):
if i == 0:
temp = df.iloc[0:n]
elif i + n > df.shape[0]:
temp = df.iloc[i:]
else:
temp = df.iloc[i:i + n]
pd.io.sql.to_sql(temp, "cid_level2", yconnect, schema='jerry_test', if_exists='append', index=False)
print("insert done")
if __name__ == "__main__":
multi()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment