Commit e589ec80 authored by 张彦钊's avatar 张彦钊

Merge branch 'zhao' into 'master'

把tidb库地址由172.16.40.158改为172.16.40.170

See merge request !36
parents 07e181a7 f458aade
@staticmethod
def fetch_user_topic(device_id, card_type, size):
    """Return up to ``size`` tractate (topic) ids to show on a device feed.

    Flow for a real device (``device_id != '0'``):
      1. Splice in at most 2 ids from the device's search-driven
         recommendation queue (redis hash ``tractate_queue``).
      2. Serve the remainder from the per-device filtered queue cached in
         gm-kv, falling back to ``DeviceUserTopicQueue`` /
         ``UserTopicQueue`` database rows on the first page.
      3. Record every id served into a per-day set and a cumulative
         "read" set via ``read_history``.

    Anonymous devices (``'0'``) cursor-page the shared queue stored in
    redis.  Any unexpected error is logged and ``[]`` is returned.

    NOTE(review): the original indentation was lost in this dump; the
    nesting below is a reconstruction — confirm against the original
    source before relying on branch structure.
    """
    try:
        def filter_topic(cid_list):
            # Remove ids the device has disliked (set stored in gm-kv).
            # NOTE(review): when the dislike key does not exist this
            # falls through without an explicit return (yields None);
            # a caller's len() would then raise and be absorbed by the
            # outer except — confirm this matches the original nesting.
            try:
                if gmkv.exists(dislike_key):
                    dislike = gmkv.smembers(dislike_key)
                    if len(cid_list) > 0:
                        # smembers yields bytes: encode int/str ids before comparing.
                        if type(cid_list[0]) == int or type(cid_list[0]) == str:
                            cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                        else:
                            cid_list = [i for i in cid_list if i not in dislike]
                        return cid_list
                    else:
                        return cid_list
            except:
                # Best effort: on any kv failure serve the list unfiltered.
                return cid_list

        def write_after_filter_tractate(cid_list):
            # Persist the not-yet-served remainder of the queue to gm-kv.
            try:
                if gmkv.exists(after_filter_key):
                    gmkv.set(after_filter_key, json.dumps(cid_list))
                else:
                    # First write for this device: expire after 6 hours.
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())

        def get_filter_tractate():
            # Load the cached remainder; [] when missing or unparsable.
            try:
                return json.loads(gmkv.get(after_filter_key))
            except:
                return []

        def read_history(cid_list):
            # Track served ids: today's set (15-day TTL) plus a cumulative
            # read set from which a 14-day-old snapshot is subtracted.
            if redis_client.exists(today_key):
                redis_client.sadd(today_key, *cid_list)
            else:
                redis_client.sadd(today_key, *cid_list)
                redis_client.expire(today_key, 15 * 24 * 60 * 60)
            if redis_client.exists(read_key) and redis_client.exists(old_key):
                # Drop entries that were already read 14 days ago, then
                # refresh the TTL of the cumulative set.
                redis_client.sdiffstore(read_key, read_key, old_key)
                redis_client.delete(old_key)
                redis_client.expire(read_key, time=13 * 24 * 60 * 60)
            redis_client.sadd(read_key, *cid_list)

        def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
            # Connect to one gm-kv redis host; None when unreachable.
            try:
                if len(redis_password) == 0:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
                else:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, password=redis_password,
                                          socket_timeout=2)
                cli_ins.ping()  # fail fast so the caller can try the next host
                return cli_ins
            except:
                return None

        # Per-device redis key names.
        dislike_key = str(device_id) + "_dislike_tractate"
        search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
        after_filter_key = "device_tractate_after_filter:device_id:" + str(device_id)
        tractate_key = "tractate_is_tail" + str(device_id)  # marker: queue exhausted
        read_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        old_key = "TS:recommend_tractate_set:device_id:{}:{}" \
            .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
        today_key = "TS:recommend_tractate_set:device_id:{}:{}" \
            .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
        search_list = list()
        # Use the first reachable gm-kv host from settings.
        gmkv = None
        for gm_kv_host_item in settings.GM_KV_HOSTS:
            gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
                            redis_db=gm_kv_host_item["db"],
                            redis_password=gm_kv_host_item["password"])
            if gmkv:
                break
        if device_id != '0':
            if redis_client.exists(search_topic_recommend_key):
                # Splice up to 2 ids from the search-driven queue.
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                search_topic_recommend_list = filter_topic(search_topic_recommend_list)
                if len(search_topic_recommend_list) == 0:
                    redis_client.delete(search_topic_recommend_key)
                elif len(search_topic_recommend_list) <= 2:
                    search_list = search_topic_recommend_list
                    size = size - len(search_list)
                    redis_client.delete(search_topic_recommend_key)
                else:
                    search_list = search_topic_recommend_list[:2]
                    size = size - 2
                    redis_client.hset(search_topic_recommend_key, 'tractate_queue',
                                      json.dumps(search_topic_recommend_list[2:]))
            if gmkv.exists(tractate_key):
                # Queue already exhausted ("tail"): serve only search ids.
                if len(search_list) > 0:
                    search_list = list(map(int, search_list))
                    read_history(search_list)
                return search_list
            elif gmkv.exists(after_filter_key):
                # Continue consuming the cached, already-filtered queue.
                que = get_filter_tractate()
                que = filter_topic(que)
                if len(que) == 0:
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    if len(search_list) > 0:
                        search_list = list(map(int, search_list))
                        read_history(search_list)
                    return search_list
                elif len(que) <= size:
                    search_list.extend(que)
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    search_list = list(map(int, search_list))
                    read_history(search_list)
                    return search_list
                else:
                    search_list.extend(que[:size])
                    write_after_filter_tractate(que[size:])
                    search_list = list(map(int, search_list))
                    read_history(search_list)
                    return search_list
            else:
                # First page: load the queue from the database.
                try:
                    que = DeviceUserTopicQueue.objects.get(device_id=device_id)
                except DeviceUserTopicQueue.DoesNotExist:
                    que = UserTopicQueue.objects.last()
                if not que:
                    if len(search_list) > 0:
                        search_list = list(map(int, search_list))
                        read_history(search_list)
                    return search_list
                qa = list(filter(None, que.queue.split(',')))
                qa = filter_topic(qa)
                if len(qa) == 0:
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    if len(search_list) > 0:
                        search_list = list(map(int, search_list))
                        read_history(search_list)
                    return search_list
                elif len(qa) <= size:
                    search_list.extend(qa)
                    search_list = list(map(int, search_list))
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    read_history(search_list)
                    return search_list
                else:
                    search_list.extend(qa[:size])
                    search_list = list(map(int, search_list))
                    write_after_filter_tractate(qa[size:])
                    read_history(search_list)
                    return search_list
        else:
            # Anonymous device: cursor-page the shared queue.
            key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                          date=RecommendFeed.current_date())
            try:
                que = DeviceUserTopicQueue.objects.get(device_id=device_id)
            except DeviceUserTopicQueue.DoesNotExist:
                que = UserTopicQueue.objects.last()
            if not que:
                return []
            que = list(filter(None, que.queue.split(',')))
            # adjust args.
            cursor = redis_client.get(key) or 0
            cursor = int(cursor) % len(que)
            size = min(size, len(que))
            data = list(islice(cycle(que), cursor, cursor + size))
            data = list(map(int, data))
            if cursor + 2 * size < len(que):
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            else:
                # Near the end of the queue: log and wrap the cursor to 0.
                try:
                    context.request_logger.app(reset_queue=True)
                    cursor = 0
                    redis_client.set(key, cursor, ex=24 * 60 * 60)
                except:
                    redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            return data
    except:
        logging_exception()
        return []
# Tractate feed: previous implementation (kept for reference)
def fetch_user_topic(device_id, card_type, size):
    """Previous implementation: serve ``size`` topic ids by cursor-paging
    the shared queue, splicing in up to 2 search-recommended ids.

    Any unexpected error is logged and ``[]`` is returned.

    NOTE(review): indentation reconstructed from a flattened dump —
    confirm nesting against the original source.
    """
    try:
        key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                      date=RecommendFeed.current_date())
        if (device_id != '0') and size >= 2:
            search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
            search_topic_recommend_list = list()
            search_cursor_ts = 0
            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                if b'cursor' in search_topic_recommend_dict:
                    search_cursor_ts = json.loads(search_topic_recommend_dict[b'cursor'])
                # Only the first 30 search ids are ever spliced in.
                if search_cursor_ts < 30:
                    search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                    if search_cursor_ts < len(search_topic_recommend_list):
                        # Reserve room for the 2 search ids prepended below.
                        size = size - 2
        try:
            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            que = UserTopicQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            # Near the tail: log and wrap the cursor back to 0.
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        # NOTE(review): `size` may have been reduced to < 2 above, in
        # which case this re-check skips the splice — confirm intended.
        if device_id != '0' and size >= 2:
            if len(search_topic_recommend_list) > 0 and search_cursor_ts < len(search_topic_recommend_list):
                # Prepend the next 2 search ids and advance the cursor.
                queue = search_topic_recommend_list[search_cursor_ts:search_cursor_ts + 2]
                queue.extend(data)
                data = queue
                new_search_cursor = search_cursor_ts + 2
                redis_client.hset(search_topic_recommend_key, 'cursor', new_search_cursor)
                redis_client.expire(search_topic_recommend_key, 30 * 24 * 60 * 60)
        # Record everything served into the cumulative read set.
        read_topic_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        if len(data) > 0:
            redis_client.sadd(read_topic_key, *data)
        return data
    except:
        logging_exception()
        return []
# Tractate feed: legacy implementation (kept for reference)
def fetch_user_topic(device_id, card_type, size):
    """Legacy implementation, byte-identical to the previous one above.

    NOTE(review): this module-level redefinition shadows the earlier
    ``fetch_user_topic`` at import time — only the last definition is
    callable.  Indentation reconstructed from a flattened dump.
    """
    try:
        key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                      date=RecommendFeed.current_date())
        if (device_id != '0') and size >= 2:
            search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
            search_topic_recommend_list = list()
            search_cursor_ts = 0
            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                if b'cursor' in search_topic_recommend_dict:
                    search_cursor_ts = json.loads(search_topic_recommend_dict[b'cursor'])
                # Only the first 30 search ids are ever spliced in.
                if search_cursor_ts < 30:
                    search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                    if search_cursor_ts < len(search_topic_recommend_list):
                        # Reserve room for the 2 search ids prepended below.
                        size = size - 2
        try:
            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            que = UserTopicQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            # Near the tail: log and wrap the cursor back to 0.
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        # NOTE(review): `size` may have been reduced to < 2 above, in
        # which case this re-check skips the splice — confirm intended.
        if device_id != '0' and size >= 2:
            if len(search_topic_recommend_list) > 0 and search_cursor_ts < len(search_topic_recommend_list):
                # Prepend the next 2 search ids and advance the cursor.
                queue = search_topic_recommend_list[search_cursor_ts:search_cursor_ts + 2]
                queue.extend(data)
                data = queue
                new_search_cursor = search_cursor_ts + 2
                redis_client.hset(search_topic_recommend_key, 'cursor', new_search_cursor)
                redis_client.expire(search_topic_recommend_key, 30 * 24 * 60 * 60)
        # Record everything served into the cumulative read set.
        read_topic_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        if len(data) > 0:
            redis_client.sadd(read_topic_key, *data)
        return data
    except:
        logging_exception()
        return []
# 9.6 production Q&A feed implementation
def fetch_qa(device_id, card_type, size):
    """Return up to ``size`` Q&A answer ids for a device feed.

    Mirrors ``fetch_user_topic``: splice 1 id from the search-driven
    answer queue, then serve from the cached filtered queue in gm-kv,
    falling back to ``DeviceQAQueue`` / ``AnswerQueue`` database rows.
    Anonymous devices (``'0'``) cursor-page the shared queue.  Errors
    are logged and ``[]`` is returned.

    NOTE(review): indentation reconstructed from a flattened dump —
    confirm nesting against the original source.
    """
    try:
        def get_after_filter_qa():
            # Load the cached remainder; [] when missing or unparsable.
            try:
                return json.loads(gmkv.get(after_filter_key))
            except:
                return []

        def write_after_filter_qa(cid_list):
            # Persist the not-yet-served remainder of the queue to gm-kv.
            try:
                if gmkv.exists(after_filter_key):
                    gmkv.set(after_filter_key, json.dumps(cid_list))
                else:
                    # First write for this device: expire after 6 hours.
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())

        def filter_qa(device_id, cid_list):
            # Remove ids the device has disliked (set stored in gm-kv).
            # NOTE(review): when the dislike key does not exist this
            # falls through without an explicit return (yields None) —
            # confirm this matches the original nesting.
            try:
                key = str(device_id) + "_dislike_qa"
                if gmkv.exists(key):
                    dislike = gmkv.smembers(key)
                    if len(cid_list) > 0:
                        # smembers yields bytes: encode int/str ids before comparing.
                        if type(cid_list[0]) == int or type(cid_list[0]) == str:
                            cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                        else:
                            cid_list = [i for i in cid_list if i not in dislike]
                        return cid_list
                    else:
                        return cid_list
            except:
                # Best effort: on any kv failure serve the list unfiltered.
                return cid_list

        def read_history(cid_list):
            # Track served ids: today's set (15-day TTL) plus a cumulative
            # read set from which a 14-day-old snapshot is subtracted.
            if redis_client.exists(today_qa_key):
                redis_client.sadd(today_qa_key, *cid_list)
            else:
                redis_client.sadd(today_qa_key, *cid_list)
                redis_client.expire(today_qa_key, 15 * 24 * 60 * 60)
            if redis_client.exists(read_qa_key) and redis_client.exists(old_qa_key):
                redis_client.sdiffstore(read_qa_key, read_qa_key, old_qa_key)
                redis_client.delete(old_qa_key)
                redis_client.expire(read_qa_key, time=13 * 24 * 60 * 60)
            redis_client.sadd(read_qa_key, *cid_list)

        def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
            # Connect to one gm-kv redis host; None when unreachable.
            try:
                if len(redis_password) == 0:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
                else:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, password=redis_password,
                                          socket_timeout=2)
                cli_ins.ping()  # fail fast so the caller can try the next host
                return cli_ins
            except:
                return None

        # Per-device redis key names.
        search_qa_recommend_list = list()
        read_qa_key = "TS:recommend_answer_set:device_id:" + str(device_id)
        old_qa_key = "TS:recommend_answer_set:device_id:{}:{}" \
            .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
        today_qa_key = "TS:recommend_answer_set:device_id:{}:{}" \
            .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
        answer_queue_key = "qa_is_tail:" + str(device_id)  # marker: queue exhausted
        after_filter_key = "device_qa_after_filter:device_id:" + str(device_id)
        # Use the first reachable gm-kv host from settings.
        gmkv = None
        for gm_kv_host_item in settings.GM_KV_HOSTS:
            gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
                            redis_db=gm_kv_host_item["db"],
                            redis_password=gm_kv_host_item["password"])
            if gmkv:
                break
        if device_id != '0':
            search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
            if redis_client.exists(search_qa_recommend_key):
                # Splice in 1 id from the search-driven answer queue.
                search_qa_recommend_dict = redis_client.hgetall(search_qa_recommend_key)
                queue_list = json.loads(search_qa_recommend_dict[b'answer_queue'])
                queue_list = filter_qa(device_id, queue_list)
                if len(queue_list) == 0:
                    redis_client.delete(search_qa_recommend_key)
                elif len(queue_list) == 1:
                    size = size - 1
                    search_qa_recommend_list = queue_list
                    redis_client.delete(search_qa_recommend_key)
                else:
                    size = size - 1
                    search_qa_recommend_list.append(queue_list[0])
                    redis_client.hset(search_qa_recommend_key, "answer_queue", json.dumps(queue_list[1:]))
            if gmkv.exists(answer_queue_key):
                # Queue already exhausted ("tail"): serve only search ids.
                if len(search_qa_recommend_list) > 0:
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            elif gmkv.exists(after_filter_key):
                # Continue consuming the cached, already-filtered queue.
                que = get_after_filter_qa()
                que = filter_qa(device_id, que)
                if len(que) == 0:
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                    if len(search_qa_recommend_list) > 0:
                        search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                        read_history(search_qa_recommend_list)
                    return search_qa_recommend_list
                elif len(que) <= size:
                    search_qa_recommend_list.extend(que)
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                    return search_qa_recommend_list
                else:
                    search_qa_recommend_list.extend(que[:size])
                    write_after_filter_qa(que[size:])
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                    return search_qa_recommend_list
            # Neither marker exists: build the queue from the database.
            try:
                que = DeviceQAQueue.objects.get(device_id=device_id)
            except DeviceQAQueue.DoesNotExist:
                que = AnswerQueue.objects.last()
            if not que:
                if len(search_qa_recommend_list) > 0:
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            qa = list(filter(None, que.queue.split(',')))
            # NOTE(review): the inner device_id != "0" checks below are
            # always true on this branch (kept from the original).
            if device_id != "0":
                qa = filter_qa(device_id, qa)
            if len(qa) == 0:
                if device_id != "0":
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                if len(search_qa_recommend_list) > 0:
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            elif len(qa) <= size:
                search_qa_recommend_list.extend(qa)
                search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                if device_id != "0":
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            else:
                search_qa_recommend_list.extend(qa[:size])
                search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                if device_id != "0":
                    write_after_filter_qa(qa[size:])
                read_history(search_qa_recommend_list)
                return search_qa_recommend_list
        else:
            # Anonymous device: cursor-page the shared queue.
            key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
                                                          card_type=card_type, date=RecommendFeed.current_date())
            try:
                que = DeviceQAQueue.objects.get(device_id=device_id)
            except DeviceQAQueue.DoesNotExist:
                que = AnswerQueue.objects.last()
            if not que:
                return []
            que = list(filter(None, que.queue.split(',')))
            # adjust args.
            cursor = redis_client.get(key) or 0
            cursor = int(cursor) % len(que)
            size = min(size, len(que))
            # redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            data = list(islice(cycle(que), cursor, cursor + size))
            data = list(map(int, data))
            if cursor + 2 * size < len(que):
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            else:
                # Near the end of the queue: log and wrap the cursor to 0.
                try:
                    context.request_logger.app(reset_answer_queue=True)
                    cursor = 0
                    redis_client.set(key, cursor, ex=24 * 60 * 60)
                except:
                    redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            return data
    except:
        logging_exception()
        return []
......@@ -37,19 +37,19 @@ def get_list(db,sql,n):
def get_map():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select app_list from device_app_list"
a = time.time()
apps_number, app_list_map = get_list(db,sql,16)
print("applist")
print((time.time()-a)/60)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_ids from diary_feat"
b = time.time()
leve2_number, leve2_map = get_list(db, sql, 16+apps_number)
print("leve2")
print((time.time() - b) / 60)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level3_ids from diary_feat"
c = time.time()
leve3_number, leve3_map = get_list(db, sql, 16+leve2_number+apps_number)
......@@ -77,7 +77,7 @@ def con_sql(db,sql):
def get_pre_number():
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select count(*) from esmm_pre_data"
cursor = db.cursor()
cursor.execute(sql)
......@@ -103,65 +103,65 @@ def feature_engineer():
leve2_map["search_tag2"] = 27
unique_values = []
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct stat_date from esmm_train_data_dwell"
unique_values.extend(get_unique(db,sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ucity_id from esmm_train_data_dwell"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct ccity_name from esmm_train_data_dwell"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct time from cid_time_cut"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct device_type from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct manufacturer from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct channel from user_feature"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct top from cid_type_top"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_min from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_method from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct price_max from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct treatment_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct maintain_time from knowledge"
unique_values.extend(get_unique(db, sql))
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select distinct recover_time from knowledge"
unique_values.extend(get_unique(db, sql))
# unique_values.append("video")
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data_dwell"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
......@@ -169,7 +169,7 @@ def feature_engineer():
start = (temp - datetime.timedelta(days=180)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC')
sql = "select distinct doctor.hospital_id from jerry_test.esmm_train_data_dwell e " \
"left join eagle.src_zhengxing_api_service service on e.diary_service_id = service.id " \
"left join eagle.src_zhengxing_api_doctor doctor on service.doctor_id = doctor.id " \
......@@ -374,7 +374,7 @@ if __name__ == '__main__':
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf")\
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf")\
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec","snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
......
......@@ -20,7 +20,7 @@ def get_esmm_users():
stat_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
sql = "select distinct device_id,city_id from data_feed_exposure_precise " \
"where stat_date = '{}'".format(stat_date)
result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC','jerry_prod',sql)
result = get_mysql_data('172.16.40.170', 4000, 'root','3SYz54LS9#^9sBvC','jerry_prod',sql)
result = list(result)
return result
except:
......@@ -70,7 +70,7 @@ def get_searchworlds_to_tagid():
def get_queues(device_id,city_id):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
db = pymysql.connect(host='172.16.40.170', port=4000, user='root',
passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
sql = "select native_queue, nearby_queue, nation_queue, megacity_queue from esmm_device_diary_queue " \
......@@ -95,7 +95,7 @@ def tag_boost(cid_str, tag_list):
"(select a.diary_id,b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type < '4' and a.diary_id in {}) tmp " \
"where id in {} group by id".format(tuple(cids), tuple(tag_list))
result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC','eagle',sql)
result = get_mysql_data('172.16.40.170', 4000, 'root', '3SYz54LS9#^9sBvC','eagle',sql)
if len(result) > 0:
tag_cids = {}
left_cids = []
......@@ -147,13 +147,13 @@ def tag_boost(cid_str, tag_list):
def to_data_base(df):
sql = "select distinct device_id from esmm_resort_diary_queue"
result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC', 'jerry_test',sql)
result = get_mysql_data('172.16.40.170', 4000, 'root','3SYz54LS9#^9sBvC', 'jerry_test',sql)
old_uid = [i[0] for i in result]
if len(old_uid) > 0:
old_uid = set(df["device_id"].values)&set(old_uid)
old_number = len(old_uid)
if old_number > 0:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
db = pymysql.connect(host='172.16.40.170', port=4000, user='root',
passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "delete from esmm_resort_diary_queue where device_id in {}".format(tuple(old_uid))
......@@ -163,7 +163,7 @@ def to_data_base(df):
cursor.close()
db.close()
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.158:4000/jerry_test?charset=utf8')
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.170:4000/jerry_test?charset=utf8')
pd.io.sql.to_sql(df, "esmm_resort_diary_queue", yconnect, schema='jerry_test', if_exists='append', index=False,
chunksize=200)
print("insert done")
......
......@@ -11,7 +11,7 @@ def con_sql(sql):
:type sql : str
:rtype : tuple
"""
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
db = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
......@@ -58,7 +58,7 @@ def main():
df_all["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
print("union_device_count",df_all.shape)
host='172.16.40.158'
host='172.16.40.170'
port=4000
user='root'
password='3SYz54LS9#^9sBvC'
......@@ -78,7 +78,7 @@ def main():
try:
for i in df_merge_str:
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(i)
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
con = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
......
......@@ -396,7 +396,7 @@ def df_sort(result,queue_name):
def update_or_insert(df2,queue_name):
device_count = df2.shape[0]
con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset = 'utf8')
con = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset = 'utf8')
cur = con.cursor()
try:
for i in range(0, device_count):
......
......@@ -211,30 +211,49 @@ def make_data(device_id,city_id,key_head):
# device_id = "868663038800476"
city_id = "beijing"
def topic():
    """Seed test data for the tractate feed against the paas-test redis:
    two disliked ids plus a search-recommendation queue for one device."""
    device_id = "78687687"
    client = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
    dislike_key = str(device_id) + "_dislike_tractate"
    client.sadd(dislike_key, 1, 2)
    print(client.smembers(dislike_key))
    search_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
    queue_ids = [1] + list(range(36, 50))
    client.hset(search_key, 'tractate_queue', json.dumps(queue_ids))
    print(client.hgetall(search_key))
def black(x):
    """Upsert device id ``x`` into the ``hippo_deviceblacklist`` table.

    Uses MySQL ``REPLACE`` so repeated calls refresh the timestamps.
    The query is parameterized — the original interpolated ``x`` with
    ``str.format``, which is SQL injection for untrusted device ids —
    and the connection is now closed even when execute/commit raises.
    """
    db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work",
                                   password="BJQaT9VzDcuPBqkd",
                                   db="zhengxing",
                                   cursorclass=pymysql.cursors.DictCursor)
    try:
        cursor = db_zhengxing.cursor()
        date_str = str(datetime.datetime.now())
        sql = ("REPLACE INTO hippo_deviceblacklist(device_id,create_at,update_at,pull_black_type)"
               "values(%s,%s,%s,%s)")
        cursor.execute(sql, (x, date_str, date_str, 1))
        db_zhengxing.commit()
    finally:
        db_zhengxing.close()
def ip_black(x):
    """Upsert IP address ``x`` into the ``hippo_ipblacklist`` table.

    Uses MySQL ``REPLACE`` so repeated calls refresh the timestamps.
    The query is parameterized — the original interpolated ``x`` with
    ``str.format``, which is SQL injection for untrusted input — and
    the connection is now closed even when execute/commit raises.
    """
    db_zhengxing = pymysql.connect(host="172.16.30.143", port=3306, user="work",
                                   password="BJQaT9VzDcuPBqkd",
                                   db="zhengxing",
                                   cursorclass=pymysql.cursors.DictCursor)
    try:
        cursor = db_zhengxing.cursor()
        date_str = str(datetime.datetime.now())
        sql = ("REPLACE INTO hippo_ipblacklist(ip,create_at,update_at,pull_black_type)"
               "values(%s,%s,%s,%s)")
        cursor.execute(sql, (x, date_str, date_str, 1))
        db_zhengxing.commit()
    finally:
        db_zhengxing.close()
if __name__ == "__main__":
    # Smoke-test driver.  The tag-boost recommendation pipeline below is
    # commented out; as written, only the chunked DataFrame sizing and
    # the final blacklist write are exercised.
    # NOTE(review): indentation reconstructed from a flattened dump —
    # confirm loop nesting against the original source.
    users_list = list(range(1, 90))
    n = 3  # chunk size
    split_users_list = [users_list[i:i + n] for i in range(0, len(users_list), n)]
    for child_users_list in split_users_list:
        total_samples = list()
        for uid_city in child_users_list:
            # tag_list = get_user_profile(uid_city[0])
            # queues = get_queues(uid_city[0], uid_city[1])
            # if len(queues) > 0 and len(tag_list) > 0:
            # new_native = tag_boost(queues[0], tag_list)
            # new_nearby = tag_boost(queues[1], tag_list)
            #
            # insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
            # sample = [uid_city[0], uid_city[1], new_native, new_nearby, queues[2], queues[3], insert_time]
            total_samples.append(uid_city)
        if len(total_samples) > 0:
            df = pd.DataFrame(total_samples)
            df = df.rename(columns={0: "device_id"})
            print("df numbers")
            print(df.shape[0])
            # to_data_base(df)
    # NOTE(review): placement of this call relative to the loop above is
    # a guess from the flattened dump — confirm before relying on it.
    ip_black("hello")
......
......@@ -183,35 +183,9 @@ def get_all_users():
if __name__ == "__main__":
# users_list = get_esmm_users()
# print("user number")
# print(len(users_list))
users_list = get_all_users()
name_tag = get_searchworlds_to_tagid()
n = 500
split_users_list = [users_list[i:i + n] for i in range(0, len(users_list), n)]
for child_users_list in split_users_list:
total_samples = list()
for uid_city in child_users_list:
tag_list = get_user_profile(uid_city[0])
queues = get_queues(uid_city[0], uid_city[1])
if len(queues) > 0:
new_native = tag_boost(queues[0], tag_list)
new_nearby = tag_boost(queues[1], tag_list)
insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
sample = [uid_city[0], uid_city[1], new_native, new_nearby, queues[2], queues[3], insert_time]
total_samples.append(sample)
if len(total_samples) > 0:
df = pd.DataFrame(total_samples)
df = df.rename(columns={0: "device_id", 1: "city_id",2:"native_queue",
3:"nearby_queue",4:"nation_queue",5:"megacity_queue",6:"time"})
print("数量")
print(df.shape[0])
to_data_base(df)
device_id = "868663038800476"
city_id = "beijing"
queues = get_queues(device_id, city_id)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment