Commit 2d4411f4 authored by 张彦钊

add

parent 4eac817f
......@@ -8,9 +8,58 @@ url="http://doris.paas-test.env/v1/once"
header_dict={'Content-Type': 'application/x-www-form-urlencoded'}
param_dict={}
param_dict["method"]="doris/recommend/spu_detail_diary"
param_dict["method"]="doris/recommend/ai_faces"
param_detail = {"device_id":"xx"}
param_detail = {"device_id":"3ab", "size":10,"offset":0,"service_id":1437}
# param_dict["method"]="doris/search/doctor_officer"
# param_detail = {"device_id": '1',"size": 10,"offset":0, "query": "双眼皮","user_city_tag_id": 328,
# "is_officer":True,"filters":{},"sort_type":3}
# param_dict["method"]="doris/recommend/doctor_service"
# param_detail = {"id":"fengxiaobing","offset":0,"size":2,"is_doctor":False,"device_id":"aaa"}
# param_dict["method"]="doris/search/tag_count"
# param_detail = {"device_id":"3ab",
# "doctor_list":["e30b5c076770446b84580162b7d4df48","e35e6dbdabc740e69e2808e33505feaf"],
# "query":"吸脂"}
# param_dict["method"]="doris/recommend/homepage_polymer"
# param_detail = {"device_id":"3ab","size":10}
# param_dict["method"]="doris/search/top_doctor"
# #cpt
#
# param_detail = {"device_id":"865007","size":10,"user_city_tag_id":328,"offset":0,"is_officer":True,
# "business_recomment":True,"filters":{}}
# param_detail = {"is_merchant_cpt":True,"is_operation_cpt":True,"device_id":"baby",
# "size":6,"offset":0,"query":"切开双眼皮","user_city_tag_id":328,"city_id":"beijing",
# "sort_type":0,"sort_with_submission":False,"is_feed":False,'filters': {'area_tag_id': 328}
# }
# param_dict["method"]="doris/search/query_sku"
# param_detail = {"sort_with_submission": False, "user_city_tag_id": 328, "sort_type": 0,
# "filters": {}, "offset": 0, "query": "切开双眼皮","size": 17}
# 第一次请求
# param_detail = {"is_merchant_cpt":True,"is_operation_cpt":True,"device_id":"baby",
# "size":10,"offset":0,"query":"切开双眼皮","user_city_tag_id":328,"city_id":"beijing",
# "sort_type":0,"sort_with_submission":False,"is_feed":True,'filters': {'area_tag_id': 328}
# }
# # # 第二次请求
#
# param_detail = {"is_merchant_cpt":True,"is_operation_cpt":True,"device_id":"baby",
# "size":10,"offset":0,"query":"切开双眼皮","user_city_tag_id":328,"city_id":"beijing",
# "sort_type":0,"sort_with_submission":False,"is_feed":False,'filters': {'area_tag_id': 328}
# }
# param_dict["method"]="doris/recommend/spu_detail_diary"
# param_detail = {"device_id":"3ab", "size":10,"offset":0,"service_id":5740173,"has_cover":True}
# "query":"双眼皮"
# tail -f filelog.log.20200701 | grep 3ab | grep doris/recommend/spu_detail_diary
param_dict["params"]=json.dumps(param_detail)
......
from itertools import chain, islice, cycle
import datetime
from collections import Counter
# -*- coding:utf-8 -*-
# author:gm
# mail: zhangguodong@igengmei.com
# datetime:2020/4/24 3:32 下午
# software: PyCharm
from functools import reduce
import copy
from gm_types.gaia import DIARY_ORDER_TYPE
from gm_types.doris import ANSWER_SORT_TYPE
from gm_types.doris import ARTICLE_SORT_TYPE
from gm_types.mimas import CONTENT_CLASS
from gm_types.doris import CARD_TYPE
from gm_types.gaia import CITY_LEVEL
from gm_rpcd.all import bind
import traceback
from search.utils.diary import recall_diary
from search.utils.answer import recall_answers
from search.utils.article import recall_articles
from search.utils.service import recommed_service_category_device_id
from gm_rpcd.all import context
from libs.algorithms import drop_dup
from libs.cache import redis_client,redis_client2
from libs.error import logging_exception
import time
from extend.models.gaia import City, CityScale
from extend.models.gold import (
QAQueue,
WikiQueue,
IconQueue,
UserTopicQueue,
DoctorTopicQueue,
DiaryQueue,
ArticleQueue,
AnswerQueue,
DeviceQAQueue,
DeviceIconQueue,
DeviceUserTopicQueue,
DeviceDoctorTopicQueue,
DeviceAnswerQueue,
DeviceArticleQueue,
DeviceDiaryQueue,
QuestionQueue,
DeviceQuestionQueue
)
import logging
import redis
import json
from django.conf import settings
import traceback
from recommend.utils.diary_portrait import fetch_diary_by_user_portrait
from recommend.utils.diary_portrait import fetch_qa_by_user_portrait
from recommend.utils.diary_portrait import fetch_topic_by_user_portrait
# NOTE(review): in the original dump these two module-level names were
# interleaved into the class body (merge/diff artifact). They are used at
# module scope elsewhere in this file (get_diaries/get_answers read
# MAX_LOAD; several functions log through `logger`), so they are restored
# to module level here.
MAX_LOAD = 200
logger = logging.getLogger(__name__)


class Business(object):
    """Placeholder business wrapper; defines no behavior yet."""

    def __init__(self):
        pass

    def sort(self):
        # No sorting strategy implemented yet; returns None.
        pass
@bind("doris/recommend/get_diaries")
def get_diaries(tags, city, offset=0, size=10, city_tag_id=None):
# NOTE(review): this region is corrupted — lines of an unrelated `Strategy`
# base class are interleaved into get_diaries and indentation was stripped
# (merge/diff artifact). Code is left byte-identical; only comments added.
# De-interleaving safely requires the original file.
# NOTE: city as city id
sort_params = {}
class Strategy(object):
if city_tag_id:
sort_params["user_city_tag_id"] = city_tag_id
def __init__(self):
pass
elif city:
try:
x = City.objects.get(id=city)
sort_params["user_city_tag_id"] = x.tag_id
except City.DoesNotExist:
pass
def run(self, documents):
return documents
filters = {
"is_sink": False,
"has_before_cover": True,
"has_after_cover": True,
"content_level_is_good": True
}
if tags:
filters["closure_tag_ids"] = tags
tail = offset + size
diaries_ids = []
if tail < MAX_LOAD:
diaries = recall_diary(None, 0, 200, filters, DIARY_ORDER_TYPE.RECOMMEND, sort_params, fields=["id", "user.id"])
diaries_items = [(diary['id'], diary['user']['id']) for diary in diaries]
drop_dup_diaries = drop_dup(diaries_items)
drop_dup_size = len(drop_dup_diaries)
if tail <= drop_dup_size:
diaries_ids = [item[0] for item in drop_dup_diaries[offset:tail]]
if len(diaries_ids) == 0:  # if dedup over the head 200 is exhausted, later pages are not deduped
diaries = recall_diary(None, offset, size, filters, DIARY_ORDER_TYPE.RECOMMEND, sort_params, fields=["id"])
diaries_ids = [diary['id'] for diary in diaries]
class ScatterStrategy(Strategy):
return {"diaries_ids": diaries_ids}
@bind("doris/recommend/get_articles")
def get_articles(tags, offset=0, size=10):
    """Recall recommended article ids, optionally restricted to `tags`.

    Returns {"article_ids": [...]} in recall order.
    """
    filters = {"content_level": [CONTENT_CLASS.EXCELLENT, CONTENT_CLASS.FINE]}
    if tags:
        filters["tag_ids"] = tags
    hits = recall_articles(None, offset, size, filters, ARTICLE_SORT_TYPE.RECOMMEND, {})
    return {"article_ids": [hit['id'] for hit in hits]}
@bind("doris/recommend/get_answers")
def get_answers(tags, offset=0, size=10):
    """Recall recommended answer ids.

    For shallow pages (offset+size < MAX_LOAD) a large head slice is
    recalled and de-duplicated by author via drop_dup; deep pages (or a
    deduped head that is too short) fall back to plain recall.
    """
    filters = {"content_level": [CONTENT_CLASS.EXCELLENT, CONTENT_CLASS.FINE]}
    if tags:
        filters["tag_ids"] = tags

    window_end = offset + size
    result_ids = []
    if window_end < MAX_LOAD:
        recalled = recall_answers(None, 0, MAX_LOAD, filters, ANSWER_SORT_TYPE.RECOMMEND, {},
                                  fields=["id", "user_id"])
        # keep only well-formed hits, pair (answer id, author id) for dedup
        pairs = [(hit["id"], hit["user_id"]) for hit in recalled
                 if "id" in hit and "user_id" in hit]
        deduped = drop_dup(pairs)
        if window_end <= len(deduped):
            result_ids = [pair[0] for pair in deduped[offset:window_end]]

    if not result_ids:
        recalled = recall_answers(None, offset, size, filters, ANSWER_SORT_TYPE.RECOMMEND, {})
        result_ids = [hit['id'] for hit in recalled]
    return {"answer_ids": result_ids}
@bind('doris/recommend/icon')
def fetch_icon(device_id, size):
    """Return up to `size` icon ids for a device.

    Reads the device's icon queue (DeviceIconQueue) or, when the device has
    none, the latest global IconQueue, and cycles through it from the head.
    Returns {"icon": [int, ...]}; never raises.
    """
    try:
        card_type = "icon"
        try:
            que = DeviceIconQueue.objects.get(device_id=device_id)
        except DeviceIconQueue.DoesNotExist:
            que = IconQueue.objects.last()
        if not que:
            return {"icon": []}
        ids = list(filter(None, que.queue.split(',')))
        # Fix: an empty queue string previously reached `int(0) % len([])`
        # and raised ZeroDivisionError, which the broad except below turned
        # into a logged error for a perfectly normal empty-queue case.
        if not ids:
            return {card_type: []}
        # adjust args. (no per-device paging cursor for icons yet)
        cursor = 0
        size = min(size, len(ids))
        data = list(islice(cycle(ids), cursor, cursor + size))
        return {card_type: list(map(int, data))}
    except:
        logging_exception()
        return {"icon": []}
@bind('doris/recommend/homepage_polymer')
def fetch_polymer_ids(device_id, size):
    """Return up to `size` polymer ids for the homepage.

    NOTE(review): this reads the *icon* queues (DeviceIconQueue/IconQueue),
    not a polymer-specific queue — possibly intentional reuse; confirm with
    the queue owners.
    """
    try:
        card_type = "polymer_ids"
        try:
            que = DeviceIconQueue.objects.get(device_id=device_id)
        except DeviceIconQueue.DoesNotExist:
            que = IconQueue.objects.last()
        if not que:
            return {"polymer_ids": []}
        ids = list(filter(None, que.queue.split(',')))
        # Fix: an empty queue string previously reached `int(0) % len([])`
        # and raised ZeroDivisionError, swallowed by the broad except below.
        if not ids:
            return {card_type: []}
        # adjust args. (no per-device paging cursor yet)
        cursor = 0
        size = min(size, len(ids))
        data = list(islice(cycle(ids), cursor, cursor + size))
        return {card_type: list(map(int, data))}
    except:
        logging_exception()
        return {"polymer_ids": []}
@bind('doris/recommend/feed')
def recommend_feed(device_id, card_type, city_id, size):
    """Feed entry point: delegate to RecommendFeed.dispatch; never raise."""
    try:
        return RecommendFeed.dispatch(device_id, card_type, city_id, size)
    except:
        logging_exception()
        return {card_type: [], "cpc_ids": []}
class RecommendFeed:

    @classmethod
    def dispatch(cls, device_id, card_type, city_id, size):
        """Route a feed request to the fetcher for `card_type`.

        Returns {card_type: ids, "cpc_ids": cpc_ids}; cpc ids are only
        produced by the diary branch.
        """
        started = time.time()
        payload = []
        cpc_ids = []
        if card_type == CARD_TYPE.QA:
            payload = cls.fetch_qa(device_id, card_type, size)
            logging.info("duan add test,fetch_qa cost:%f,device_id:%s" % ((time.time() - started), str(device_id)))
        elif card_type == CARD_TYPE.ANSWER:
            payload = list(map(int, cls.fetch_answer(device_id, card_type, size)))
            logging.info("duan add test,fetch_answer cost:%f" % (time.time() - started))
        elif card_type == CARD_TYPE.ARTICLE:
            payload = list(map(int, cls.fetch_article(device_id, card_type, size)))
        elif card_type == CARD_TYPE.QUESTION:
            payload = list(map(int, cls.fetch_question(device_id, card_type, size)))
        elif card_type == CARD_TYPE.DIARY:
            total = cls.fetch_diary(device_id, card_type, city_id, size)
            if total:
                payload, cpc_ids = total[0], total[1]
            logging.info("duan add test,fetch_diary cost:%f,device_id:%s" % ((time.time() - started), str(device_id)))
        elif card_type == CARD_TYPE.USERTOPIC:
            payload = cls.fetch_user_topic(device_id, card_type, size)
            logging.info("duan add test,fetch_user_topic cost:%f,device_id:%s" % ((time.time() - started), str(device_id)))
        elif card_type == CARD_TYPE.DOCTORTOPIC:
            payload = list(map(int, cls.fetch_doctor_topic(device_id, card_type, size)))
        elif card_type == CARD_TYPE.ENCYCLOPEDIA:
            payload = cls.fetch_wiki(device_id, card_type, size)
        return {card_type: payload, "cpc_ids": cpc_ids}

    @staticmethod
    def current_date():
        """Today's local date formatted 'YYYY-MM-DD' (used in redis cursor keys)."""
        return datetime.datetime.now().strftime('%Y-%m-%d')
@staticmethod
def fetch_qa(device_id, card_type, size):
# NOTE(review): indentation was stripped in this dump and lines of an
# unrelated ScatterStrategy class (__init__, strategy, the head of ask)
# are interleaved below (merge/diff artifact). Code is left byte-identical;
# only comments were added/translated.
try:
# Drop answer ids the device has disliked; the dislike set lives in the
# gm-kv redis under "<device_id>_dislike_qa". Best-effort: any failure
# returns the list unfiltered.
def filter_qa(device_id,cid_list):
try:
gmkv = None
for gm_kv_host_item in settings.GM_KV_HOSTS:
gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
redis_db=gm_kv_host_item["db"],
redis_password=gm_kv_host_item["password"])
if gmkv:
break
key = str(device_id) + "_dislike_qa"
if gmkv.exists(key):
dislike = gmkv.smembers(key)
if len(cid_list) > 0:
if type(cid_list[0]) == int or type(cid_list[0]) == str:
cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
else:
cid_list = [i for i in cid_list if i not in dislike]
return cid_list
else:
return cid_list
except:
return cid_list
# Record served ids into the rolling 14-day read-history sets.
def read_history(cid_list):
redis_client.sadd(today_qa_key, *cid_list)
redis_client.expire(today_qa_key, 14 * 24 * 60 * 60)
redis_client.sadd(read_qa_key, *cid_list)
if redis_client.exists(old_qa_key):
redis_client.sdiffstore(read_qa_key, read_qa_key, old_qa_key)
redis_client.delete(old_qa_key)
redis_client.expire(read_qa_key, time=13 * 24 * 60 * 60)
# Build a redis client for one gm-kv host; returns None if unreachable.
def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
try:
if len(redis_password) == 0:
cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
else:
cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, password=redis_password,
socket_timeout=2)
cli_ins.ping()
return cli_ins
except:
return None
# Fallback path: page through the device/global answer queue without
# dislike filtering, keeping a per-day cursor in redis.
def no_filter_qa(size):
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
card_type=card_type, date=RecommendFeed.current_date())
try:
que = DeviceQAQueue.objects.get(device_id=device_id)
except DeviceQAQueue.DoesNotExist:
que = AnswerQueue.objects.last()
if not que:
return []
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = redis_client.get(key) or 0
if len(que) == 0:
return []
else:
cursor = int(cursor) % len(que)
size = min(size, len(que))
# redis_client.set(key, cursor + size, ex=24 * 60 * 60)
data = list(islice(cycle(que), cursor, cursor + size))
data = list(map(int, data))
if cursor + 2 * size < len(que):
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
else:
try:
context.request_logger.app(reset_answer_queue=True)
cursor = 0
redis_client.set(key, cursor, ex=24 * 60 * 60)
except:
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return data
search_qa_recommend_list = list()
read_qa_key = "TS:recommend_answer_set:device_id:" + str(device_id)
old_qa_key = "TS:recommend_answer_set:device_id:{}:{}"\
.format(device_id,(datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
today_qa_key = "TS:recommend_answer_set:device_id:{}:{}"\
.format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
answer_key = "qa_location:" + str(device_id)
time_begin = time.time()
if device_id != '0':
# if recommed_service_category_device_id(device_id):
read_list = []
if redis_client.exists(read_qa_key):
read_list = [int(x) for x in list(redis_client.smembers(read_qa_key))]
# Primary source: user-portrait driven recall, excluding already-read ids.
data = fetch_qa_by_user_portrait(device_id, read_list, size)
if data:
read_history(data)
if len(data) >= size:
logging.info("duan add,qa time cost:%f" % (time.time() - time_begin))
return data
else:
size = size - len(data)
search_qa_recommend_list.extend(data)
logger.info("portrait_fetch_qa:supplement1:device_id:{0}:size:{1}".format(device_id, size))
# Supplement 1: one id from the search-driven recommend queue, if any.
search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
if redis_client.exists(search_qa_recommend_key):
search_qa_recommend_dict = redis_client.hgetall(search_qa_recommend_key)
queue_list = json.loads(search_qa_recommend_dict[b'answer_queue'])
queue_list = filter_qa(device_id, queue_list)
if len(queue_list) == 0:
redis_client.delete(search_qa_recommend_key)
elif len(queue_list) == 1:
size = size - 1
search_qa_recommend_list = queue_list
redis_client.delete(search_qa_recommend_key)
else:
size = size - 1
search_qa_recommend_list.append(queue_list[0])
redis_client.hset(search_qa_recommend_key,"answer_queue",json.dumps(queue_list[1:]))
# Supplement 2: walk the device/global answer queue, tracked by a
# "tail"/"location" marker hash at answer_key.
if redis_client.exists(answer_key):
if b"tail" in redis_client.hgetall(answer_key):
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
else:
search_qa_recommend_list = no_filter_qa(size)
if len(search_qa_recommend_list) > 0:
read_history(search_qa_recommend_list)
return search_qa_recommend_list
elif b"location" in redis_client.hgetall(answer_key):
try:
que = DeviceQAQueue.objects.get(device_id=device_id)
except DeviceQAQueue.DoesNotExist:
que = AnswerQueue.objects.last()
if not que:
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
location = int(redis_client.hgetall(answer_key)[b"location"])
old_qa = list(filter(None, que.queue.split(',')))
after_filter_qa = filter_qa(device_id, old_qa[location:])
if (location >= len(old_qa)-1) or (len(after_filter_qa) == 0):
redis_client.hset(answer_key,"tail","1")
redis_client.expire(answer_key,3*60*60)
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
else:
search_qa_recommend_list = no_filter_qa(size)
if len(search_qa_recommend_list) > 0:
read_history(search_qa_recommend_list)
return search_qa_recommend_list
elif len(after_filter_qa) <= size:
search_qa_recommend_list.extend(after_filter_qa)
redis_client.hset(answer_key, "tail", "1")
redis_client.expire(answer_key, 3 * 60 * 60)
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
elif len(after_filter_qa) > size:
search_qa_recommend_list.extend(after_filter_qa[:size])
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
if after_filter_qa[size] in old_qa:
redis_client.hset(answer_key, "location", str(old_qa.index(after_filter_qa[size])))
redis_client.expire(answer_key, 3 * 60 * 60)
else:
redis_client.hset(answer_key, "tail", "1")
redis_client.expire(answer_key, 3 * 60 * 60)
return search_qa_recommend_list
else:
# No marker hash yet: start from the head of the queue.
try:
que = DeviceQAQueue.objects.get(device_id=device_id)
except DeviceQAQueue.DoesNotExist:
que = AnswerQueue.objects.last()
if not que:
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
old_qa = list(filter(None, que.queue.split(',')))
after_filter_qa = filter_qa(device_id,old_qa)
if len(after_filter_qa) == 0:
redis_client.hset(answer_key, "tail", "1")
redis_client.expire(answer_key, 3 * 60 * 60)
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
else:
search_qa_recommend_list = no_filter_qa(size)
if len(search_qa_recommend_list) > 0:
read_history(search_qa_recommend_list)
return search_qa_recommend_list
elif len(after_filter_qa) <= size:
search_qa_recommend_list.extend(after_filter_qa)
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
redis_client.hset(answer_key, "tail", "1")
redis_client.expire(answer_key, 3 * 60 * 60)
read_history(search_qa_recommend_list)
return search_qa_recommend_list
else:
search_qa_recommend_list.extend(after_filter_qa[:size])
if after_filter_qa[size] in old_qa:
redis_client.hset(answer_key, "location", str(old_qa.index(after_filter_qa[size])))
redis_client.expire(answer_key, 3 * 60 * 60)
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
else:
redis_client.hset(answer_key, "tail", "1")
redis_client.expire(answer_key, 3 * 60 * 60)
return search_qa_recommend_list
# NOTE(review): the lines from here down to `return is_append` (partly in
# the next function's range) belong to a ScatterStrategy class — artifact.
def __init__(self):
self.name = "打散"
def strategy(self, documents, field_name, win_len=4):
"""
Scatter documents by the value of one field: values with more
documents are placed first, then a window constraint decides
whether a value may repeat.
:param documents:
:param field_name:
:param win_len: window size within which a field value must not repeat
:return:
"""
ret_documents = []
ret_documents_idx = []
last_field_value_pointer = {}
field_dict = self.build_field_dict(documents, field_name)
while len(ret_documents_idx) < len(documents):
sorted_value = self.get_sorted_value(field_dict)
# print(ret_documents_idx, win_len, [documents[idx] for idx in ret_documents_idx])
is_append = self.ask(field_dict, sorted_value, win_len, last_field_value_pointer, ret_documents_idx)
if not is_append:
while win_len > 0:
win_len -= 1  # greedy fallback: shrink the window when it cannot be satisfied
# print(ret_documents_idx, win_len, [documents[idx] for idx in ret_documents_idx])
is_append = self.ask(field_dict, sorted_value, win_len, last_field_value_pointer, ret_documents_idx)
if is_append:
break
for idx in ret_documents_idx:
ret_documents.append(documents[idx])
return ret_documents
# def remove_empty(self, field_dict):
# for value in field_dict:
# size = field_dict[value]["len"]
# if size <= 0:
# del field_dict[value]
def ask(self, field_dict, sorted_field, win_len=4, last_field_value_pointer={}, ret_documents_idx=[]):
"""
Round-robin fill of the output.
:param field_dict: items grouped by field value
:param sorted_field: field values sorted by remaining count
:param win_len: window size
:param last_field_value_pointer: offset at which each value was last placed
:param ret_documents_idx: indices of the returned documents
:return:
"""
is_append = False
for value in sorted_field:
# take directly if this value has not been used yet
if value not in last_field_value_pointer:
is_append = True
# remember the offset where this value was placed
last_field_value_pointer[value] = len(ret_documents_idx)
current_value = field_dict[value]["items"].pop(0)
ret_documents_idx.append(current_value)
field_dict[value]["len"] -= 1
break
else:
# device_id == '0': anonymous/unknown device, plain queue paging only.
data = no_filter_qa(size)
return data
except:
logging_exception()
return []
@staticmethod
def fetch_user_topic(device_id, card_type, size):
# NOTE(review): indentation was stripped in this dump and fragments of a
# ScatterStrategy class (tail of ask(), get_sorted_value, build_field_dict)
# are interleaved below (merge/diff artifact). Code is left byte-identical;
# only comments were added/translated.
try:
# Drop tractate ids the device has disliked (gm-kv set under dislike_key).
def filter_topic(cid_list):
try:
gmkv = None
for gm_kv_host_item in settings.GM_KV_HOSTS:
gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
redis_db=gm_kv_host_item["db"],
redis_password=gm_kv_host_item["password"])
if gmkv:
break
if gmkv.exists(dislike_key):
dislike = gmkv.smembers(dislike_key)
if len(cid_list) > 0:
if type(cid_list[0]) == int or type(cid_list[0]) == str:
cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
else:
cid_list = [i for i in cid_list if i not in dislike]
return cid_list
else:
return cid_list
except:
return cid_list
# Record served ids into the rolling 14-day read-history sets.
def read_history(cid_list):
redis_client.sadd(today_key, *cid_list)
redis_client.expire(today_key, 14 * 24 * 60 * 60)
redis_client.sadd(read_key, *cid_list)
if redis_client.exists(old_key):
redis_client.sdiffstore(read_key, read_key, old_key)
redis_client.delete(old_key)
redis_client.expire(read_key, time=13 * 24 * 60 * 60)
# Build a redis client for one gm-kv host; returns None if unreachable.
def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
try:
if len(redis_password) == 0:
cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
else:
cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, password=redis_password,
socket_timeout=2)
cli_ins.ping()
return cli_ins
except:
return None
# Fallback: page through the device/global topic queue with a daily cursor.
def no_filter_get_topic(size):
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
date=RecommendFeed.current_date())
try:
que = DeviceUserTopicQueue.objects.get(device_id=device_id)
except DeviceUserTopicQueue.DoesNotExist:
que = UserTopicQueue.objects.last()
if not que:
return []
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = redis_client.get(key) or 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
data = list(islice(cycle(que), cursor, cursor + size))
data = list(map(int, data))
if cursor + 2 * size < len(que):
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
# NOTE(review): the lines below, up to `return is_append`, are the tail of
# ScatterStrategy.ask() interleaved here by the dump.
# if already used, read the offset where it was last placed and
# check whether the window size is satisfied
current_idx = last_field_value_pointer[value]
current_value = len(ret_documents_idx)
if current_value - current_idx >= win_len:  # window check
is_append = True
ret_documents_idx.append(field_dict[value]["items"].pop(0))
last_field_value_pointer[value] = current_value
field_dict[value]["len"] -= 1
break
else:
try:
context.request_logger.app(reset_queue=True)
cursor = 0
redis_client.set(key, cursor, ex=24 * 60 * 60)
except:
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return data
# Order-preserving de-duplication.
def list_distinct(ids):
news_ids = []
for id in ids:
if id not in news_ids:
news_ids.append(id)
return news_ids
continue
return is_append
dislike_key = str(device_id) + "_dislike_tractate"
search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
tractate_key = "tractate_location:" + str(device_id)
read_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
old_key = "TS:recommend_tractate_set:device_id:{}:{}" \
.format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
today_key = "TS:recommend_tractate_set:device_id:{}:{}" \
.format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
search_list = list()
time_begin = time.time()
if device_id != '0':
# if recommed_service_category_device_id(device_id):
have_read_list = list()
if redis_client.exists(read_key):
have_read_list = [int(i) for i in redis_client.smembers(read_key)]
# Primary source: user-portrait driven recall, excluding already-read ids.
topic_list = fetch_topic_by_user_portrait(device_id, have_read_list, size)
if topic_list:
read_history(topic_list)
if len(topic_list) >= size:
logging.info("duan add,user_topic time cost:%f" % (time.time() - time_begin))
return topic_list
else:
search_list.extend(topic_list)
size = size - len(topic_list)
logger.info("portrait_fetch_topic:supplement1:device_id:{0}:size:{1}".format(device_id, size))
# Supplement 1: up to two ids from the search-driven recommend queue.
if redis_client.exists(search_topic_recommend_key):
search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
search_topic_recommend_list = filter_topic(search_topic_recommend_list)
if len(search_topic_recommend_list) == 0:
redis_client.delete(search_topic_recommend_key)
elif len(search_topic_recommend_list) <= 2:
search_list.extend(search_topic_recommend_list)
size = size - len(search_list)
redis_client.delete(search_topic_recommend_key)
else:
search_list.extend(search_topic_recommend_list[:2])
size = size - 2
redis_client.hset(search_topic_recommend_key, 'tractate_queue',
json.dumps(search_topic_recommend_list[2:]))
# Supplement 2: walk the device/global topic queue, tracked by a
# "tail"/"location" marker hash at tractate_key.
if redis_client.exists(tractate_key):
if b'tail' in redis_client.hgetall(tractate_key):
if len(search_list) == 0:
data = no_filter_get_topic(size)
search_list.extend(data)
search_list = list(map(int, search_list))
read_history(search_list)
search_list = list_distinct(search_list)
return search_list
elif b'location' in redis_client.hgetall(tractate_key):
try:
que = DeviceUserTopicQueue.objects.get(device_id=device_id)
except DeviceUserTopicQueue.DoesNotExist:
que = UserTopicQueue.objects.last()
if not que:
if len(search_list) > 0:
search_list = list(map(int, search_list))
read_history(search_list)
search_list = list_distinct(search_list)
return search_list
old_qa = list(filter(None, que.queue.split(',')))
location = int(redis_client.hgetall(tractate_key)[b'location'])
after_filter_qa = filter_topic(old_qa[location:])
if (location >= len(old_qa) - 1) or (len(after_filter_qa) == 0):
redis_client.hset(tractate_key, "tail", "1")
redis_client.expire(tractate_key, 3 * 60 * 60)
if len(search_list) == 0:
search_list = no_filter_get_topic(size)
search_list = list(map(int, search_list))
read_history(search_list)
search_list = list_distinct(search_list)
return search_list
elif len(after_filter_qa) <= size:
search_list.extend(after_filter_qa)
redis_client.hset(tractate_key, "tail", "1")
redis_client.expire(tractate_key, 3 * 60 * 60)
search_list = list(map(int, search_list))
read_history(search_list)
search_list = list_distinct(search_list)
return search_list
elif len(after_filter_qa) > size:
search_list.extend(after_filter_qa[:size])
search_list = list(map(int, search_list))
read_history(search_list)
search_list = list_distinct(search_list)
if after_filter_qa[size] in old_qa:
redis_client.hset(tractate_key, "location", str(old_qa.index(after_filter_qa[size])))
redis_client.expire(tractate_key, 3 * 60 * 60)
else:
redis_client.hset(tractate_key, "tail", "1")
redis_client.expire(tractate_key, 3 * 60 * 60)
# NOTE(review): get_sorted_value below belongs to ScatterStrategy — artifact.
def get_sorted_value(self, field_dict):
"""
Sort field values by their remaining item count, most first.
:param field_dict:
:return:
"""
ret = sorted(field_dict.keys(), key=lambda x: field_dict[x]["len"], reverse=True)
ret = filter(lambda x: field_dict[x]["len"] > 0, ret)
return list(ret)
return search_list
# NOTE(review): build_field_dict below belongs to ScatterStrategy — artifact.
def build_field_dict(self, documents, field_name):
"""
Group document indices by the value of one field (category,
institution, ...).
:param documents:
:param field_name:
:return:
"""
ret = {}
for idx, document in enumerate(documents):
if field_name in document:
value = document[field_name]
if value in ret:
ret[value]["items"].append(idx)
ret[value]["len"] += 1
else:
# No marker hash yet: start from the head of the queue.
try:
que = DeviceUserTopicQueue.objects.get(device_id=device_id)
except DeviceUserTopicQueue.DoesNotExist:
que = UserTopicQueue.objects.last()
if not que:
if len(search_list) > 0:
search_list = list(map(int, search_list))
read_history(search_list)
search_list = list_distinct(search_list)
return search_list
old_qa = list(filter(None, que.queue.split(',')))
after_filter_qa = filter_topic(old_qa)
if len(after_filter_qa) == 0:
redis_client.hset(tractate_key, "tail", "1")
redis_client.expire(tractate_key, 3 * 60 * 60)
if len(search_list) == 0:
search_list = no_filter_get_topic(size)
if len(search_list) > 0:
search_list = list(map(int, search_list))
read_history(search_list)
search_list = list_distinct(search_list)
return search_list
elif len(after_filter_qa) <= size:
search_list.extend(after_filter_qa)
search_list = list(map(int, search_list))
redis_client.hset(tractate_key, "tail", "1")
redis_client.expire(tractate_key, 3 * 60 * 60)
read_history(search_list)
search_list = list_distinct(search_list)
return search_list
else:
search_list.extend(after_filter_qa[:size])
if after_filter_qa[size] in old_qa:
redis_client.hset(tractate_key, "location", str(old_qa.index(after_filter_qa[size])))
redis_client.expire(tractate_key, 3 * 60 * 60)
search_list = list(map(int, search_list))
search_list = list_distinct(search_list)
read_history(search_list)
else:
redis_client.hset(tractate_key, "tail", "1")
redis_client.expire(tractate_key, 3 * 60 * 60)
return search_list
else:
# device_id == '0': anonymous/unknown device, plain queue paging only.
data = no_filter_get_topic(size)
return data
except:
logging_exception()
return []
@classmethod
def fetch_diary_queue_data(cls, city_id, device_id=None):
# NOTE(review): a fragment of ScatterStrategy.build_field_dict
# (`ret[value] = {...}`) is interleaved into the try-body below (dump
# artifact); code left byte-identical, comments only.
# Load per-city diary queues from the database; returns the tuple
# (local, nearby, nation, megacity, use_city_id), empty lists on error.
local = list()
nearby = list()
nation = list()
megacity = list()
use_city_id = city_id
try:
qs = DiaryQueue.objects.filter(city_id__in=[city_id, 'world'])
# Assume that world queue must exist.
if len(qs) == 1:
obj = qs[0]
ret[value] = {
"items": [idx],
"len": 1
}
else:
obj = qs[0] if qs[0].city_id == city_id else qs[1]
if obj.native_queue:
local = list(filter(None, obj.native_queue.split(',')))
if obj.nearby_queue:
nearby = list(filter(None, obj.nearby_queue.split(',')))
if obj.nation_queue:
nation = list(filter(None, obj.nation_queue.split(',')))
if obj.megacity_queue:
megacity = list(filter(None, obj.megacity_queue.split(',')))
use_city_id = obj.city_id if obj else use_city_id
return (local, nearby, nation, megacity, use_city_id)
except:
logging_exception()
logger.error("catch exception,err_log:%s" % traceback.format_exc())
return [],[],[],[],use_city_id
# gm_kv_ins = None
# for gm_kv_host_item in settings.GM_KV_HOSTS:
# gm_kv_ins = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"], redis_db=gm_kv_host_item["db"],redis_password=gm_kv_host_item["password"])
# if gm_kv_ins:
# break
#
# specify_city_id_key = "diary_queue:city_id:" + use_city_id
# world_city_id_key = "diary_queue:city_id:world"
# if device_id is not None:
# specify_city_id_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
#
# city_val_dict = gm_kv_ins.hgetall(specify_city_id_key)
# if len(city_val_dict) == 0:
# city_val_dict = gm_kv_ins.hgetall(world_city_id_key)
# use_city_id = "world"
#
# if b"native_queue" in city_val_dict and city_val_dict[b"native_queue"]:
# local = list(filter(None, city_val_dict[b"native_queue"].split(b",")))
# if b"nearby_queue" in city_val_dict and city_val_dict[b"nearby_queue"]:
# nearby = list(filter(None, city_val_dict[b"nearby_queue"].split(b",")))
# if b"nation_queue" in city_val_dict and city_val_dict[b"nation_queue"]:
# nation = list(filter(None, city_val_dict[b"nation_queue"].split(b",")))
# if b"megacity_queue" in city_val_dict and city_val_dict[b"megacity_queue"]:
# megacity = list(filter(None, city_val_dict[b"megacity_queue"].split(b",")))
#
# return (local, nearby, nation, megacity, use_city_id)
#
# except:
@classmethod
def fetch_diary_queue_from_redis(cls, city_id):
    """Load the diary queues for a city from redis, falling back to 'world'.

    Returns (local, nearby, nation, megacity, use_city_id); on any error the
    queues come back as empty lists.
    """
    local, nearby, nation, megacity = [], [], [], []
    use_city_id = city_id
    try:
        city_val_dict = redis_client2.hgetall("diary_queue:city_id:" + use_city_id)
        if len(city_val_dict) == 0:
            # No per-city queue: fall back to the global one.
            city_val_dict = redis_client2.hgetall("diary_queue:city_id:world")
            use_city_id = "world"

        def _queue(field):
            # hgetall returns bytes keys/values; empty/missing -> [].
            raw = city_val_dict.get(field)
            return list(filter(None, raw.split(b","))) if raw else []

        local = _queue(b"native_queue")
        nearby = _queue(b"nearby_queue")
        nation = _queue(b"nation_queue")
        megacity = _queue(b"megacity_queue")
        return (local, nearby, nation, megacity, use_city_id)
    except:
        logging_exception()
        logger.error("catch exception,err_log:%s" % traceback.format_exc())
        return [], [], [], [], use_city_id
@classmethod
def fetch_device_diary_queue_data(cls, city_id, device_id):
    """Load the per-device diary queues for a city from the database.

    Returns (local, nearby, nation, megacity, use_city_id); on any error the
    queues come back as empty lists.
    """
    local, nearby, nation, megacity = [], [], [], []
    use_city_id = city_id
    try:
        obj = DeviceDiaryQueue.objects.filter(device_id=device_id, city_id=city_id).first()
        if obj:
            if obj.native_queue:
                local = list(filter(None, obj.native_queue.split(',')))
            if obj.nearby_queue:
                nearby = list(filter(None, obj.nearby_queue.split(',')))
            if obj.nation_queue:
                nation = list(filter(None, obj.nation_queue.split(',')))
            if obj.megacity_queue:
                megacity = list(filter(None, obj.megacity_queue.split(',')))
            use_city_id = obj.city_id
        return (local, nearby, nation, megacity, use_city_id)
    except:
        logging_exception()
        logger.error("catch exception,err_log:%s" % traceback.format_exc())
        return [], [], [], [], use_city_id
# gm_kv_ins = None
# for gm_kv_host_item in settings.GM_KV_HOSTS:
# gm_kv_ins = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"], redis_db=gm_kv_host_item["db"],redis_password=gm_kv_host_item["password"])
# if gm_kv_ins:
# break
#
# specify_city_id_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
# city_val_dict = gm_kv_ins.hgetall(specify_city_id_key)
#
# # Check whether the device falls into the grey-release (canary) group
# if recommed_service_category_device_id(device_id):
# if b"new_native_queue" in city_val_dict and city_val_dict[b"new_native_queue"]:
# local = list(filter(None, city_val_dict[b"new_native_queue"].split(b",")))
# else:
# if b"native_queue" in city_val_dict and city_val_dict[b"native_queue"]:
# local = list(filter(None, city_val_dict[b"native_queue"].split(b",")))
#
# if b"nearby_queue" in city_val_dict and city_val_dict[b"nearby_queue"]:
# nearby = list(filter(None, city_val_dict[b"nearby_queue"].split(b",")))
# if b"nation_queue" in city_val_dict and city_val_dict[b"nation_queue"]:
# nation = list(filter(None, city_val_dict[b"nation_queue"].split(b",")))
# if b"megacity_queue" in city_val_dict and city_val_dict[b"megacity_queue"]:
# megacity = list(filter(None, city_val_dict[b"megacity_queue"].split(b",")))
#
# return (local, nearby, nation, megacity, use_city_id)
# except:
@classmethod
def fetch_diary(cls, device_id, card_type, city_id, size):
    """Build the recommended diary feed for one request.

    Sources are drained in priority order until ``size`` ids are
    collected: CPC (paid) queue -> user-portrait recall ->
    search-behaviour recall -> click recall -> the per-city
    local/nearby/megacity/nation scale queues.

    :param device_id: requesting device; ``'0'`` means anonymous.
    :param card_type: unused here, kept for interface compatibility.
    :param city_id: city of the request.
    :param size: number of diary ids wanted.
    :returns: tuple ``(diary_id_list, cpc_id_list)``; both empty on any failure.
    """
    try:
        def read_history(cid_list):
            # Track served ids in a rolling 14-day "already read" set so
            # later recalls can exclude them.
            read_key = "TS:recommend_diary_set:device_id:" + str(device_id)
            old_key = "TS:recommend_diary_set:device_id:{}:{}" \
                .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
            today_key = "TS:recommend_diary_set:device_id:{}:{}" \
                .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
            redis_client.sadd(today_key, *cid_list)
            redis_client.expire(today_key, 14 * 24 * 60 * 60)
            redis_client.sadd(read_key, *cid_list)
            if redis_client.exists(old_key):
                # Drop ids that fell out of the 14-day window.
                redis_client.sdiffstore(read_key, read_key, old_key)
                redis_client.delete(old_key)
            redis_client.expire(read_key, time=13 * 24 * 60 * 60)

        def dislike_cid_filter(device_id, cid_list):
            # Remove ids the device explicitly disliked; on any gmkv
            # problem fall back to the unfiltered list (best effort).
            try:
                gmkv = None
                for gm_kv_host_item in settings.GM_KV_HOSTS:
                    gmkv = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"],
                                             redis_port=gm_kv_host_item["port"],
                                             redis_db=gm_kv_host_item["db"],
                                             redis_password=gm_kv_host_item["password"])
                    if gmkv:
                        break
                key = str(device_id) + "_dislike_diary"
                if gmkv.exists(key):
                    dislike = gmkv.smembers(key)
                    if len(cid_list) > 0:
                        # smembers returns bytes; normalise int/str ids before comparing.
                        if type(cid_list[0]) == int or type(cid_list[0]) == str:
                            cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                        else:
                            cid_list = [i for i in cid_list if i not in dislike]
                return cid_list
            except Exception:
                return cid_list

        def get_cids(location, cursor_list, n, cid_list):
            # Take up to n not-disliked ids from cid_list starting at
            # `location`; append the next cursor to cursor_list
            # (the sentinel 6666 means "queue exhausted").
            new_list = []
            if n == 0:
                cursor_list.append(6666)
            else:
                if location >= len(cid_list) - 1:
                    cursor_list.append(6666)
                else:
                    local_filter = dislike_cid_filter(device_id, cid_list[location:])
                    if len(local_filter) == 0:
                        cursor_list.append(6666)
                    elif len(local_filter) <= n:
                        cursor_list.append(6666)
                        new_list = local_filter
                    else:
                        new_list = local_filter[:n]
                        cursor = cid_list.index(local_filter[n])
                        cursor_list.append(cursor)
            return new_list, cursor_list

        def get_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
            # Split `size` across the four queues in proportion to the
            # city scale weights, rolling unfilled slots into the next
            # queue, then persist the per-queue cursors in redis.
            # (Removed merge-residue lines referencing undefined
            # `ret`/`idx` that belonged to an unrelated strategy module.)
            nx = int(round(x * 1.0 / (x + y + z + m) * size))
            ny = int(round(y * 1.0 / (x + y + z + m) * size))
            nz = int(round(z * 1.0 / (x + y + z + m) * size))
            nm = int(round(m * 1.0 / (x + y + z + m) * size))
            nxyz = [nx, ny, nm, nz]
            xyz = [x, y, m, z]
            counter = Counter([nx, ny, nm, nz])
            if counter[0] == 2:
                # Two empty buckets: give the rounding remainder to one of them.
                nxyz[nxyz.index(0)] += size - sum(nxyz)
            else:
                # Otherwise give the remainder to the heaviest bucket.
                nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
            nx, ny, nm, nz = nxyz
            cursor_list = []
            slocal, cursor_list = get_cids(cx, cursor_list, nx, local)
            ny += (nx - len(slocal))  # roll unfilled local slots into nearby
            snearby, cursor_list = get_cids(cy, cursor_list, ny, nearby)
            nm += (ny - len(snearby))
            smegacity, cursor_list = get_cids(cm, cursor_list, nm, megacity)
            nz += (nm - len(smegacity))
            snation, cursor_list = get_cids(cz, cursor_list, nz, nation)
            if cursor_list[0] == 6666 and cursor_list[1] == 6666 and cursor_list[2] == 6666 and cursor_list[3] == 6666:
                # All four queues exhausted for this device/city.
                redis_client.hset(diary_key, "tail", "1")
                redis_client.expire(diary_key, 6 * 60 * 60)
            else:
                redis_client.hset(diary_key, "location", json.dumps(cursor_list))
                redis_client.expire(diary_key, 6 * 60 * 60)
            total = list()
            total.extend(slocal)
            total.extend(snearby)
            total.extend(smegacity)
            total.extend(snation)
            return total

        time_begin = time.time()
        if device_id != '0':
            cpc_list = []
            portrait_list = list()
            click_diary_size = 1
            search_diary_size = 4
            user_portrait_diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % \
                                      (device_id, datetime.datetime.now().strftime('%Y-%m-%d'))
            if redis_client.exists(user_portrait_diary_key):
                user_portrait_diary_dict = redis_client.hgetall(user_portrait_diary_key)
                if b'cpc_queue' in user_portrait_diary_dict.keys():
                    cpc_queue = json.loads(user_portrait_diary_dict[b'cpc_queue'])
                    if len(cpc_queue) > size:
                        # The CPC queue alone can fill the page.
                        cpc_list.extend(cpc_queue[:size])
                        portrait_list.extend(cpc_queue[:size])
                        redis_client.hset(user_portrait_diary_key, 'cpc_queue',
                                          json.dumps(cpc_queue[size:]))
                        redis_client.expire(user_portrait_diary_key, 24 * 60 * 60)
                        portrait_list = list(map(int, portrait_list))
                        read_history(portrait_list)
                        logging.info("duan add,diary time cost:%f" % (time.time() - time_begin))
                        return portrait_list, cpc_list
                    elif len(cpc_queue) == size:
                        cpc_list.extend(cpc_queue)
                        portrait_list.extend(cpc_queue)
                        redis_client.hdel(user_portrait_diary_key, 'cpc_queue')
                        redis_client.expire(user_portrait_diary_key, 24 * 60 * 60)
                        portrait_list = list(map(int, portrait_list))
                        read_history(portrait_list)
                        logging.info("duan add,diary time cost:%f" % (time.time() - time_begin))
                        return portrait_list, cpc_list
                    else:
                        # Drain the whole CPC queue and keep filling below.
                        cpc_list.extend(cpc_queue)
                        portrait_list.extend(cpc_queue)
                        redis_client.hdel(user_portrait_diary_key, 'cpc_queue')
                        redis_client.expire(user_portrait_diary_key, 24 * 60 * 60)
                        size = size - len(cpc_list)
            if size > 0:
                # User-portrait based recall, excluding already-read ids.
                x = City.objects.filter(id=city_id)
                city_tag_id = x[0].tag_id if x else -1
                read_key = "TS:recommend_diary_set:device_id:" + str(device_id)
                have_read_diary_list = list()
                if redis_client.exists(read_key):
                    p = redis_client.smembers(read_key)
                    have_read_diary_list = list(map(int, p))
                have_read_diary_list.extend(portrait_list)
                diary_list = fetch_diary_by_user_portrait(device_id, city_tag_id, have_read_diary_list, size)
                size = size - len(diary_list)
                portrait_list.extend(diary_list)
                if portrait_list:
                    read_history(portrait_list)
                logger.info(
                    "portrait_fetch_diary:device_id:{0}:portrait_list:{1}:cpc_list:{2}".format(device_id,
                                                                                               portrait_list,
                                                                                               cpc_list))
                if size <= 0:
                    return portrait_list, cpc_list
            logger.info("portrait_fetch_diary:supplement1:device_id:{0}:size:{1}".format(device_id, size))
            # Supplement 1: diaries recalled from the device's search behaviour.
            search_diary_recommend_key = "TS:search_recommend_diary_queue:device_id:" + str(device_id)
            search_list = list()
            if redis_client.exists(search_diary_recommend_key) and size > 3:
                search_diary_recommend_dict = redis_client.hgetall(search_diary_recommend_key)
                search_diary_recommend_list = json.loads(search_diary_recommend_dict[b'diary_queue'])
                search_diary_recommend_list = dislike_cid_filter(device_id, search_diary_recommend_list)
                if len(search_diary_recommend_list) == 0:
                    redis_client.delete(search_diary_recommend_key)
                elif len(search_diary_recommend_list) <= search_diary_size:
                    search_list.extend(search_diary_recommend_list)
                    size = size - len(search_diary_recommend_list)
                    redis_client.delete(search_diary_recommend_key)
                else:
                    search_list.extend(search_diary_recommend_list[:search_diary_size])
                    size = size - search_diary_size
                    redis_client.hset(search_diary_recommend_key, 'diary_queue',
                                      json.dumps(search_diary_recommend_list[search_diary_size:]))
                    redis_client.expire(search_diary_recommend_key, 24 * 60 * 60)
            if size <= 0:
                portrait_list.extend(search_list)
                portrait_list = list(map(int, portrait_list))
                read_history(portrait_list)
                return portrait_list, cpc_list
            # Supplement 2: diaries recalled from the device's click behaviour.
            diary_recommend_key = "TS:recommend_diary_queue:device_id:" + str(device_id)
            ts_recommend_list = list()
            if redis_client.exists(diary_recommend_key) and size > 0:
                diary_recommend_dict = redis_client.hgetall(diary_recommend_key)
                diary_recommend_list = json.loads(diary_recommend_dict[b'diary_queue'])
                diary_recommend_list = dislike_cid_filter(device_id, diary_recommend_list)
                if len(diary_recommend_list) == 0:
                    redis_client.delete(diary_recommend_key)
                elif len(diary_recommend_list) <= click_diary_size:
                    ts_recommend_list = diary_recommend_list
                    redis_client.delete(diary_recommend_key)
                    size = size - len(ts_recommend_list)
                else:
                    size = size - click_diary_size
                    ts_recommend_list = diary_recommend_list[:click_diary_size]
                    diary_recommend_list_json = json.dumps(diary_recommend_list[click_diary_size:])
                    redis_client.hset(diary_recommend_key, 'diary_queue', diary_recommend_list_json)
                    redis_client.expire(diary_recommend_key, 24 * 60 * 60)
            if size <= 0:
                portrait_list.extend(search_list)
                portrait_list.extend(ts_recommend_list)
                portrait_list = list(map(int, portrait_list))
                read_history(portrait_list)
                return portrait_list, cpc_list
            if size > 0:
                # Final fill from the per-city scale queues, resuming at
                # the cursors stored under diary_key.
                diary_key = "diary_device_city:" + str(device_id) + str(city_id)
                portrait_list.extend(search_list)
                portrait_list.extend(ts_recommend_list)
                cx = 0
                cy = 0
                cz = 0
                cm = 0
                if redis_client.exists(diary_key):
                    if b"tail" in redis_client.hgetall(diary_key):
                        # City queues already exhausted: fall back to the
                        # unfiltered feed when we have nothing else.
                        if len(portrait_list) == 0:
                            portrait_list = cls.no_filter_get_diary(city_id, device_id, size)
                        if len(portrait_list) > 0:
                            read_history(portrait_list)
                        return portrait_list, cpc_list
                    elif b"location" in redis_client.hgetall(diary_key):
                        location_list = json.loads(redis_client.hgetall(diary_key)[b"location"])
                        cx = location_list[0]
                        cy = location_list[1]
                        cm = location_list[2]
                        cz = location_list[3]
                (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_from_redis(city_id)
                x, y, m, z = cls.get_city_scale(city_id)
                data = get_data(
                    local, nearby, nation, megacity,
                    cx, cy, cm, cz,
                    x, y, z, m, size)
                portrait_list.extend(data)
                portrait_list = list(map(int, portrait_list))
                if len(portrait_list) != 0:
                    read_history(portrait_list)
            return portrait_list, cpc_list
        else:
            # Anonymous device: no personalisation, serve the plain city feed.
            data = cls.no_filter_get_diary(city_id, device_id, size)
            return data, []
    except Exception:
        logging.error("catch exception,err_log:%s" % traceback.format_exc())
        logging_exception()
        return [], []
@classmethod
def no_filter_get_diary(cls, city_id, device_id, size):
    """Plain diary feed (no dislike filtering) drawn from the city queues.

    Keeps a per-device daily cursor in redis and resets it to zero once
    the native and nearby queues stop advancing.
    """
    local, nearby, nation, megacity, city_id = cls.fetch_diary_queue_from_redis(city_id)
    base_key = '{device_id}-{city_id}-{date}'.format(device_id=device_id,
                                                     city_id=city_id, date=RecommendFeed.current_date())
    # Refresh counter for this device/city/day; attach a TTL on first use.
    counter_key = base_key + '-counter_v1'
    if redis_client.incr(counter_key) == 1:
        redis_client.expire(counter_key, 24 * 60 * 60)
    # Load the four queue cursors ("cx-cy-cm-cz"), defaulting to zero.
    cursor_key = base_key + '-cursor_v1'
    raw_cursor = redis_client.get(cursor_key) or b'0-0-0-0'
    cx, cy, cm, cz = (int(part) for part in raw_cursor.split(b'-'))
    x, y, m, z = cls.get_city_scale(city_id)
    data, ncx, ncy, ncm, ncz = cls.get_scale_data(
        local, nearby, nation, megacity,
        cx, cy, cm, cz,
        x, y, z, m, size
    )
    if ncx == cx and ncy == cy:  # native and nearby queues made no progress
        logger.info("diary queue reach end,cx:%d,cy:%d,cm:%d,cz:%d", cx, cy, cm, cz)
        ncx = ncy = ncm = ncz = 0
    new_cursor = '-'.join(map(str, (ncx, ncy, ncm, ncz)))
    redis_client.set(cursor_key, new_cursor, ex=24 * 60 * 60)
    return [int(cid) for cid in data]
@classmethod
def get_queue(cls,local, nearby, nation, megacity, device_id,city_id,size,x,y,z,m):
    """Page through the four city diary queues with a redis-backed cursor.

    Slices `size` ids out of the local/nearby/megacity/nation queues
    proportionally to the scale weights (x, y, m, z), resuming from the
    cursor stored for this device/city/day and persisting the advanced
    cursor afterwards. Returns a list of int diary ids.
    """
    key = '{device_id}-{city_id}-{date}'.format(device_id=device_id,
                                                city_id=city_id, date=RecommendFeed.current_date())
    # Per-day refresh counter; attach the TTL only when first created.
    counter_key = key + '-counter_v1'
    counter = redis_client.incr(counter_key)
    if counter == 1:
        redis_client.expire(counter_key, 24 * 60 * 60)
    # Cursor format is "cx-cy-cm-cz" (one offset per queue).
    cursor_key = key + '-cursor_v1'
    cursor = redis_client.get(cursor_key) or b'0-0-0-0'
    cx, cy, cm, cz = map(int, cursor.split(b'-'))
    def get_scale(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
        # Same proportional-split algorithm as cls.get_scale_data:
        # round each queue's share of `size`, hand the rounding remainder
        # to an empty bucket (when exactly two are empty) or to the
        # heaviest bucket, then slice each queue from its cursor and roll
        # any shortfall into the next queue.
        nx = int(round(x * 1.0 / (x + y + z + m) * size))
        ny = int(round(y * 1.0 / (x + y + z + m) * size))
        nz = int(round(z * 1.0 / (x + y + z + m) * size))
        nm = int(round(m * 1.0 / (x + y + z + m) * size))
        nxyz = [nx, ny, nm, nz]
        xyz = [x, y, m, z]
        counter = Counter([nx, ny, nm, nz])
        if counter[0] == 2:
            nxyz[nxyz.index(0)] += size - sum(nxyz)
        else:
            nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
        nx, ny, nm, nz = nxyz
        slocal = local[cx:cx + nx]
        cx = min(cx + nx, len(local))
        ny += (nx - len(slocal))
        snearby = nearby[cy:cy + ny]
        cy = min(cy + ny, len(nearby))
        nm += (ny - len(snearby))
        smegacity = megacity[cm: cm + nm]
        cm = min(cm + nm, len(megacity))
        nz += (nm - len(smegacity))
        snation = nation[cz:cz + nz]
        cz = min(cz + nz, len(nation))
        total = list()
        total.extend(slocal)
        total.extend(snearby)
        total.extend(smegacity)
        total.extend(snation)
        return total, cx, cy, cm, cz
    data, ncx, ncy, ncm, ncz = get_scale(
        local, nearby, nation, megacity,
        cx, cy, cm, cz,
        x, y, z, m, size)
    if ncx == cx and ncy == cy:  # native queue and nearby queue
        # Neither primary queue advanced -> exhausted; restart from zero.
        logger.info("diary queue reach end,cx:%d,cy:%d,cm:%d,cz:%d", cx, cy, cm, cz)
        ncx = ncy = ncm = ncz = 0
    val = '-'.join(map(str, [ncx, ncy, ncm, ncz]))
    redis_client.set(cursor_key, val, ex=24 * 60 * 60)
    return list(map(int, data))
@staticmethod
def get_city_scale(city_id):
    """Return the (native, nearby, megacity, nation) scale weights for a city.

    Prefers an explicit CityScale row; otherwise derives defaults from
    the city's level, falling back to an all-nation split (0, 0, 0, 10)
    for unknown cities.
    """
    try:
        scale = CityScale.objects.get(city_id=city_id)
        return scale.native, scale.nearby, scale.megacity, scale.nation
    except CityScale.DoesNotExist:
        pass
    try:
        city = City.objects.get(id=city_id)
    except City.DoesNotExist:
        return 0, 0, 0, 10
    level = city.level
    if level in (CITY_LEVEL.SUPER, CITY_LEVEL.ONE):
        return 4, 3, 0, 3
    if level == CITY_LEVEL.TWO:
        return 3, 3, 0, 3
    if level == CITY_LEVEL.THREE:
        return 1, 4, 0, 5
    return 0, 0, 0, 10
@staticmethod
def get_scale_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
def strategy(self, documents, fields):
"""
:param local: local diary queue
:param nearby: nearby diary queue
:param nation: nation diary queue
:param megacity: megacity diary queue
:param cx: seen local diary offset
:param cy: seen nearby diary offset
:param cz: seen nation diary offset
:param cm: seen megacity diary offset
:param x: local diary scale factor
:param y: nearby diary scale factor
:param z: nation diary scale factor
:param m: megacity diary scale factor
:param size: nubmer of diary
:param documents:
:param fields:{"is_common_city":[1,0],"score":[1500,1000,800]}
:return:
"""
# 本地 临近 特大城市 全国 四个层级 都按照的是四舍五入取得方式
# 针对出现的问题,本次相应的优化是:
# 1、如果出现两个层级为零,且有剩余坑位时,则按照本地 临近 全国的优先级,先给优先级高且为零的层级一个坑位。
# 2、如果所有层级都非零,且有剩余坑位时,则优先给权重占比大的层级一个坑位。
# 3、如果只有一个层级为零,且有剩余坑位时,则优先填充权重占比大的层级一个坑位。
nx = int(round(x * 1.0 / (x + y + z + m) * size))
ny = int(round(y * 1.0 / (x + y + z + m) * size))
nz = int(round(z * 1.0 / (x + y + z + m) * size))
nm = int(round(m * 1.0 / (x + y + z + m) * size))
nxyz = [nx, ny, nm, nz]
xyz = [x, y, m, z]
counter = Counter([nx, ny, nm, nz])
if counter[0] == 2:
nxyz[nxyz.index(0)] += size - sum(nxyz)
else:
nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
nx, ny, nm, nz = nxyz
slocal = local[cx:cx + nx]
cx = min(cx + nx, len(local))
ny += (nx - len(slocal))
snearby = nearby[cy:cy + ny]
cy = min(cy + ny, len(nearby))
nm += (ny - len(snearby))
smegacity = megacity[cm: cm + nm]
cm = min(cm + nm, len(megacity))
nz += (nm - len(smegacity))
snation = nation[cz:cz + nz]
cz = min(cz + nz, len(nation))
total = list()
total.extend(slocal)
total.extend(snearby)
total.extend(smegacity)
total.extend(snation)
return total, cx, cy, cm, cz
result = [documents]
for field_name in fields:
tmp = []
for item in result:
for value in fields[field_name]:
tmp.append(list(filter( lambda x: x[field_name] == value,item)))
result = tmp
return result
# NOTE(review): this __main__ block looks like merge residue from a
# strategy-module demo — ScatterStrategy and a complete HierarchyStrategy
# are not defined in this chunk; confirm before running this file directly.
if __name__ == "__main__":
    # documents = [{"a": 0}, {"a": 1}, {"a": 1}, {"a": 1}, {"a": 2}, {"a": 0}, {"a": 0}, {"a": 3}, {"a": 3}]
    s = ScatterStrategy()
    # print(s.strategy(documents, "a"))
    # Sample feed items used to exercise the hierarchy/scatter strategies.
    documents = [{"is_common_city": 1, "score": 1500, "merchant_id": 1111, "is_promote": True, "id": 1},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1111, "is_promote": True, "id": 11},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1111, "is_promote": True, "id": 12},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1112, "is_promote": True, "id": 13},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1111, "is_promote": True, "id": 14},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1112, "is_promote": True, "id": 15},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1111, "is_promote": True, "id": 1},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1112, "is_promote": False, "id": 2},
                 {"is_common_city": 1, "score": 1500, "merchant_id": 1111, "is_promote": True, "id": 3},
                 {"is_common_city": 1, "score": 1000, "merchant_id": 1113, "is_promote": False, "id": 4},
                 {"is_common_city": 1, "score": 1000, "merchant_id": 1111, "is_promote": True, "id": 5},
                 {"is_common_city": 0, "score": 1500, "merchant_id": 1114, "is_promote": True, "id": 6},
                 {"is_common_city": 0, "score": 1500, "merchant_id": 1114, "is_promote": False, "id": 7},
                 {"is_common_city": 0, "score": 1000, "merchant_id": 1113, "is_promote": True, "id": 8},
                 {"is_common_city": 0, "score": 1000, "merchant_id": 1113, "is_promote": False, "id": 9},
                 {"is_common_city": 0, "score": 1500, "merchant_id": 1112, "is_promote": True, "id": 10}]
    model = HierarchyStrategy()
    # Bucket the documents by (is_common_city, score, is_promote) layers.
    result = model.strategy(documents,
                            fields={"is_common_city": [1, 0], "score": [1500, 1000, 0], "is_promote": [True, False]})
    # print(result)
    # print(len(list(filter(lambda x:x!=[],result))))
    # print(s.strategy(documents, "merchant_id"))
    # Scatter items per merchant within each non-empty bucket.
    for item in list(filter(lambda x:x!=[],result)):
        print(s.strategy(item,"merchant_id"))
@staticmethod
def fetch_user_topic(device_id, card_type, size):
    """Serve a page of topic (tractate) ids for a device.

    Known devices (device_id != '0') get up to 2 ids from their
    search-behaviour queue, topped up from a cached dislike-filtered
    queue in gmkv (or the per-device/global DB queue on first use).
    Unknown devices fall back to a simple cycling cursor over the shared
    queue. Returns a list of int ids; [] on any error.
    """
    try:
        def filter_topic(cid_list):
            # Drop ids present in the device's dislike set; on any gmkv
            # problem return the list unchanged (best effort).
            try:
                if gmkv.exists(dislike_key):
                    dislike = gmkv.smembers(dislike_key)
                    if len(cid_list) > 0:
                        # smembers returns bytes; normalise int/str ids.
                        if type(cid_list[0]) == int or type(cid_list[0]) == str:
                            cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                        else:
                            cid_list = [i for i in cid_list if i not in dislike]
                    return cid_list
                else:
                    return cid_list
            except:
                return cid_list
        def write_after_filter_tractate(cid_list):
            # Persist the remaining (already filtered) queue.
            # NOTE(review): SET on an existing key drops its TTL since no
            # `ex` is given on that branch — confirm this is intended.
            try:
                if gmkv.exists(after_filter_key):
                    gmkv.set(after_filter_key, json.dumps(cid_list))
                else:
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())
        def get_filter_tractate():
            # Load the cached filtered queue; [] when missing/corrupt.
            try:
                return json.loads(gmkv.get(after_filter_key))
            except:
                return []
        def read_history(cid_list):
            # Record served ids in today's set and the rolling read set;
            # the TTL is attached only when today's key is first created.
            if redis_client.exists(today_key):
                redis_client.sadd(today_key, *cid_list)
            else:
                redis_client.sadd(today_key, *cid_list)
                redis_client.expire(today_key, 15 * 24 * 60 * 60)
            if redis_client.exists(read_key) and redis_client.exists(old_key):
                # Remove ids that aged out of the 14-day window.
                redis_client.sdiffstore(read_key, read_key, old_key)
                redis_client.delete(old_key)
                redis_client.expire(read_key, time=13 * 24 * 60 * 60)
            redis_client.sadd(read_key, *cid_list)
        def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
            # Connect to one gmkv host; None when unreachable.
            try:
                if len(redis_password) == 0:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
                else:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, password=redis_password,
                                          socket_timeout=2)
                cli_ins.ping()
                return cli_ins
            except:
                return None
        # Key layout for this device's topic state.
        dislike_key = str(device_id) + "_dislike_tractate"
        search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
        after_filter_key = "device_tractate_after_filter:device_id:" + str(device_id)
        tractate_key = "tractate_is_tail" + str(device_id)
        read_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        old_key = "TS:recommend_tractate_set:device_id:{}:{}" \
            .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
        today_key = "TS:recommend_tractate_set:device_id:{}:{}" \
            .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
        search_list = list()
        # First reachable gmkv host wins; gmkv stays None if all fail, in
        # which case gmkv.exists below raises and the outer except returns [].
        gmkv = None
        for gm_kv_host_item in settings.GM_KV_HOSTS:
            gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
                            redis_db=gm_kv_host_item["db"],
                            redis_password=gm_kv_host_item["password"])
            if gmkv:
                break
        if device_id != '0':
            # Take up to 2 ids from the search-behaviour queue first.
            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                search_topic_recommend_list = filter_topic(search_topic_recommend_list)
                if len(search_topic_recommend_list) == 0:
                    redis_client.delete(search_topic_recommend_key)
                elif len(search_topic_recommend_list) <= 2:
                    search_list = search_topic_recommend_list
                    size = size - len(search_list)
                    redis_client.delete(search_topic_recommend_key)
                else:
                    search_list = search_topic_recommend_list[:2]
                    size = size - 2
                    redis_client.hset(search_topic_recommend_key, 'tractate_queue',
                                      json.dumps(search_topic_recommend_list[2:]))
            if gmkv.exists(tractate_key):
                # Main queue already marked exhausted ("tail"): serve only
                # the search-derived ids, if any.
                if len(search_list) > 0:
                    search_list = list(map(int, search_list))
                    read_history(search_list)
                return search_list
            elif gmkv.exists(after_filter_key):
                # Resume from the cached dislike-filtered queue.
                que = get_filter_tractate()
                que = filter_topic(que)
                if len(que) == 0:
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    if len(search_list) > 0:
                        search_list = list(map(int, search_list))
                        read_history(search_list)
                    return search_list
                elif len(que) <= size:
                    # Queue drains on this request; mark it exhausted.
                    search_list.extend(que)
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    search_list = list(map(int, search_list))
                    read_history(search_list)
                    return search_list
                else:
                    search_list.extend(que[:size])
                    write_after_filter_tractate(que[size:])
                    search_list = list(map(int, search_list))
                    read_history(search_list)
                    return search_list
            else:
                # First request: seed from the per-device DB queue, falling
                # back to the latest shared queue.
                try:
                    que = DeviceUserTopicQueue.objects.get(device_id=device_id)
                except DeviceUserTopicQueue.DoesNotExist:
                    que = UserTopicQueue.objects.last()
                if not que:
                    if len(search_list) > 0:
                        search_list = list(map(int, search_list))
                        read_history(search_list)
                    return search_list
                qa = list(filter(None, que.queue.split(',')))
                qa = filter_topic(qa)
                if len(qa) == 0:
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    if len(search_list) > 0:
                        search_list = list(map(int, search_list))
                        read_history(search_list)
                    return search_list
                elif len(qa) <= size:
                    search_list.extend(qa)
                    search_list = list(map(int, search_list))
                    gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
                    read_history(search_list)
                    return search_list
                else:
                    search_list.extend(qa[:size])
                    search_list = list(map(int, search_list))
                    write_after_filter_tractate(qa[size:])
                    read_history(search_list)
                    return search_list
        else:
            # Anonymous device: cycle through the shared queue with a
            # per-day cursor stored under `key`.
            key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                          date=RecommendFeed.current_date())
            try:
                que = DeviceUserTopicQueue.objects.get(device_id=device_id)
            except DeviceUserTopicQueue.DoesNotExist:
                que = UserTopicQueue.objects.last()
            if not que:
                return []
            que = list(filter(None, que.queue.split(',')))
            # adjust args.
            cursor = redis_client.get(key) or 0
            cursor = int(cursor) % len(que)
            size = min(size, len(que))
            data = list(islice(cycle(que), cursor, cursor + size))
            data = list(map(int, data))
            if cursor + 2 * size < len(que):
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            else:
                # Near the end of the queue: log and restart from zero.
                try:
                    context.request_logger.app(reset_queue=True)
                    cursor = 0
                    redis_client.set(key, cursor, ex=24 * 60 * 60)
                except:
                    redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            return data
    except:
        logging_exception()
        return []
# Previous version of the topic-feed method
# NOTE: shadowed by the identical redefinition below; kept for reference only.
def fetch_user_topic(device_id, card_type, size):
    """Older topic feed: cycling cursor over the shared queue, with up to
    2 search-recommended ids prepended for known devices (max cursor 30)."""
    try:
        key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                      date=RecommendFeed.current_date())
        if (device_id != '0') and size >= 2:
            # Reserve 2 slots for search-driven recommendations.
            search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
            search_topic_recommend_list = list()
            search_cursor_ts = 0
            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                if b'cursor' in search_topic_recommend_dict:
                    search_cursor_ts = json.loads(search_topic_recommend_dict[b'cursor'])
                if search_cursor_ts < 30:
                    search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                    if search_cursor_ts < len(search_topic_recommend_list):
                        size = size - 2
        try:
            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            que = UserTopicQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            # Near the end of the queue: log and restart from zero.
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        if device_id != '0' and size >= 2:
            if len(search_topic_recommend_list) > 0 and search_cursor_ts < len(search_topic_recommend_list):
                # Prepend 2 search-recommended ids and advance that cursor.
                queue = search_topic_recommend_list[search_cursor_ts:search_cursor_ts + 2]
                queue.extend(data)
                data = queue
                new_search_cursor = search_cursor_ts + 2
                redis_client.hset(search_topic_recommend_key, 'cursor', new_search_cursor)
                redis_client.expire(search_topic_recommend_key, 30 * 24 * 60 * 60)
        # Mark everything served as read.
        read_topic_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        if len(data) > 0:
            redis_client.sadd(read_topic_key, *data)
        return data
    except:
        logging_exception()
        return []
# Legacy topic-feed method
def fetch_user_topic(device_id, card_type, size):
    """Legacy topic feed.

    Cycles a per-day cursor over the shared topic queue; for known
    devices with size >= 2, prepends up to two ids from the
    search-recommendation queue (at most 30 served per queue).
    Returns a list of int ids, [] on any error.
    """
    try:
        cursor_key = '{device_id}-{card_type}-{date}'.format(
            device_id=device_id, card_type=card_type, date=RecommendFeed.current_date())
        search_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
        search_queue = list()
        search_cursor = 0
        if device_id != '0' and size >= 2:
            # Reserve two slots for search-driven recommendations.
            if redis_client.exists(search_key):
                search_hash = redis_client.hgetall(search_key)
                if b'cursor' in search_hash:
                    search_cursor = json.loads(search_hash[b'cursor'])
                if search_cursor < 30:
                    search_queue = json.loads(search_hash[b'tractate_queue'])
                    if search_cursor < len(search_queue):
                        size = size - 2
        try:
            record = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            record = UserTopicQueue.objects.last()
        if not record:
            return []
        topic_ids = [tid for tid in record.queue.split(',') if tid]
        cursor = int(redis_client.get(cursor_key) or 0) % len(topic_ids)
        size = min(size, len(topic_ids))
        data = [int(t) for t in islice(cycle(topic_ids), cursor, cursor + size)]
        if cursor + 2 * size < len(topic_ids):
            redis_client.set(cursor_key, cursor + size, ex=24 * 60 * 60)
        else:
            # Near the end of the queue: log the reset and restart at zero.
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(cursor_key, cursor, ex=24 * 60 * 60)
            except:
                redis_client.set(cursor_key, cursor + size, ex=24 * 60 * 60)
        if device_id != '0' and size >= 2 and len(search_queue) > 0 and search_cursor < len(search_queue):
            # Prepend two search-recommended ids and advance that cursor.
            data = search_queue[search_cursor:search_cursor + 2] + data
            redis_client.hset(search_key, 'cursor', search_cursor + 2)
            redis_client.expire(search_key, 30 * 24 * 60 * 60)
        read_topic_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        if len(data) > 0:
            redis_client.sadd(read_topic_key, *data)
        return data
    except:
        logging_exception()
        return []
# 9.6 production Q&A feed
def fetch_qa(device_id, card_type, size):
    """Build one page of the QA (answer) feed for a device.

    Sources are consulted in priority order:
      1. a per-device "search recommend" answer queue in Redis (one item is
         prepended to the page and `size` shrinks by one),
      2. a per-device pre-filtered candidate queue cached in GM-KV,
      3. the device's DB queue (`DeviceQAQueue`), falling back to the global
         `AnswerQueue`, paged via a per-day Redis cursor when device_id == '0'.

    Returns a list of int answer ids (possibly empty).  Any unexpected error
    is swallowed by the outermost handler, logged, and [] is returned.
    """
    try:
        def get_after_filter_qa():
            # Load the cached, already-filtered candidate queue from GM-KV.
            # Missing key / bad JSON degrades to an empty queue.
            try:
                return json.loads(gmkv.get(after_filter_key))
            except:
                return []
        def write_after_filter_qa(cid_list):
            # Persist the remaining (unserved) candidates back to GM-KV.
            # NOTE(review): the TTL branches look inverted — a plain set() on an
            # existing key CLEARS its TTL, while the "new key" branch sets a 6h
            # TTL.  Presumably both paths should keep a 6h expiry; confirm.
            try:
                if gmkv.exists(after_filter_key):
                    gmkv.set(after_filter_key, json.dumps(cid_list))
                else:
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())
        def filter_qa(device_id, cid_list):
            # Drop ids the device has explicitly disliked (GM-KV set
            # "<device_id>_dislike_qa", members stored as bytes).  On any
            # error the list is returned unfiltered — filtering is best-effort.
            try:
                key = str(device_id) + "_dislike_qa"
                if gmkv.exists(key):
                    dislike = gmkv.smembers(key)
                    if len(cid_list) > 0:
                        # int/str ids must be encoded to bytes to compare
                        # against the raw members returned by smembers().
                        if type(cid_list[0]) == int or type(cid_list[0]) == str:
                            cid_list = [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                        else:
                            cid_list = [i for i in cid_list if i not in dislike]
                    return cid_list
                else:
                    return cid_list
            except:
                return cid_list
        def read_history(cid_list):
            # Record served ids in the per-day set (15d TTL set only on first
            # creation) and in the rolling "read" set.  When a 14-day-old
            # snapshot exists, ids already present in it are aged out of the
            # rolling set before the new ids are added (13d TTL refresh).
            if redis_client.exists(today_qa_key):
                redis_client.sadd(today_qa_key, *cid_list)
            else:
                redis_client.sadd(today_qa_key, *cid_list)
                redis_client.expire(today_qa_key, 15 * 24 * 60 * 60)
            if redis_client.exists(read_qa_key) and redis_client.exists(old_qa_key):
                redis_client.sdiffstore(read_qa_key, read_qa_key, old_qa_key)
                redis_client.delete(old_qa_key)
                redis_client.expire(read_qa_key, time=13 * 24 * 60 * 60)
            redis_client.sadd(read_qa_key, *cid_list)
        def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
            # Open a GM-KV (redis-protocol) connection and verify it with a
            # ping; returns None when the host is unreachable.
            try:
                if len(redis_password) == 0:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
                else:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, password=redis_password,
                                          socket_timeout=2)
                cli_ins.ping()
                return cli_ins
            except:
                return None
        search_qa_recommend_list = list()
        # Per-device bookkeeping keys (history, day snapshots, queue markers).
        read_qa_key = "TS:recommend_answer_set:device_id:" + str(device_id)
        old_qa_key = "TS:recommend_answer_set:device_id:{}:{}" \
            .format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
        today_qa_key = "TS:recommend_answer_set:device_id:{}:{}" \
            .format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
        answer_queue_key = "qa_is_tail:" + str(device_id)
        after_filter_key = "device_qa_after_filter:device_id:" + str(device_id)
        # Connect to the first reachable GM-KV host.
        # NOTE(review): if no host answers, gmkv stays None and the later
        # gmkv.exists(...) calls raise AttributeError, silently turned into
        # an empty page by the outer except — confirm this is acceptable.
        gmkv = None
        for gm_kv_host_item in settings.GM_KV_HOSTS:
            gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
                            redis_db=gm_kv_host_item["db"],
                            redis_password=gm_kv_host_item["password"])
            if gmkv:
                break
        if device_id != '0':
            # Source 1: search-driven recommend queue; serve at most one item
            # from its head and keep the rest for the next request.
            search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
            if redis_client.exists(search_qa_recommend_key):
                search_qa_recommend_dict = redis_client.hgetall(search_qa_recommend_key)
                queue_list = json.loads(search_qa_recommend_dict[b'answer_queue'])
                queue_list = filter_qa(device_id, queue_list)
                if len(queue_list) == 0:
                    redis_client.delete(search_qa_recommend_key)
                elif len(queue_list) == 1:
                    size = size - 1
                    search_qa_recommend_list = queue_list
                    redis_client.delete(search_qa_recommend_key)
                else:
                    size = size - 1
                    search_qa_recommend_list.append(queue_list[0])
                    redis_client.hset(search_qa_recommend_key, "answer_queue", json.dumps(queue_list[1:]))
            # "qa_is_tail" marks the candidate queue as exhausted: return only
            # whatever source 1 produced.
            if gmkv.exists(answer_queue_key):
                if len(search_qa_recommend_list) > 0:
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            elif gmkv.exists(after_filter_key):
                # Source 2: cached pre-filtered queue; re-filter (dislikes may
                # have grown), serve up to `size`, persist the remainder.
                que = get_after_filter_qa()
                que = filter_qa(device_id, que)
                if len(que) == 0:
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                    if len(search_qa_recommend_list) > 0:
                        search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                        read_history(search_qa_recommend_list)
                    return search_qa_recommend_list
                elif len(que) <= size:
                    search_qa_recommend_list.extend(que)
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                    return search_qa_recommend_list
                else:
                    search_qa_recommend_list.extend(que[:size])
                    write_after_filter_qa(que[size:])
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                    return search_qa_recommend_list
            # Source 3: the device's DB queue, else the global answer queue.
            try:
                que = DeviceQAQueue.objects.get(device_id=device_id)
            except DeviceQAQueue.DoesNotExist:
                que = AnswerQueue.objects.last()
            if not que:
                if len(search_qa_recommend_list) > 0:
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            # Queue is stored as a comma-separated string; drop empty tokens.
            qa = list(filter(None, que.queue.split(',')))
            # NOTE(review): device_id != "0" is always true inside this branch
            # (outer condition already excludes '0') — the re-checks below are
            # redundant but harmless.
            if device_id != "0":
                qa = filter_qa(device_id, qa)
            if len(qa) == 0:
                if device_id != "0":
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                if len(search_qa_recommend_list) > 0:
                    search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                    read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            elif len(qa) <= size:
                search_qa_recommend_list.extend(qa)
                search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                if device_id != "0":
                    gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
                read_history(search_qa_recommend_list)
                return search_qa_recommend_list
            else:
                search_qa_recommend_list.extend(qa[:size])
                search_qa_recommend_list = list(map(int, search_qa_recommend_list))
                if device_id != "0":
                    write_after_filter_qa(qa[size:])
                    read_history(search_qa_recommend_list)
                return search_qa_recommend_list
        else:
            # Anonymous device ('0'): page the global queue with a per-day
            # cursor stored in Redis; wrap around via cycle()+islice().
            key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
                                                          card_type=card_type, date=RecommendFeed.current_date())
            try:
                que = DeviceQAQueue.objects.get(device_id=device_id)
            except DeviceQAQueue.DoesNotExist:
                que = AnswerQueue.objects.last()
            if not que:
                return []
            que = list(filter(None, que.queue.split(',')))
            # adjust args.
            cursor = redis_client.get(key) or 0
            cursor = int(cursor) % len(que)
            size = min(size, len(que))
            # redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            data = list(islice(cycle(que), cursor, cursor + size))
            data = list(map(int, data))
            # Advance the cursor; when fewer than `size` items remain past the
            # next page, log a queue reset and start over from 0.
            if cursor + 2 * size < len(que):
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            else:
                try:
                    context.request_logger.app(reset_answer_queue=True)
                    cursor = 0
                    redis_client.set(key, cursor, ex=24 * 60 * 60)
                except:
                    redis_client.set(key, cursor + size, ex=24 * 60 * 60)
            return data
    except:
        logging_exception()
        return []
# -*- coding: utf-8 -*-
import redis
import json
def tag_spu(key="测脸型", value=None, redis_url='redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'):
    """Store a JSON-encoded value under *key* in the Doris Redis instance.

    Backward compatible with the original no-argument call, which wrote an
    empty JSON list under the key "测脸型".

    Args:
        key: Redis key to write (default preserves original behavior).
        value: JSON-serializable payload; None means an empty list.
        redis_url: connection URL.  SECURITY NOTE(review): credentials are
            embedded in source — should be moved to configuration.
    """
    if value is None:
        value = []
    redis_client = redis.StrictRedis.from_url(redis_url)
    # ensure_ascii left at default to match the original payload bytes.
    redis_client.set(key, json.dumps(value))
# -*- coding: utf-8 -*-
import redis
import re
import json
def name_process(name):
project_tags = ["口腔","植发","牙","皮肤","眼","外科","美容","整形","烧伤","胸","丰胸","美胸","祛痘","祛斑","脱毛",
"创伤","除疤","半永久","纹绣","纹眉"]
names = ["医疗","门诊","研究所","有限","公司","医学","诊所","中心","医美","集团","卫生","机构","专业",
"光学","国际","连锁","综合","专科",""]
location = ["街道","社区",]
stop_words = project_tags + names + location
for word in stop_words:
name = re.sub(word, '', name)
# 去除 中文括号( )
name = re.sub(r'\(.*?\)', '', name)
# 去除 英文括号( )
name = re.sub(r'\(.*?\)', '', name)
# 去除数字
name = re.sub(r'\d', '', name)
return name
if __name__ == '__main__':
    # Script entry point: read the raw name list from Redis, shorten each
    # name, and write the result back as JSON under "hname_short".
    import ast
    redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN6@172.16.40.133:6379")
    # SECURITY FIX: the value was previously parsed with eval(), which would
    # execute arbitrary code stored in Redis.  ast.literal_eval only accepts
    # Python literals (the stored value is a repr'd list of strings).
    name_list = ast.literal_eval(redis_client.get("hname").decode("utf-8"))
    print(name_list[:10])
    result = [name_process(raw_name) for raw_name in name_list]
    redis_client.set("hname_short", json.dumps(result))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment