"""Recommendation feed queues: answers (QA), user topics (tractates) and diaries.

Each fetch_* function merges a per-device queue (Django model), a search-driven
recommend queue (Redis hash with a ``cursor``/``*_queue`` pair) and, for diaries,
a click-driven queue plus a user-portrait queue, then records served ids in a
per-device "already read" Redis set.

NOTE(review): this module was reconstructed from a whitespace-mangled source.
Statement order is preserved, but block nesting at a few ambiguous sites is
best-effort and should be confirmed against version control.
"""
from utils import con_sql
# NOTE(review): the star import is expected to supply redis_client, logger,
# context, logging_exception, RecommendFeed and the queue models
# (DeviceQAQueue, AnswerQueue, DeviceUserTopicQueue, UserTopicQueue) -- none of
# them are imported explicitly anywhere in this file.
from config import *  # noqa: F401,F403

# Explicit stdlib imports placed after the star import so the canonical stdlib
# bindings win even if config re-exports same-named attributes.
# Fix: the original `from datetime import datetime` made the (only) usage
# `datetime.datetime.now()` an AttributeError unless config re-bound the name;
# importing the module matches the usage.
import datetime
import json
import os
import time
from collections import Counter
from itertools import chain, cycle, islice

import pandas as pd
import pymysql


def fetch_qa(device_id, card_type, size):
    """Return up to ``size`` answer ids for a device.

    One slot is reserved for the search-recommend answer queue (Redis hash
    ``TS:search_recommend_answer_queue:device_id:<id>``) while its cursor is
    below 10; the rest come from the device's DeviceQAQueue (falling back to
    the newest AnswerQueue).  Served ids are added to the per-device read set.
    Any failure is logged and an empty list returned.

    :param device_id: device identifier; '0' means anonymous (no personal queues).
    :param card_type: used only to build the per-day cursor key.
    :param size: number of ids requested.
    :return: list[int] of answer ids (possibly empty).
    """
    try:
        key = '{device_id}-{card_type}-{date}'.format(
            device_id=device_id, card_type=card_type, date=RecommendFeed.current_date())
        if device_id != '0':
            search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
            search_qa_recommend_list = list()
            search_cursor_ts = 0
            if redis_client.exists(search_qa_recommend_key):
                search_qa_recommend_dict = redis_client.hgetall(search_qa_recommend_key)
                if b'cursor' in search_qa_recommend_dict:
                    search_cursor_ts = json.loads(search_qa_recommend_dict[b'cursor'])
                    # Only the first 10 search-recommended answers are interleaved.
                    if search_cursor_ts < 10:
                        search_qa_recommend_list = json.loads(search_qa_recommend_dict[b'answer_queue'])
                        if search_cursor_ts < len(search_qa_recommend_list):
                            size = size - 1  # reserve one slot for the search answer
        try:
            que = DeviceQAQueue.objects.get(device_id=device_id)
        except DeviceQAQueue.DoesNotExist:
            que = AnswerQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        # cycle() lets the slice wrap around the end of the queue.
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            # Near the end of the queue: log the reset and start over; if the
            # reset-logging fails, fall back to just advancing the cursor.
            try:
                context.request_logger.app(reset_answer_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except Exception:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        if device_id != '0':
            if len(search_qa_recommend_list) > 0 and search_cursor_ts < len(search_qa_recommend_list):
                # Prepend exactly one search-recommended answer and advance its cursor.
                queue = search_qa_recommend_list[search_cursor_ts:search_cursor_ts + 1]
                queue.extend(data)
                data = queue
                new_search_cursor = search_cursor_ts + 1
                redis_client.hset(search_qa_recommend_key, 'cursor', new_search_cursor)
                redis_client.expire(search_qa_recommend_key, 30 * 24 * 60 * 60)
            # Record everything served as "already read".
            read_qa_key = "TS:recommend_answer_set:device_id:" + str(device_id)
            if len(data) > 0:
                redis_client.sadd(read_qa_key, *data)
        return data
    except Exception:
        logging_exception()
        return []


def fetch_user_topic(device_id, card_type, size):
    """Return up to ``size`` user-topic (tractate) ids for a device.

    Mirrors :func:`fetch_qa` but reserves TWO slots for the search-recommend
    tractate queue (while its cursor is below 30) and requires ``size >= 2``
    before doing so.  Served ids are added to the per-device read set.

    :param device_id: device identifier; '0' means anonymous.
    :param card_type: used only to build the per-day cursor key.
    :param size: number of ids requested.
    :return: list[int] of tractate ids (possibly empty).
    """
    try:
        key = '{device_id}-{card_type}-{date}'.format(
            device_id=device_id, card_type=card_type, date=RecommendFeed.current_date())
        if (device_id != '0') and size >= 2:
            search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
            search_topic_recommend_list = list()
            search_cursor_ts = 0
            if redis_client.exists(search_topic_recommend_key):
                search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
                if b'cursor' in search_topic_recommend_dict:
                    search_cursor_ts = json.loads(search_topic_recommend_dict[b'cursor'])
                    # Only the first 30 search-recommended tractates are interleaved.
                    if search_cursor_ts < 30:
                        search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
                        if search_cursor_ts < len(search_topic_recommend_list):
                            size = size - 2  # reserve two slots for search tractates
        try:
            que = DeviceUserTopicQueue.objects.get(device_id=device_id)
        except DeviceUserTopicQueue.DoesNotExist:
            que = UserTopicQueue.objects.last()
        if not que:
            return []
        que = list(filter(None, que.queue.split(',')))
        # adjust args.
        cursor = redis_client.get(key) or 0
        cursor = int(cursor) % len(que)
        size = min(size, len(que))
        data = list(islice(cycle(que), cursor, cursor + size))
        data = list(map(int, data))
        if cursor + 2 * size < len(que):
            redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        else:
            try:
                context.request_logger.app(reset_queue=True)
                cursor = 0
                redis_client.set(key, cursor, ex=24 * 60 * 60)
            except Exception:
                redis_client.set(key, cursor + size, ex=24 * 60 * 60)
        if device_id != '0' and size >= 2:
            if len(search_topic_recommend_list) > 0 and search_cursor_ts < len(search_topic_recommend_list):
                queue = search_topic_recommend_list[search_cursor_ts:search_cursor_ts + 2]
                queue.extend(data)
                data = queue
                new_search_cursor = search_cursor_ts + 2
                redis_client.hset(search_topic_recommend_key, 'cursor', new_search_cursor)
                redis_client.expire(search_topic_recommend_key, 30 * 24 * 60 * 60)
            # NOTE(review): read-set recording sits inside the `size >= 2` guard
            # in the recovered nesting -- confirm against version control.
            read_topic_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
            if len(data) > 0:
                redis_client.sadd(read_topic_key, *data)
        return data
    except Exception:
        logging_exception()
        return []


def fetch_diary(cls, device_id, card_type, city_id, size):
    """Return up to ``size`` diary ids, merging four sources in priority order.

    Sources: (1) the user-portrait diary queue (daily Redis hash), (2) four
    city-tier queues (local/nearby/nation/megacity) sliced by
    :func:`get_scale_data`, (3) the search-recommend diary queue (4 slots when
    ``size > 3``), (4) the click-recommend diary queue (1 slot).  Served ids
    are added to the per-device read set.

    NOTE(review): takes ``cls`` and calls ``cls.fetch_device_diary_queue_data``
    etc. -- this looks like a classmethod of RecommendFeed whose decorator was
    lost in the source mangling; confirm before calling it stand-alone.

    :param device_id: device identifier; '0' means anonymous.
    :param card_type: unused here (kept for interface parity).
    :param city_id: city the device is browsing; may be remapped by the queue fetchers.
    :param size: number of ids requested.
    :return: list of diary ids.
    """
    # first, we fetch data from personal-queue city-queue, if not both, get data
    # from world queue.
    user_portrait_diary_part_list = list()
    click_diary_size = 1
    search_diary_size = 4
    if device_id != '0':
        user_portrait_diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % (
            device_id, datetime.datetime.now().strftime('%Y-%m-%d'))
        if redis_client.exists(user_portrait_diary_key):
            user_portrait_diary_dict = redis_client.hgetall(user_portrait_diary_key)
            user_portrait_cursor = str(user_portrait_diary_dict[b'cursor'], encoding='utf-8')
            if user_portrait_cursor == '0':
                if b'len_cursor' in user_portrait_diary_dict.keys():
                    user_portrait_diary_list = json.loads(user_portrait_diary_dict[b'diary_queue'])
                    len_cursor = str(user_portrait_diary_dict[b'len_cursor'], encoding='utf-8')
                    len_cursor = int(len_cursor)
                    if len(user_portrait_diary_list) - len_cursor > size:
                        # Enough portrait diaries left to fill the whole request.
                        user_portrait_diary_part_list = user_portrait_diary_list[len_cursor:len_cursor + size]
                        redis_client.hset(user_portrait_diary_key, 'len_cursor', len_cursor + size)
                        size = 0
                    else:
                        # Drain the remainder and bump the day cursor past 0.
                        user_portrait_diary_list = json.loads(user_portrait_diary_dict[b'diary_queue'])
                        diary_list_len = len(user_portrait_diary_list) - len_cursor
                        size = size - diary_list_len
                        user_portrait_diary_part_list = user_portrait_diary_list[len_cursor:len_cursor + diary_list_len]
                        redis_client.hset(user_portrait_diary_key, 'len_cursor', len_cursor + diary_list_len)
                        user_portrait_cursor = int(user_portrait_cursor) + 1
                        redis_client.hset(user_portrait_diary_key, 'cursor', user_portrait_cursor)
            else:
                user_portrait_diary_part_list = json.loads(user_portrait_diary_dict[b'diary_queue'])
                size = size - len(user_portrait_diary_part_list)
                user_portrait_cursor = int(user_portrait_cursor) + 1
                redis_client.hset(user_portrait_diary_key, 'cursor', user_portrait_cursor)
    try:
        (local, nearby, nation, megacity, city_id) = cls.fetch_device_diary_queue_data(city_id, device_id)
        if len(local) == 0 and len(nearby) == 0 and len(nation) == 0 and len(megacity) == 0:
            (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
    except Exception:
        logging_exception()
        (local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
    if device_id != '0':
        search_diary_recommend_key = "TS:search_recommend_diary_queue:device_id:" + str(device_id)
        search_diary_recommend_list = list()
        search_cursor_ts = 0
        if redis_client.exists(search_diary_recommend_key) and size > 3:
            search_diary_recommend_dict = redis_client.hgetall(search_diary_recommend_key)
            if b'cursor' in search_diary_recommend_dict:
                search_cursor_ts = json.loads(search_diary_recommend_dict[b'cursor'])
                search_diary_recommend_list = json.loads(search_diary_recommend_dict[b'diary_queue'])
                if search_cursor_ts + search_diary_size < len(search_diary_recommend_list):
                    size = size - search_diary_size  # reserve four slots
    if device_id != '0':
        diary_recommend_key = "TS:recommend_diary_queue:device_id:" + str(device_id)
        diary_recommend_list = list()
        if redis_client.exists(diary_recommend_key) and size > 0:
            diary_recommend_dict = redis_client.hgetall(diary_recommend_key)
            diary_recommend_list = json.loads(diary_recommend_dict[b'diary_queue'])
            if len(diary_recommend_list) > 0:
                size = size - click_diary_size  # reserve one slot for a clicked diary
    key = '{device_id}-{city_id}-{date}'.format(
        device_id=device_id, city_id=city_id, date=RecommendFeed.current_date())
    # strategy rule: when user refresh over 30 loadings, reset native nearby nation queue cursor.
    counter_key = key + '-counter_v1'
    counter = redis_client.incr(counter_key)
    if counter == 1:
        redis_client.expire(counter_key, 24 * 60 * 60)
    cursor_key = key + '-cursor_v1'
    cursor = redis_client.get(cursor_key) or b'0-0-0-0'
    cx, cy, cm, cz = map(int, cursor.split(b'-'))
    x, y, m, z = cls.get_city_scale(city_id)
    data, ncx, ncy, ncm, ncz = cls.get_scale_data(
        local, nearby, nation, megacity,
        cx, cy, cm, cz,
        x, y, z, m, size
    )
    if ncx == cx and ncy == cy:  # native queue and nearby queue exhausted
        logger.info("diary queue reach end,cx:%d,cy:%d,cm:%d,cz:%d", cx, cy, cm, cz)
        ncx = ncy = ncm = ncz = 0
    val = '-'.join(map(str, [ncx, ncy, ncm, ncz]))
    redis_client.set(cursor_key, val, ex=24 * 60 * 60)
    data = list(map(int, data))
    if device_id != '0':
        if search_cursor_ts < len(search_diary_recommend_list) - search_diary_size:
            # Prepend four search-recommended diaries and advance their cursor.
            queue = search_diary_recommend_list[search_cursor_ts:search_cursor_ts + search_diary_size]
            queue.extend(data)
            data = queue
            new_search_cursor = search_cursor_ts + search_diary_size
            redis_client.hset(search_diary_recommend_key, 'cursor', new_search_cursor)
            redis_client.expire(search_diary_recommend_key, 30 * 24 * 60 * 60)
        if len(diary_recommend_list) > 0:
            # Pop one click-recommended diary to the very front; persist or
            # delete the remainder of that queue.
            diary_id = diary_recommend_list.pop(0)
            data.insert(0, diary_id)
            if len(diary_recommend_list) > 0:
                diary_recommend_list_json = json.dumps(diary_recommend_list)
                redis_client.hset(diary_recommend_key, 'diary_queue', diary_recommend_list_json)
                redis_client.expire(diary_recommend_key, 30 * 24 * 60 * 60)
            else:
                redis_client.delete(diary_recommend_key)
        if len(user_portrait_diary_part_list) > 0:
            # Portrait diaries take the highest priority: they lead the result.
            user_portrait_diary_part_list.extend(data)
            data = user_portrait_diary_part_list
        # Record everything served as "already read".
        read_diary_key = "TS:recommend_diary_set:device_id:" + str(device_id)
        if len(data) > 0:
            redis_client.sadd(read_diary_key, *data)
    return data


def get_scale_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
    """Pick ``size`` diary ids across the four city-tier queues.

    NOTE(review): called as ``cls.get_scale_data(...)`` without a cls/self
    parameter -- likely a staticmethod whose decorator was lost in the
    source mangling.

    :param local: local diary queue
    :param nearby: nearby diary queue
    :param nation: nation diary queue
    :param megacity: megacity diary queue
    :param cx: seen local diary offset
    :param cy: seen nearby diary offset
    :param cz: seen nation diary offset
    :param cm: seen megacity diary offset
    :param x: local diary scale factor
    :param y: nearby diary scale factor
    :param z: nation diary scale factor
    :param m: megacity diary scale factor
    :param size: number of diaries
    :return: (iterator of chosen ids, new cx, new cy, new cm, new cz)
    """
    # Each of the four tiers (local / nearby / megacity / nation) gets a slot
    # count by round()-ing its weight share.  Rounding can leave slots over or
    # short; the adjustment (translated from the original notes) is:
    #   1. if exactly two tiers rounded to zero, give the difference to the
    #      first zero tier in priority order local > nearby > megacity > nation;
    #   2. otherwise give the difference to the tier with the largest weight.
    weight_sum = x + y + z + m
    nx = int(round(x * 1.0 / weight_sum * size))
    ny = int(round(y * 1.0 / weight_sum * size))
    nz = int(round(z * 1.0 / weight_sum * size))
    nm = int(round(m * 1.0 / weight_sum * size))
    nxyz = [nx, ny, nm, nz]
    xyz = [x, y, m, z]
    counter = Counter([nx, ny, nm, nz])
    if counter[0] == 2:
        nxyz[nxyz.index(0)] += size - sum(nxyz)
    else:
        nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
    nx, ny, nm, nz = nxyz
    # Consume each tier from its cursor; any unfilled slots spill over into the
    # next tier (local -> nearby -> megacity -> nation).
    slocal = local[cx:cx + nx]
    cx = min(cx + nx, len(local))
    ny += (nx - len(slocal))
    snearby = nearby[cy:cy + ny]
    cy = min(cy + ny, len(nearby))
    nm += (ny - len(snearby))
    smegacity = megacity[cm: cm + nm]
    cm = min(cm + nm, len(megacity))
    nz += (nm - len(smegacity))
    snation = nation[cz:cz + nz]
    cz = min(cz + nz, len(nation))
    return chain(slocal, snearby, smegacity, snation), cx, cy, cm, cz