# coding=utf-8
"""Build per-device ESMM diary queues and load them into MySQL.

For each of the "native" and "nearby" sample sets the script reads the
sample CSV and the matching prediction file, ranks each (uid, city) group's
``cid_id`` values by predicted ``ctcvr`` and joins them into a comma
separated queue string.  The two results are outer-merged and written to the
``esmm_device_diary_queue`` table, replacing rows for the same device/city.
"""
import time

import MySQLdb  # noqa: F401 -- DBAPI driver behind the "mysql+mysqldb" URL
import pandas as pd
import pymysql
from sqlalchemy import create_engine

# NOTE(review): credentials are committed in source control.  They are kept
# here so existing deployments keep working, but they should be moved to
# configuration / secret storage.
_DB_HOST = '10.66.157.22'
_DB_PORT = 4000
_DB_USER = 'root'
_DB_PASSWORD = '3SYz54LS9#^9sBvC'
_DB_NAME = 'jerry_test'
_QUEUE_TABLE = 'esmm_device_diary_queue'

# Column names of the tab-separated, header-less prediction files.
_PRED_COLUMNS = ["ctr", "cvr", "ctcvr"]


def _connect():
    """Open a new pymysql connection to the queue database."""
    return pymysql.connect(host=_DB_HOST, port=_DB_PORT, user=_DB_USER,
                           passwd=_DB_PASSWORD, db=_DB_NAME)


def con_sql(sql):
    """Run ``sql`` on a fresh connection and return every fetched row.

    :param sql: SQL statement to execute.
    :type sql: str
    :return: whatever ``cursor.fetchall()`` yields for the statement.
    :rtype: tuple
    """
    db = _connect()
    try:
        cursor = db.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        # Close even when execute()/fetchall() raises, so the connection
        # is never leaked.
        db.close()


def nearby_set_join(lst):
    """Join the distinct values of ``lst`` with commas, in first-seen order.

    :param lst: pandas Series (one group's ``cid_id`` column).
    :return: comma separated string of the unique values.
    :rtype: str
    """
    return ','.join(str(i) for i in lst.unique().tolist())


def native_set_join(lst):
    """Join the first half of the distinct values of ``lst`` with commas.

    Half is rounded down, but at least one value is kept when any exist.

    :param lst: pandas Series (one group's ``cid_id`` column).
    :return: comma separated string of the leading unique values.
    :rtype: str
    """
    unique_values = lst.unique().tolist()
    keep = max(1, len(unique_values) // 2)
    return ','.join(str(i) for i in unique_values[:keep])


def _build_queue(csv_path, pred_path, join_fn, queue_column):
    """Return one queue string per (device_id, city_id) for a sample set.

    :param csv_path: sample CSV; must provide ``uid``, ``city`` and
        ``cid_id`` columns.
    :param pred_path: tab-separated prediction file without a header, one
        row per sample row, columns ``ctr``, ``cvr``, ``ctcvr``.
    :param join_fn: aggregation turning a group's ``cid_id`` Series into the
        queue string.
    :param queue_column: name given to the resulting queue column.
    :return: DataFrame with columns ``device_id``, ``city_id`` and
        ``queue_column``.
    """
    samples = pd.read_csv(csv_path)
    samples['cid_id'] = samples['cid_id'].astype(str)
    preds = pd.read_csv(pred_path, sep='\t', header=None, names=_PRED_COLUMNS)

    # Predictions are matched to samples purely by row position, so a length
    # mismatch silently mis-scores rows; surface it instead of hiding it.
    if len(preds) != len(samples):
        print("warning: %s has %d rows but %s has %d rows"
              % (csv_path, len(samples), pred_path, len(preds)))
    for column in _PRED_COLUMNS:
        samples[column] = preds[column]

    # groupby keeps the row order inside each group, so one stable sort up
    # front ranks every group by ctcvr (highest first) without the
    # groupby().apply() round trip, which relies on grouping columns being
    # passed through apply -- behaviour newer pandas deprecates.
    ranked = samples.sort_values(by="ctcvr", ascending=False, kind="mergesort")
    queue = (ranked.groupby(by=["uid", "city"])["cid_id"]
             .agg(join_fn)
             .reset_index(drop=False))
    queue.columns = ["device_id", "city_id", queue_column]
    return queue


def _delete_existing_rows(keys):
    """Delete table rows whose ``concat(device_id,city_id)`` is in ``keys``.

    :param keys: list of ``device_id + city_id`` strings; nothing is done
        when it is empty (``IN ()`` is not valid SQL).
    """
    if not keys:
        return
    # Let the driver quote each value instead of pasting Python reprs into
    # the statement.
    placeholders = ','.join(['%s'] * len(keys))
    delete_str = ('delete from esmm_device_diary_queue '
                  'where concat(device_id,city_id) in ({0})').format(placeholders)
    con = _connect()
    try:
        cur = con.cursor()
        cur.execute(delete_str, keys)
        con.commit()
    finally:
        con.close()


def main():
    """Build the native and nearby queues and load them into MySQL."""
    # native queue
    native = _build_queue('/data/esmm/native.csv', "/data/esmm/native/pred.txt",
                          native_set_join, "native_queue")
    print("native_device_count", native.shape)

    # nearby queue
    nearby = _build_queue('/data/esmm/nearby.csv', "/data/esmm/nearby/pred.txt",
                          nearby_set_join, "nearby_queue")
    print("nearby_device_count", nearby.shape)

    # union
    df_all = pd.merge(native, nearby, on=['device_id', 'city_id'],
                      how='outer').fillna("")
    df_all['device_id'] = df_all['device_id'].astype(str)
    df_all['city_id'] = df_all['city_id'].astype(str)
    df_all["time"] = int(time.time())
    print("union_device_count", df_all.shape)

    if df_all.empty:
        # Nothing to replace or insert; avoid touching the database at all.
        print("no rows to write")
        return

    engine = create_engine(
        "mysql+mysqldb://%s:%s@%s:%s/%s"
        % (_DB_USER, _DB_PASSWORD, _DB_HOST, _DB_PORT, _DB_NAME))
    try:
        keys = (df_all['device_id'] + df_all['city_id']).tolist()
        # NOTE(review): the delete is committed before the insert runs, so a
        # failed insert leaves these devices without rows until the next run.
        _delete_existing_rows(keys)
        df_all.to_sql(_QUEUE_TABLE, con=engine, if_exists='append', index=False)
    except Exception as e:
        # Top-level boundary of the batch job: report the failure and return.
        print(e)
    finally:
        engine.dispose()


if __name__ == '__main__':
    main()