@staticmethod
def fetch_user_topic(device_id, card_type, size):
    """Return recommended tractate (post) ids for a device.

    For a device id other than ``'0'`` the result starts with at most two ids
    popped from the device's search-recommendation hash in redis, followed by
    enough ids from the device's queue to reach ``size``.  The queue is read
    from the filtered remainder cached in gmkv when present, otherwise from
    ``DeviceUserTopicQueue`` (falling back to the last ``UserTopicQueue``).
    Ids found in the device's dislike set are dropped and every id served is
    written to the read-history sets.  When the queue is used up, a "tail"
    marker is stored in gmkv for two hours and only search ids are served.

    For device id ``'0'`` ids are read from the queue through a cursor stored
    in redis under a ``device-card_type-date`` key.

    Args:
        device_id: Device identifier; ``'0'`` selects the cursor-based path.
        card_type: Only used to build the cursor key.
        size: Number of ids requested.

    Returns:
        A list of int ids.  ``[]`` when nothing is available or when any
        exception is raised (the exception is reported via
        ``logging_exception()``).

    NOTE(review): the nesting of statements was reconstructed from a
    whitespace-collapsed source; confirm against the original layout.
    """
    try:
        def filter_topic(cid_list):
            """Drop disliked ids; return the input unchanged on any failure."""
            try:
                if not gmkv.exists(dislike_key) or len(cid_list) == 0:
                    return cid_list
                dislike = gmkv.smembers(dislike_key)
                if isinstance(cid_list[0], (int, str)):
                    # smembers yields bytes, so compare on the encoded form.
                    return [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                return [i for i in cid_list if i not in dislike]
            except Exception:
                return cid_list

        def write_after_filter_tractate(cid_list):
            """Cache the unserved remainder of the filtered queue in gmkv."""
            try:
                # NOTE(review): on Redis a plain SET drops an existing TTL, so
                # rewriting an existing key may leave it without expiry --
                # confirm this is intended for the gmkv backend.
                if gmkv.exists(after_filter_key):
                    gmkv.set(after_filter_key, json.dumps(cid_list))
                else:
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except Exception:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())

        def get_filter_tractate():
            """Load the cached remainder; ``[]`` if missing or unreadable."""
            try:
                return json.loads(gmkv.get(after_filter_key))
            except Exception:
                return []

        def read_history(cid_list):
            """Add served ids to today's set and to the rolling read set."""
            created_today = not redis_client.exists(today_key)
            redis_client.sadd(today_key, *cid_list)
            if created_today:
                redis_client.expire(today_key, 15 * 24 * 60 * 60)
            # Remove the ids of the 14-day-old set from the rolling set.
            if redis_client.exists(read_key) and redis_client.exists(old_key):
                redis_client.sdiffstore(read_key, read_key, old_key)
                redis_client.delete(old_key)
            # NOTE(review): expire runs before sadd, so it has no effect when
            # read_key does not exist yet -- confirm the intended order.
            redis_client.expire(read_key, time=13 * 24 * 60 * 60)
            redis_client.sadd(read_key, *cid_list)

        def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
            """Return a pinged redis client for one host, or None on failure."""
            try:
                if len(redis_password) == 0:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
                else:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db,
                                          password=redis_password, socket_timeout=2)
                cli_ins.ping()
                return cli_ins
            except Exception:
                return None

        def record_and_return(cid_list):
            """Convert ids to int and record them; skip both for an empty list.

            sadd rejects an empty member list, so an empty result must not
            reach read_history.
            """
            if len(cid_list) > 0:
                cid_list = list(map(int, cid_list))
                read_history(cid_list)
            return cid_list

        today = datetime.date.today()
        dislike_key = str(device_id) + "_dislike_tractate"
        search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
        after_filter_key = "device_tractate_after_filter:device_id:" + str(device_id)
        tractate_key = "tractate_is_tail" + str(device_id)
        read_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        old_key = "TS:recommend_tractate_set:device_id:{}:{}".format(
            device_id, (today - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
        today_key = "TS:recommend_tractate_set:device_id:{}:{}".format(
            device_id, today.strftime("%Y-%m-%d"))
        search_list = list()

        # Use the first gmkv host that answers a ping.
        gmkv = None
        for gm_kv_host_item in settings.GM_KV_HOSTS:
            gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
                            redis_db=gm_kv_host_item["db"], redis_password=gm_kv_host_item["password"])
            if gmkv:
                break

        if device_id != '0':
            if gmkv is None:
                # Bail out before the search queue is consumed; without gmkv
                # the rest of this branch cannot run.
                logger.error("fetch_user_topic: no gmkv host reachable")
                return []

            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                search_topic_recommend_list = filter_topic(search_topic_recommend_list)
                # Serve at most two search ids; keep the rest for later calls.
                search_list = search_topic_recommend_list[:2]
                leftover = search_topic_recommend_list[2:]
                if len(leftover) > 0:
                    redis_client.hset(search_topic_recommend_key, 'tractate_queue', json.dumps(leftover))
                else:
                    redis_client.delete(search_topic_recommend_key)

            # Never negative: a negative slice bound would return almost the
            # whole queue instead of nothing.
            remaining = max(size - len(search_list), 0)

            if gmkv.exists(tractate_key):
                # Queue already used up: only search ids are served.
                return record_and_return(search_list)

            if gmkv.exists(after_filter_key):
                que = filter_topic(get_filter_tractate())
            else:
                try:
                    que = DeviceUserTopicQueue.objects.get(device_id=device_id)
                except DeviceUserTopicQueue.DoesNotExist:
                    que = UserTopicQueue.objects.last()
                if not que:
                    return record_and_return(search_list)
                que = filter_topic(list(filter(None, que.queue.split(','))))

            if len(que) <= remaining:
                # Everything left fits in this response: mark the tail.
                search_list.extend(que)
                gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
            else:
                search_list.extend(que[:remaining])
                write_after_filter_tractate(que[remaining:])
            return record_and_return(search_list)

        key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                      date=RecommendFeed.current_date())
        try:
            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            que = UserTopicQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        if not que:
            # An empty queue would make the modulo below divide by zero.
            return []

        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except Exception:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        return data
    except Exception:
        logging_exception()
        return []


# Previous implementation for posts (tractates).
# NOTE(review): this has the same name as the definition above; if both sit in
# the same scope, the later definition replaces the earlier one -- confirm.
def fetch_user_topic(device_id, card_type, size):
    """Return tractate ids from the queue cursor, prefixed by two search ids.

    When ``device_id`` is not ``'0'`` and ``size >= 2``, two slots are
    reserved for ids taken from the device's search-recommendation hash at
    its stored ``cursor`` (only while that cursor is below 30 and inside the
    list).  The remaining ids are read from ``DeviceUserTopicQueue`` (or the
    last ``UserTopicQueue``) through a redis cursor.  All returned ids are
    added to the device's read set.

    Args:
        device_id: Device identifier.
        card_type: Only used to build the cursor key.
        size: Number of ids requested.

    Returns:
        A list of ids; ``[]`` when no queue exists, the queue is empty, or
        any exception is raised (reported via ``logging_exception()``).

    NOTE(review): the nesting of statements was reconstructed from a
    whitespace-collapsed source; confirm against the original layout.
    """
    try:
        key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                      date=RecommendFeed.current_date())
        search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
        search_topic_recommend_list = list()
        search_cursor_ts = 0
        # Decided once, before size is reduced. Re-testing ``size >= 2`` later
        # would drop the search ids whenever fewer than two slots remain.
        prepend_search = False
        if (device_id != '0') and size >= 2:
            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                if b'cursor' in search_topic_recommend_dict:
                    search_cursor_ts = json.loads(search_topic_recommend_dict[b'cursor'])
                if search_cursor_ts < 30:
                    search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                    if search_cursor_ts < len(search_topic_recommend_list):
                        size = size - 2
                        prepend_search = len(search_topic_recommend_list) > 0

        try:
            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            que = UserTopicQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        if not que:
            # An empty queue would make the modulo below divide by zero.
            return []

        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except Exception:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)

        if prepend_search:
            queue = search_topic_recommend_list[search_cursor_ts:search_cursor_ts + 2]
            queue.extend(data)
            data = queue
            new_search_cursor = search_cursor_ts + 2
            redis_client.hset(search_topic_recommend_key, 'cursor', new_search_cursor)
            redis_client.expire(search_topic_recommend_key, 30 * 24 * 60 * 60)

        read_topic_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        if len(data) > 0:
            redis_client.sadd(read_topic_key, *data)
        return data
    except Exception:
        logging_exception()
        return []


# Old implementation for posts (tractates).
# NOTE(review): this has the same name as the definitions above; if they sit
# in the same scope, this later definition replaces the earlier ones -- confirm.
def fetch_user_topic(device_id, card_type, size):
    """Return tractate ids from the queue cursor, prefixed by two search ids.

    When ``device_id`` is not ``'0'`` and ``size >= 2``, two slots are
    reserved for ids taken from the device's search-recommendation hash at
    its stored ``cursor`` (only while that cursor is below 30 and inside the
    list).  The remaining ids are read from ``DeviceUserTopicQueue`` (or the
    last ``UserTopicQueue``) through a redis cursor.  All returned ids are
    added to the device's read set.

    Args:
        device_id: Device identifier.
        card_type: Only used to build the cursor key.
        size: Number of ids requested.

    Returns:
        A list of ids; ``[]`` when no queue exists, the queue is empty, or
        any exception is raised (reported via ``logging_exception()``).

    NOTE(review): the nesting of statements was reconstructed from a
    whitespace-collapsed source; confirm against the original layout.
    """
    try:
        key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                      date=RecommendFeed.current_date())
        search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
        search_topic_recommend_list = list()
        search_cursor_ts = 0
        # Decided once, before size is reduced. Re-testing ``size >= 2`` later
        # would drop the search ids whenever fewer than two slots remain.
        prepend_search = False
        if (device_id != '0') and size >= 2:
            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                if b'cursor' in search_topic_recommend_dict:
                    search_cursor_ts = json.loads(search_topic_recommend_dict[b'cursor'])
                if search_cursor_ts < 30:
                    search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                    if search_cursor_ts < len(search_topic_recommend_list):
                        size = size - 2
                        prepend_search = len(search_topic_recommend_list) > 0

        try:
            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            que = UserTopicQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        if not que:
            # An empty queue would make the modulo below divide by zero.
            return []

        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except Exception:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)

        if prepend_search:
            queue = search_topic_recommend_list[search_cursor_ts:search_cursor_ts + 2]
            queue.extend(data)
            data = queue
            new_search_cursor = search_cursor_ts + 2
            redis_client.hset(search_topic_recommend_key, 'cursor', new_search_cursor)
            redis_client.expire(search_topic_recommend_key, 30 * 24 * 60 * 60)

        read_topic_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
        if len(data) > 0:
            redis_client.sadd(read_topic_key, *data)
        return data
    except Exception:
        logging_exception()
        return []


# Online QA feed (9.6).
def fetch_qa(device_id, card_type, size):
    """Return recommended answer (QA) ids for a device.

    For a device id other than ``'0'`` the result starts with at most one id
    popped from the device's search-recommendation hash in redis, followed by
    enough ids from the device's queue to reach ``size``.  The queue is read
    from the filtered remainder cached in gmkv when present, otherwise from
    ``DeviceQAQueue`` (falling back to the last ``AnswerQueue``).  Ids found
    in the device's dislike set are dropped and every id served is written to
    the read-history sets.  When the queue is used up, a "tail" marker is
    stored in gmkv for six hours and only the search id is served.

    For device id ``'0'`` ids are read from the queue through a cursor stored
    in redis under a ``device-card_type-date`` key.

    Args:
        device_id: Device identifier; ``'0'`` selects the cursor-based path.
        card_type: Only used to build the cursor key.
        size: Number of ids requested.

    Returns:
        A list of int ids.  ``[]`` when nothing is available or when any
        exception is raised (the exception is reported via
        ``logging_exception()``).

    NOTE(review): the nesting of statements was reconstructed from a
    whitespace-collapsed source; confirm against the original layout.
    """
    try:
        def get_after_filter_qa():
            """Load the cached remainder; ``[]`` if missing or unreadable."""
            try:
                return json.loads(gmkv.get(after_filter_key))
            except Exception:
                return []

        def write_after_filter_qa(cid_list):
            """Cache the unserved remainder of the filtered queue in gmkv."""
            try:
                # NOTE(review): on Redis a plain SET drops an existing TTL, so
                # rewriting an existing key may leave it without expiry --
                # confirm this is intended for the gmkv backend.
                if gmkv.exists(after_filter_key):
                    gmkv.set(after_filter_key, json.dumps(cid_list))
                else:
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except Exception:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())

        def filter_qa(device_id, cid_list):
            """Drop disliked ids; return the input unchanged on any failure."""
            try:
                key = str(device_id) + "_dislike_qa"
                if not gmkv.exists(key) or len(cid_list) == 0:
                    return cid_list
                dislike = gmkv.smembers(key)
                if isinstance(cid_list[0], (int, str)):
                    # smembers yields bytes, so compare on the encoded form.
                    return [i for i in cid_list if str(i).encode('utf-8') not in dislike]
                return [i for i in cid_list if i not in dislike]
            except Exception:
                return cid_list

        def read_history(cid_list):
            """Add served ids to today's set and to the rolling read set."""
            created_today = not redis_client.exists(today_qa_key)
            redis_client.sadd(today_qa_key, *cid_list)
            if created_today:
                redis_client.expire(today_qa_key, 15 * 24 * 60 * 60)
            # Remove the ids of the 14-day-old set from the rolling set.
            if redis_client.exists(read_qa_key) and redis_client.exists(old_qa_key):
                redis_client.sdiffstore(read_qa_key, read_qa_key, old_qa_key)
                redis_client.delete(old_qa_key)
            # NOTE(review): expire runs before sadd, so it has no effect when
            # read_qa_key does not exist yet -- confirm the intended order.
            redis_client.expire(read_qa_key, time=13 * 24 * 60 * 60)
            redis_client.sadd(read_qa_key, *cid_list)

        def get_gmkv(redis_ip, redis_port, redis_db, redis_password=""):
            """Return a pinged redis client for one host, or None on failure."""
            try:
                if len(redis_password) == 0:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
                else:
                    cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db,
                                          password=redis_password, socket_timeout=2)
                cli_ins.ping()
                return cli_ins
            except Exception:
                return None

        def record_and_return(cid_list):
            """Convert ids to int and record them; skip both for an empty list.

            sadd rejects an empty member list, so an empty result must not
            reach read_history.
            """
            if len(cid_list) > 0:
                cid_list = list(map(int, cid_list))
                read_history(cid_list)
            return cid_list

        today = datetime.date.today()
        search_qa_recommend_list = list()
        read_qa_key = "TS:recommend_answer_set:device_id:" + str(device_id)
        old_qa_key = "TS:recommend_answer_set:device_id:{}:{}".format(
            device_id, (today - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
        today_qa_key = "TS:recommend_answer_set:device_id:{}:{}".format(
            device_id, today.strftime("%Y-%m-%d"))
        answer_queue_key = "qa_is_tail:" + str(device_id)
        after_filter_key = "device_qa_after_filter:device_id:" + str(device_id)

        # Use the first gmkv host that answers a ping.
        gmkv = None
        for gm_kv_host_item in settings.GM_KV_HOSTS:
            gmkv = get_gmkv(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"],
                            redis_db=gm_kv_host_item["db"], redis_password=gm_kv_host_item["password"])
            if gmkv:
                break

        if device_id != '0':
            if gmkv is None:
                # Bail out before the search queue is consumed; without gmkv
                # the rest of this branch cannot run.
                logger.error("fetch_qa: no gmkv host reachable")
                return []

            search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
            if redis_client.exists(search_qa_recommend_key):
                search_qa_recommend_dict = redis_client.hgetall(search_qa_recommend_key)
                queue_list = json.loads(search_qa_recommend_dict[b'answer_queue'])
                queue_list = filter_qa(device_id, queue_list)
                # Serve at most one search id; keep the rest for later calls.
                search_qa_recommend_list = queue_list[:1]
                leftover = queue_list[1:]
                if len(leftover) > 0:
                    redis_client.hset(search_qa_recommend_key, "answer_queue", json.dumps(leftover))
                else:
                    redis_client.delete(search_qa_recommend_key)

            # Never negative: a negative slice bound would return almost the
            # whole queue instead of nothing.
            remaining = max(size - len(search_qa_recommend_list), 0)

            if gmkv.exists(answer_queue_key):
                # Queue already used up: only the search id is served.
                return record_and_return(search_qa_recommend_list)

            if gmkv.exists(after_filter_key):
                qa = filter_qa(device_id, get_after_filter_qa())
            else:
                try:
                    que = DeviceQAQueue.objects.get(device_id=device_id)
                except DeviceQAQueue.DoesNotExist:
                    que = AnswerQueue.objects.last()
                if not que:
                    return record_and_return(search_qa_recommend_list)
                qa = filter_qa(device_id, list(filter(None, que.queue.split(','))))

            if len(qa) <= remaining:
                # Everything left fits in this response: mark the tail.
                search_qa_recommend_list.extend(qa)
                gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
            else:
                search_qa_recommend_list.extend(qa[:remaining])
                write_after_filter_qa(qa[remaining:])
            return record_and_return(search_qa_recommend_list)

        key = '{device_id}-{card_type}-{date}'.format(device_id=device_id, card_type=card_type,
                                                      date=RecommendFeed.current_date())
        try:
            que = DeviceQAQueue.objects.get(device_id=device_id)
        except DeviceQAQueue.DoesNotExist:
            que = AnswerQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        if not que:
            # An empty queue would make the modulo below divide by zero.
            return []

        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            try:
                context.request_logger.app(reset_answer_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except Exception:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        return data
    except Exception:
        logging_exception()
        return []