Commit 21af8f0b authored by 张彦钊

Merge branch 'zhao' into 'master'

Change the TiDB address from 172.16.40.158 to 172.16.40.170

See merge request !38
parents 0a84be4b abbf1a3f
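
Because the TiDB host is hard-coded into every pymysql.connect call touched below, this address change has to be repeated across many files. A minimal sketch of a centralized connection helper that would make the next move a one-line edit, assuming a hypothetical tidb_config module that is not part of this merge request:

# tidb_config.py -- hypothetical helper, not part of this merge request
import pymysql

TIDB_HOST = "172.16.40.170"   # the new TiDB address introduced by this MR
TIDB_PORT = 4000
TIDB_USER = "root"
TIDB_PASSWD = "3SYz54LS9#^9sBvC"

def connect(db="jerry_test"):
    # Open a connection to the shared TiDB cluster for the given database.
    return pymysql.connect(host=TIDB_HOST, port=TIDB_PORT, user=TIDB_USER,
                           passwd=TIDB_PASSWD, db=db)

Each call such as pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') would then shrink to connect('jerry_test').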
......@@ -37,19 +37,19 @@ def get_list(db,sql,n):
def get_map():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select app_list from device_app_list"
a = time.time()
apps_number, app_list_map = get_list(db,sql,16)
print("applist")
print((time.time()-a)/60)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_ids from diary_feat"
b = time.time()
leve2_number, leve2_map = get_list(db, sql, 16+apps_number)
print("leve2")
print((time.time() - b) / 60)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level3_ids from diary_feat"
c = time.time()
leve3_number, leve3_map = get_list(db, sql, 16+leve2_number+apps_number)
......@@ -77,7 +77,7 @@ def con_sql(db,sql):
def get_pre_number():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select count(*) from esmm_pre_data"
cursor = db.cursor()
cursor.execute(sql)
......@@ -103,65 +103,65 @@ def feature_engineer():
leve2_map["search_tag2"] = 27
unique_values = []
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct stat_date from esmm_train_data_dwell"
unique_values.extend(get_unique(db,sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ucity_id from esmm_train_data_dwell"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ccity_name from esmm_train_data_dwell"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct time from cid_time_cut"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct device_type from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct manufacturer from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct channel from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct top from cid_type_top"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_min from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_method from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_max from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct maintain_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct recover_time from knowledge"
unique_values.extend(get_unique(db, sql))
# unique_values.append("video")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data_dwell"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
......@@ -169,7 +169,7 @@ def feature_engineer():
start = (temp - datetime.timedelta(days=180)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
......@@ -374,7 +374,7 @@ if __name__ == '__main__':
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")\
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf")\
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec","snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
......
......@@ -20,7 +20,7 @@ def get_esmm_users():
stat_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
sql = "select distinct device_id,city_id from data_feed_exposure_precise " \
"where stat_date = '{}'".format(stat_date)
result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC','jerry_prod',sql)
result = get_mysql_data('172.16.40.170', 4000, 'root','3SYz54LS9#^9sBvC','jerry_prod',sql)
result = list(result)
return result
except:
......@@ -70,7 +70,7 @@ def get_searchworlds_to_tagid():
def get_queues(device_id,city_id):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
db = pymysql.connect(host='172.16.40.170', port=4000, user='root',
passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
sql = "select native_queue, nearby_queue, nation_queue, megacity_queue from esmm_device_diary_queue " \
......@@ -95,7 +95,7 @@ def tag_boost(cid_str, tag_list):
"(select a.diary_id,b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type < '4' and a.diary_id in {}) tmp " \
"where id in {} group by id".format(tuple(cids), tuple(tag_list))
result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC','eagle',sql)
result = get_mysql_data('172.16.40.170', 4000, 'root', '3SYz54LS9#^9sBvC','eagle',sql)
if len(result) > 0:
tag_cids = {}
left_cids = []
......@@ -147,13 +147,13 @@ def tag_boost(cid_str, tag_list):
def to_data_base(df):
sql = "select distinct device_id from esmm_resort_diary_queue"
result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC', 'jerry_test',sql)
result = get_mysql_data('172.16.40.170', 4000, 'root','3SYz54LS9#^9sBvC', 'jerry_test',sql)
old_uid = [i[0] for i in result]
if len(old_uid) > 0:
old_uid = set(df["device_id"].values)&set(old_uid)
old_number = len(old_uid)
if old_number > 0:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
db = pymysql.connect(host='172.16.40.170', port=4000, user='root',
passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "delete from esmm_resort_diary_queue where device_id in {}".format(tuple(old_uid))
......@@ -163,7 +163,7 @@ def to_data_base(df):
cursor.close()
db.close()
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.158:4000/jerry_test?charset=utf8')
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.170:4000/jerry_test?charset=utf8')
pd.io.sql.to_sql(df, "esmm_resort_diary_queue", yconnect, schema='jerry_test', if_exists='append', index=False,
chunksize=200)
print("insert done")
......
......@@ -58,7 +58,7 @@ def main():
df_all["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
print("union_device_count",df_all.shape)
host='172.16.40.158'
host='172.16.40.170'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
......@@ -78,7 +78,7 @@ def main():
try:
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
con = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
......
......@@ -396,7 +396,7 @@ def df_sort(result,queue_name):
def update_or_insert(df2,queue_name):
device_count = df2.shape[0]
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset = 'utf8')
con = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset = 'utf8')
cur = con.cursor()
try:
for i in range(0, device_count):
......
......@@ -181,6 +181,7 @@ def get_all_users():
except:
return []
def
if __name__ == "__main__":
device_id = "868663038800476"
......
......@@ -192,24 +192,29 @@ def diary_write(device_id,cid):
# ssc.start()
# ssc.awaitTermination()
def make_data(device_id,city_id,key_head):
def make_data():
r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
device_id = "868240031404281"
city_id = "beijing"
key_head = "device_diary_queue_rerank:device_id:"
# key_head = "device_diary_queue:device_id:"
key = key_head + device_id + ":city_id:" + city_id
native = ",".join([str(i) for i in (range(100, 130))])
nearby = ",".join([str(i) for i in (range(130, 150))])
nation = ",".join([str(i) for i in (range(150, 180))])
megacity = ",".join([str(i) for i in (range(180, 200))])
r.hset(name=key, key="native_queue", value=native)
r.hset(name=key, key="nearby_queue", value=nearby)
r.hset(name=key, key="nation_queue", value=nation)
r.hset(name=key, key="megacity_queue", value=megacity)
print(r.hgetall(key))
native = ",".join([str(i) for i in (range(100, 102))])
nearby = ",".join([str(i) for i in (range(102, 106))])
nation = ",".join([str(i) for i in (range(106, 110))])
megacity = ",".join([str(i) for i in (range(110, 118))])
key_head = "device_diary_queue_rerank:device_id:"
# key_head = "device_diary_queue:device_id:"
device_id = "868663038800471"
# key_head = "device_diary_queue_rerank:device_id:"
# make_data(device_id, "beijing", key_head)
# device_id = "868663038800476"
city_id = "beijing"
def topic():
device_id = "78687687"
......@@ -223,34 +228,9 @@ def topic():
r.hset(search, 'tractate_queue',json.dumps(a))
print(r.hgetall(search))
def black(x):
db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work",
password="BJQaT9VzDcuPBqkd",
db="zhengxing",
cursorclass=pymysql.cursors.DictCursor)
cursor = db_zhengxing.cursor()
date_str = str(datetime.datetime.now())
sql = "REPLACE INTO hippo_deviceblacklist(device_id,create_at,update_at,pull_black_type)" \
"values('{}','{}','{}',{})".format(x,date_str,date_str,1)
cursor.execute(sql)
db_zhengxing.commit()
db_zhengxing.close()
def ip_black(x):
db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work",
password="BJQaT9VzDcuPBqkd",
db="zhengxing",
cursorclass=pymysql.cursors.DictCursor)
cursor = db_zhengxing.cursor()
date_str = str(datetime.datetime.now())
sql = "REPLACE INTO hippo_ipblacklist(ip,create_at,update_at,pull_black_type)" \
"values('{}','{}','{}',{})".format(x, date_str, date_str, 1)
cursor.execute(sql)
db_zhengxing.commit()
db_zhengxing.close()
if __name__ == "__main__":
ip_black("hello")
make_data()
......
from config import *
import pandas as pd
import pickle
import xlearn as xl
from userProfile import *
import time
from utils import *
import os
# Join device_id and city_id onto the corresponding city's hot-diary table. Note: the feature order of the prediction set below must match the training set.
def feature_en(user_profile):
file_name = DIRECTORY_PATH + "diaryTestSet/{0}DiaryTop3000.csv".format(user_profile['city_id'])
data = pd.read_csv(file_name)
data["device_id"] = user_profile['device_id']
now = datetime.now()
data["hour"] = now.hour
data["minute"] = now.minute
data.loc[data["hour"] == 0, ["hour"]] = 24
data.loc[data["minute"] == 0, ["minute"]] = 60
data["hour"] = data["hour"].astype("category")
data["minute"] = data["minute"].astype("category")
# Although y is what we predict, the FFM conversion requires a y column; it does not affect the prediction results.
data["y"] = 0
data = data.drop("city_id", axis=1)
return data
# Load ffm.pkl and convert the table above into FFM format.
def transform_ffm_format(df, device_id):
with open(DIRECTORY_PATH+"ffm.pkl","rb") as f:
ffm_format_pandas = pickle.load(f)
data = ffm_format_pandas.transform(df)
now = datetime.now().strftime("%Y-%m-%d-%H-%M")
predict_file_name = DIRECTORY_PATH + "result/{0}_{1}DiaryTop3000.csv".format(device_id, now)
data.to_csv(predict_file_name, index=False,header=None)
print("成功将ffm预测文件写到服务器")
return predict_file_name
# Load the model, run prediction, sort the predicted diary probabilities in descending order, and store them in a table.
def predict(user_profile):
instance = feature_en(user_profile)
instance_file_path = transform_ffm_format(instance, user_profile["device_id"])
ffm_model = xl.create_ffm()
ffm_model.setTest(instance_file_path)
ffm_model.setSigmoid()
ffm_model.predict(DIRECTORY_PATH + "model.out",
DIRECTORY_PATH + "result/{0}_output.txt".format(user_profile['device_id']))
print("该用户预测结束")
predict_save_to_local(user_profile, instance)
# Join the prediction results with device_id and sort by probability in descending order.
def wrapper_result(user_profile, instance):
proba = pd.read_csv(DIRECTORY_PATH +
"result/{0}_output.txt".format(user_profile['device_id']), header=None)
proba = proba.rename(columns={0: "prob"})
proba["cid"] = instance['cid']
proba = proba.sort_values(by="prob", ascending=False)
proba = proba.head(50)
return proba
# Save the prediction candidate set locally.
def predict_save_to_local(user_profile, instance):
proba = wrapper_result(user_profile, instance)
proba.loc[:, "url"] = proba["cid"].apply(lambda x: "http://m.igengmei.com/diary_book/" + str(x[6:]) + '/')
proba.to_csv(DIRECTORY_PATH + "result/feed_{}".format(user_profile['device_id']), index=False)
print("成功将预测候选集保存到本地")
def router(device_id):
user_profile, not_exist = fetch_user_profile(device_id)
if not_exist:
print('Sorry, we don\'t have you.')
else:
predict(user_profile)
# Multi-process prediction
def multi_predict(predict_list,processes=12):
pool = Pool(processes)
for device_id in predict_list:
start = time.time()
pool.apply_async(router, (device_id,))
end = time.time()
print("该用户{}预测耗时{}秒".format(device_id, (end - start)))
pool.close()
pool.join()
if __name__ == "__main__":
# TODO: if a round takes less than a minute, the next fetch returns the same device_ids as the previous one. Also, a continuously active user will be predicted repeatedly.
while True:
empty,device_id_list = get_active_users()
if empty:
for eachFile in os.listdir("/tmp"):
if "xlearn" in eachFile:
os.remove("/tmp" + "/" + eachFile)
time.sleep(58)
else:
old_device_id_list = pd.read_csv(DIRECTORY_PATH + "data_set_device_id.csv")["device_id"].values.tolist()
# Take the intersection of active users and old users, i.e. only predict for old users.
predict_list = list(set(device_id_list) & set(old_device_id_list))
multi_predict(predict_list)
# TODO: before going live, remove or comment out the timers and print statements in the prediction flow; prediction is performance-sensitive, so every removable statement counts.
import pymysql
import datetime
import json
import redis
import pandas as pd
from sqlalchemy import create_engine
def get_mysql_data(host,port,user,passwd,db,sql):
db = pymysql.connect(host=host, port=port, user=user, passwd=passwd,db=db)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
db.close()
return result
def get_user():
sql = "select user_id,phone,name,city_id,channel,auth_type from api_userextra"
df = pd.DataFrame(list(get_mysql_data(host,port,user,passwd,db,sql)))
df = df.rename(columns={0: "user_id", 1: "phone",2:"name",3:"city_id",4:"channel",5:"auth_type"})
print(df.shape)
l = 0
r = 2000
length = len(df)
while l < length:
pd.io.sql.to_sql(df[l:r], "user", yconnect, schema='jerry_test', if_exists='append', index=False)
l += 2000
r += 2000
print("insert done")
def get_device():
sql = "select device_id,platform,version,model,screen,channel,idfv from statistic_device"
if __name__ == "__main__":
host = "172.16.30.141"
port = 3306
user = "work"
passwd = "BJQaT9VzDcuPBqkd"
db = "zhengxing"
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.170:4000/jerry_test?charset=utf8')
get_user()