import os
import time
from urllib.parse import quote_plus

import MySQLdb  # noqa: F401  -- DB-API driver behind the "mysql+mysqldb" URL used in main()
import pandas as pd
import pymysql
from sqlalchemy import create_engine

# Connection settings shared by the reader (con_sql) and the writer (main).
# Each value can be overridden from the environment; the defaults are the
# values that were previously hard-coded (twice) in this file.
# SECURITY NOTE(review): a plaintext password is committed here. Move it out of
# source control (set ESMM_DB_PASSWORD instead) and rotate it.
DB_HOST = os.environ.get("ESMM_DB_HOST", "10.66.157.22")
DB_PORT = int(os.environ.get("ESMM_DB_PORT", "4000"))
DB_USER = os.environ.get("ESMM_DB_USER", "root")
DB_PASSWORD = os.environ.get("ESMM_DB_PASSWORD", "3SYz54LS9#^9sBvC")
DB_NAME = os.environ.get("ESMM_DB_NAME", "jerry_test")
# Previously declared in main() but never applied to any connection.
DB_CHARSET = "utf8"


def con_sql(sql):
    """
    Execute *sql* against the configured database and return all rows.

    The connection and cursor are always closed, even if the query fails.

    :type sql : str
    :rtype : tuple
    """
    db = pymysql.connect(host=DB_HOST, port=DB_PORT, user=DB_USER,
                         password=DB_PASSWORD, database=DB_NAME,
                         charset=DB_CHARSET)
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        db.close()


def set_join(lst):
    """
    Join the distinct values of *lst* into one comma-separated string.

    Duplicates are dropped but first-seen order is kept, so a caller that
    pre-sorts *lst* gets that ranking in the output. The previous
    ``','.join(set(lst))`` discarded the order.

    :type lst: iterable of str
    :rtype: str
    """
    seen = set()
    unique = []
    for item in lst:
        if item not in seen:
            seen.add(item)
            unique.append(item)
    return ','.join(unique)


def _build_queue(sql, pred_path, queue_col):
    """
    Build one cvr-ranked, comma-separated cid queue per (device_id, city_id).

    :param sql: query returning (device_id, city_id, cid) rows.
    :param pred_path: tab-separated file of (ctr, cvr) predictions, no header.
    :param queue_col: name given to the output queue column.
    :return: DataFrame with columns [device_id, city_id, <queue_col>],
             sorted by (device_id, city_id).
    """
    rows = con_sql(sql)
    df = pd.DataFrame(list(rows), columns=["uid", "city", "cid_id"])
    pred = pd.read_csv(pred_path, sep='\t', header=None, names=["ctr", "cvr"])
    # NOTE(review): predictions are paired with query rows purely by position,
    # yet the query has no ORDER BY. This relies on the table's scan order
    # matching the order in which pred.txt was produced -- confirm.
    if len(pred) != len(df):
        print("WARNING: %s has %d rows but the query returned %d rows"
              % (pred_path, len(pred), len(df)))
    df["ctr"], df["cvr"] = pred["ctr"], pred["cvr"]
    # groupby keeps row order within each group, so a single stable sort by cvr
    # (descending) ranks every group's cids before they are joined.
    queue = (df.sort_values(by="cvr", ascending=False, kind="mergesort")
               .groupby(by=["uid", "city"])["cid_id"]
               .agg(set_join)
               .reset_index())
    return queue.rename(columns={"uid": "device_id",
                                 "city": "city_id",
                                 "cid_id": queue_col})


def main():
    """
    Build native and nearby cid queues per device/city.

    The two queues are outer-joined and the result overwrites the
    ``esmm_device_diary_queue`` table. Every output row carries the same
    integer ``time`` stamp. A failed table write is reported (printed) but
    not raised.
    """
    native = _build_queue(
        "select device_id,city_id,cid from esmm_data2ffm_infer_native",
        "/home/gaoyazhe/data/native/pred.txt",
        "native_queue")
    ctime = int(time.time())
    native["time"] = ctime
    print("native_device_count", native.shape)

    nearby = _build_queue(
        "select device_id,city_id,cid from esmm_data2ffm_infer_nearby",
        "/home/gaoyazhe/data/nearby/pred.txt",
        "nearby_queue")
    print("nearby_device_count", nearby.shape)

    # union
    df_all = pd.merge(native, nearby, on=['device_id', 'city_id'], how='outer').fillna("")
    # Nearby-only rows have no "time" after the outer join, and fillna would set
    # it to "". Re-stamp every row; assigning an existing column keeps its position.
    df_all["time"] = ctime
    print("union_device_count", df_all.shape)

    # Credentials are URL-escaped: the password contains characters such as '#'.
    engine = create_engine("mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s" % (
        quote_plus(DB_USER), quote_plus(DB_PASSWORD),
        DB_HOST, DB_PORT, DB_NAME, DB_CHARSET))
    try:
        df_all.to_sql('esmm_device_diary_queue', con=engine, if_exists='replace', index=False)
    except Exception as e:
        print(e)
    finally:
        engine.dispose()


if __name__ == '__main__':
    main()