import json
import random
import time

import redis
# Only the commented-out experiments further down use these two, so they are left
# commented out to keep the script importable where the packages are not installed.
# import pymysql
# from rediscluster import StrictRedisCluster  # recent redis-py-cluster releases expose RedisCluster instead

# def getRedisConn():
#     conn = redis.Redis(host="172.18.51.10", port=6379,db=0)
#     conn.execute_command()
#     return conn
#
#
# def getRedisConn1():
#     pool = redis.ConnectionPool(host="172.16.40.133",password="ReDis!GmTx*0aN6",port=6379,db=0)
#     conn = redis.Redis(connection_pool=pool)
#     return conn
#
# def getRedisConn2():
#     pool = redis.ConnectionPool(host="172.16.40.173",password="ReDis!GmTx*0aN9",port=6379,db=0)
#     conn = redis.Redis(connection_pool=pool)
#     return conn
#
# def getRedisConn3():
#     pool = redis.ConnectionPool(host="172.16.40.164",password="ReDis!GmTx*0aN12",port=6379,db=0)
#     conn = redis.Redis(connection_pool=pool)
#     return conn
#
def getRedisConn4():
    # startup_nodes = [
    #     {'host': '172.16.179.131', 'port': '7000'},
    # ]
    # conn = StrictRedisCluster(host="172.16.50.145",password="XfkMCCdWDIU%ls$h",port=6379,decode_responses=True)
    pool = redis.ConnectionPool(host="172.16.50.145", password="XfkMCCdWDIU%ls$h", port=6379, db=0)
    conn = redis.Redis(connection_pool=pool)
    return conn
#
# def getRedisConn5():
#     pool = redis.ConnectionPool(host="172.16.50.159",password="XfkMCCdWDIU%ls$h3",port=6379,db=0)
#     conn = redis.Redis(connection_pool=pool)
#     return conn
#
# def getDeviceIds():
#     ids_set = set()
#
#     db = pymysql.connect(host='172.16.30.136', port=3306, user='doris_olap', passwd='bA27hXasdfswuolap',
#                          db='doris_olap')
#     sql = "select distinct cl_id from user_tag3_portrait;"
#
#     cursor = db.cursor()
#     cursor.execute(sql)
#     datas = cursor.fetchall()
#     for d in datas:
#         device_id = str(d[0])
#         if device_id and len(device_id) > 0:
#             ids_set.add(str(d[0]))
#
#     print("deviceIds size:{}".format(str(len(ids_set))))
#     return ids_set
#
#
#
# keys = set()
# sum = 0
# key_sum = 0
# conn2 = getRedisConn2()
# pipline = conn2.pipeline()
# # pipline = conn.pipeline()
# # key = "have_reply_answer_comment*" #139285
# # key = "device_register_qa_read_set:869574031646601*" #139285
# # key = "doris_feed:*" #139285
# cursor = 0
# while True:
#     del_datas = set()
#     cursor, data = conn2.scan(cursor,"*",10000)
#     # cursor = str(cursor, encoding='utf-8')
#     for d in data:
#         dd = str(d, encoding='utf-8')
#         pipline.type(dd)
#     pip_datas = pipline.execute()
#     for i,pp in enumerate(pip_datas):
#         if str(pp,encoding='utf-8') == "list":
#             keys.add(data[i])
#             key_sum += 1
#     # if cursor == 0 or len(data) == 0:
#     sum += len(data)
#     print(cursor,sum,key_sum)
#     if cursor == 0:
#         break
# print(key, key_sum)
#
#
#
# def redis2():
#     sum = 0
#     conn = getRedisConn2()
#     # key = "feed:recommend:by:user:portrait*"
#     # key = "doris:user_portrait:tag3:candidate_latest_dict:*"
#     # key = "doris:user_portrait:tag3:candidate_list:*" #1
#     # key = "device:latest:action:tag:names:update:*" #1831254
#     # key = "device:latest:action:tags:update:*" #1698541
#     # key = "feed:recommend:device_id:*" #1895536 #@bind('doris/recommend/feed')
#     # key = "feed:recommend:topic:device_id:*" #995546 #@bind('doris/recommend/feed')
#     # key = "user:service_portrait_tags2:cl_id:*" #21830
#     # key = "doris:aichannel_smart_rank:tag3:read:device_id:*" #727
#     # key = "doris:user_portrait:tag3:read_v2:device_id:*" # 1658011 (set expiry time)
#     key = "aaa:diary*" # 5
#     cursor = 0
#     keys_map = {}
#     while True:
#         del_datas = set()
#         cursor, data = conn.scan(cursor=cursor, match=key, count=10000)
#         for d in data:
#             if len(d) > len(key) - 2:
#                 del_datas.add(str(d, encoding='utf-8'))
#         # pipline = conn.pipeline()
#         # for d in del_datas:
#         #     pipline.delete(d)
#         #     # pipline.expire(d,60*60*24*2)
#         # pipline.execute()
#         sum += len(data)
#         print(cursor, len(data), sum)
#         # if cursor == 0 or len(data) == 0:
#         if cursor == 0:
#             break
#
# def redis1():
#     keys = ['doris:query_compre_question_id:have_read_list'
#         , 'query_wiki_have_read:'
#         , 'doris:query_compre_answer:have_read_list:'
#         , 'doris_feed:doctor_have_read:device_id:'
#         , 'have_read_live_playback_device_id'
#         , 'doris_feed:ganhuo_have_read:device_id:'
#         , 'doris_feed:home_video_diary:device_id:'
#         , 'doris_feed:home_video_answer:device_id:'
#         , 'doris_feed:home_video_tractate:device_id:'
#         , 'doris_feed:good_look_have_read:device_id:'
#         , 'doris_feed:ganhuo_have_read:device_id:'
#         , 'doris_feed:bendi_have_read:device_id:'
#         , 'service:homepage_slide_tab:have_read:sku_ids:'
#         , 'doris_feed:home_recommend_diary_v1:device_id:'
#         , 'search:query_type:word:'
#         , 'doris_search:'
#         , 'servicehot:'
#         , 'interest:'
#         , 'device:'
#         , 'gaia:hot_tractate_keyword:city_id:'
#         , 'gaia:hot_wiki_keyword:city_id:']
#     for key_k in keys:
#         key = key_k + '*'
#         sum = 0
#         conn = getRedisConn1()
#         pipline = conn.pipeline()
#         # key = "have_reply_answer_comment*" #139285
#         # key = "device_register_qa_read_set:869574031646601*" #139285
#         # key = "doris_feed:*" #139285
#         cursor = 0
#         key_sum = 0
#         while True:
#             del_datas = set()
#             cursor, data = conn.scan(cursor=cursor, match=key, count=10000)
#             for d in data:
#                 dd = str(d, encoding='utf-8')
#                 if len(dd) > 0 and dd.startswith(key_k):
#                     del_datas.add(str(d, encoding='utf-8'))
#             pipline = conn.pipeline()
#             for d in del_datas:
#                 # pipline.delete(d)
#                 pipline.expire(d, 60 * 60 * 24 * 3)
#             pipline.execute()
#             sum += len(data)
#             key_sum += len(del_datas)
#             print(cursor, len(data), sum)
#             # if cursor == 0 or len(data) == 0:
#             if cursor == 0:
#                 break
#         print(key,key_sum)
#         # break
#
# conn2 = getRedisConn2()
# conn = getRedisConn4()
# # doris:tag_v3:coldstart:diary:
# # for key in ["rims:tag_v3:coldstart:answer","rims:tag_v3:coldstart:tractate"]:
# # key='user_portrait:doris:tag3:es:queue'
# # key='rims:tag_v3:coldstart:answer'
# key='rims:tag_v3:coldstart:tractate'
# ls = [str(s, encoding='utf-8') for s in conn2.lrange(key, 0, -1)]
# conn.delete(key)
# for i in range(0,len(ls),1000):
#     data = ls[i:i+1000]
#     conn.lpush(key,*data)
# # for i in range(0,len(ls),10000):
# #     conn.lpush(key, )
#
# #677
#
# #659
#
# def tett():
#     # keys = ['rims:tag_v3:coldstart:'] #177
#     # keys = ['doris:tag_v3:coldstart:'] #471
#     # keys = ['rims:tag_v3:coldstart:','doris:tag_v3:coldstart:']
#     keys = ['coldstart:light:clinic:beauty:']#11
#     conn2 = getRedisConn2()
#
#     for key_k in keys:
#         key = key_k + '*'
#         sum = 0
#         key_sum = 0
#         conn = getRedisConn4()
#         # pipline = conn.pipeline()
#         # key = "have_reply_answer_comment*" #139285
#         # key = "device_register_qa_read_set:869574031646601*" #139285
#         # key = "doris_feed:*" #139285
#         cursor = 0
#         while True:
#             del_datas = set()
#             cursor, data = conn2.scan(cursor,key,10000)
#             # cursor = str(cursor, encoding='utf-8')
#             for d in data:
#                 dd = str(d, encoding='utf-8')
#                 if len(dd) > 0 and dd.startswith(key_k):
#                     del_datas.add(str(d, encoding='utf-8'))
#             # pipline = conn.pipeline()
#             for d in del_datas:
#                 if d in ["rims:tag_v3:coldstart:answer","rims:tag_v3:coldstart:tractate"]:
#                     continue
#                 if str(conn2.type(d),encoding='utf-8') == "list":
#                     print(d)
#                     # ls = conn2.lrange(d, 0, -1)
#                     ls = [str(s, encoding='utf-8') for s in conn2.lrange(d, 0, -1)]
#                     conn.delete(d)
#                     conn.lpush(d, *ls)
#                     # conn.delete(d)
#
#             # pipline.expire(d, 60 * 60 * 24 * 3)
#             # pipline.execute()
#             sum += len(data)
#             key_sum += len(del_datas)
#             print(cursor, len(data), key_sum)
#             # if cursor == 0 or len(data) == 0:
#             if cursor == 0:
#                 break
#         print(key, key_sum)
#
#
#
#
def redis4():
    """Find keys idle longer than idletime_max on every cluster node and optionally delete them.

    With do_delete = False (the default) this is a dry run: it only prints, per node,
    how many keys matched, how many are idle candidates, and a random sample of them.
    """
    # Cluster node IDs, scanned one at a time. Passing a node ID as the last SCAN
    # argument is not standard Redis; it relies on the proxy in front of this
    # cluster accepting "SCAN cursor MATCH pattern COUNT n <node_id>".
    nodes = ['7877da182171e313bc9326729f82999d1b629c79',
             'a4d4034faa81b935c2fd583053105b37f1c92ff1',
             'a8835d4c987847302bac66c5fc17ee1faae91fa3',
             '5a48236679f22637508651530633c9cc2f56f489',
             'fc3715919081c2cf3b30a2f8defb055c03564fdc',
             'eec89d7480980749c998add484e80f23fe5022a4',
             'f3f1ec6df458a5093c31663517a3cadaed5ab29c',
             'd35c630aad0a8b7f579bf4100f2860401b5d4f52']
    # keys = ['rims:tag_v3:coldstart:','doris:tag_v3:coldstart:']
    search_keys = ['']
    do_delete = False
    idletime_max = 60 * 60 * 24 * 180  # 180 days, in seconds
    sample_size = 50
    # conn2 = getRedisConn2()
    conn = getRedisConn4()
    for search_key in search_keys:
        search_key = search_key + '*'
        # key = "have_reply_answer_comment*" #139285
        # key = "device_register_qa_read_set:869574031646601*" #139285
        # key = "doris_feed:*" #139285
        for node in nodes:
            cursor = 0
            node_key_count = 0
            node_del_key_list = []
            while True:
                # Each token is a separate argument, so a pattern containing spaces is not split.
                cursor, keys = conn.execute_command(
                    "SCAN", cursor, "MATCH", search_key, "COUNT", 10000, node)
                cursor = int(cursor)  # redis-py may hand back bytes or int; normalize to int
                keys = [str(key, encoding='utf-8') for key in keys]
                node_key_count += len(keys)
                # transaction=False: plain batching, with no MULTI/EXEC wrapped around the reads.
                pipline = conn.pipeline(transaction=False)
                for key in keys:
                    pipline.object('idletime', key)
                idletime_list = pipline.execute()
                for key, idletime in zip(keys, idletime_list):
                    # OBJECT IDLETIME yields None when the key expired or was deleted after SCAN.
                    if idletime is not None and idletime > idletime_max:
                        node_del_key_list.append(key)
                if cursor == 0:
                    break
            node_del_key_count = len(node_del_key_list)
            node_del_key_rate = 100 * node_del_key_count / node_key_count if node_key_count else 0.0
            print("node: {}, cursor: {}, node_key_count: {}, node_del_key_count: {}, node_del_key_rate: {}%".format(
                node, cursor, node_key_count, node_del_key_count, node_del_key_rate))
            # random.sample() raises ValueError if asked for more items than exist, so cap the size.
            print("node: {}, node_del_key_sample_{}: {}".format(
                node, sample_size, random.sample(node_del_key_list, min(sample_size, node_del_key_count))))
            if do_delete:
                node_del_batch_size = 10000
                node_has_del_key_count = 0
                for batch_start_idx in range(0, node_del_key_count, node_del_batch_size):
                    node_del_key_batch_list = node_del_key_list[batch_start_idx:batch_start_idx + node_del_batch_size]
                    pipline = conn.pipeline(transaction=False)
                    for node_del_key in node_del_key_batch_list:
                        pipline.delete(node_del_key)
                    pipline.execute()
                    node_has_del_key_count += len(node_del_key_batch_list)
                    print("node: {}, node_has_del_key_count: {}, node_del_key_count: {}, node_has_del_key_rate: {}%".format(
                        node, node_has_del_key_count, node_del_key_count,
                        100 * node_has_del_key_count / node_del_key_count))
                    print("node: {}, node_has_del_key_batch_sample_{}: {}".format(
                        node, sample_size,
                        random.sample(node_del_key_batch_list, min(sample_size, len(node_del_key_batch_list)))))
            print("")
#
# nodes = ['7877da182171e313bc9326729f82999d1b629c79'
#     , 'a4d4034faa81b935c2fd583053105b37f1c92ff1'
#     , 'a8835d4c987847302bac66c5fc17ee1faae91fa3'
#     , '5a48236679f22637508651530633c9cc2f56f489'
#     , 'fc3715919081c2cf3b30a2f8defb055c03564fdc'
#     , 'eec89d7480980749c998add484e80f23fe5022a4'
#     , 'f3f1ec6df458a5093c31663517a3cadaed5ab29c'
#     , 'd35c630aad0a8b7f579bf4100f2860401b5d4f52']
# conn = getRedisConn4()
# num = 0
# key_sum = 0
# res = {}
# keys_set = set()
# for node in nodes:
#     cursor = "0"
#     while True:
#         cursor, data = conn.execute_command("scan {} match {} count {} {}".format(cursor, "*", 10000, node))
#         cursor = str(cursor, encoding='utf-8')
#         # pipline = conn.pipeline()
#         keys = []
#         for d in data:
#             key = str(d, encoding='utf-8')
#             is_flag = True
#             for kk in ['streaming:candidate:','tag3:user_portrait:topn:']:
#                 if key.startswith(kk):
#                     is_flag = False
#                     break
#             if is_flag:
#                 end = key.split(":")[-1]
#                 keys_set.add(key[0:len(key)-len(end)])
#                 keys.append(key)
#         key_sum += len(data)
#         print("node:{},curor:{},key_sum:{},keys size:{}".format(node,str(cursor), str(key_sum), str(len(keys_set))))
#         if cursor == "0":
#             break
# print(list(keys_set)[0:100])
#
#
#
# def redis5():
#     key_sum = 0
#     conn = getRedisConn5()
#     conn4 = getRedisConn4()
#     nodes=['d6b136db85c4c97c7bf9817f45bd4a2039e45fed'
#         ,'1ea04348ffc7a9858b2f86f5ef3b6565125f43df'
#         ,'9327036c8f0b4406f198f857b98198bd3472c85c'
#         ,'b6f89870957ef22d2aff0c6fa18a6bb9401a8294'
#         ,'4c5e23ed43c5b2bb4706a6335308920ac5990f94'
#         ,'0927a69bd5df3007175e55bb3ff0ac11bb9cd816'
#         ,'492a36ac916ed461a214d9c1e76ab21760a4b40d'
#         ,'5d8b9b2d7814df5664958e10178947a2fbd07f90']
#     for node in nodes:
#         cursor = "0"
#         while True:
#             cursor, data = conn.execute_command("scan {} match {} count {} {}".format(cursor, "*", 10000, node))
#             cursor = str(cursor, encoding='utf-8')
#             for d in data:
#                 dd = str(d, encoding='utf-8')
#                 # print(dd)
#                 t = str(conn.type(dd),encoding='utf-8')
#                 if t == "string":
#                     conn4.set(dd,conn.get(dd))
#                 if t == "hash":
#                     conn4.hmset(dd,conn.hgetall(dd))
#             key_sum += len(data)
#             print(node, cursor, len(data), key_sum)
#             if cursor == "0":
#                 break
#
#
# def write2ToRedis4_copy():
#     res={"hash":0,"string":0,"list":0}
#     key_sum = 0
#     conn = getRedisConn2()
#     conn4 = getRedisConn4()
#     cursor = 0
#     while True:
#         cursor, data = conn.scan(cursor=cursor,match="*",count=10000)
#         for d in data:
#             dd = str(d, encoding='utf-8')
#             # t = str(conn.type(dd), encoding='utf-8')
#             # ttl = conn.ttl(dd)
#             # print(dd)
#             t = str(conn.type(dd), encoding='utf-8')
#             # if t == "string":
#             #     conn4.set(dd, conn.get(dd))
#             #     if ttl:
#             #         conn4.expire(dd, ttl)
#             #     res["string"] += 1
#             # if t == "hash":
#             #     conn4.hmset(dd, conn.hgetall(dd))
#             #     if ttl:
#             #         conn4.expire(dd, ttl)
#             #     res["hash"] += 1
#             if t == "list":
#                 datas = [conn.lrange(dd, 0, -1)]
#                 conn4.lpush(dd, conn.lrange(dd,0,-1))
#                 print(dd)
#                 # if ttl:
#                 #     conn4.expire(dd, ttl)
#                 res["list"] += 1
#         key_sum += len(data)
#         print(cursor, len(data), key_sum)
#         if cursor == 0:
#             break
#
# def write2ToRedis4():
#     res={"hash":0,"string":0,"list":0}
#     key_sum = 0
#     conn = getRedisConn2()
#     conn4 = getRedisConn4()
#     pipeline4 = conn4.pipeline()
#     cursor = 0
#     while True:
#         cursor, data = conn.scan(cursor=cursor,match="*",count=20000)
#         keys = []
#         for d in data:
#             dd = str(d, encoding='utf-8')
#             keys.append(dd)
#         pipeline = conn.pipeline()
#         for k in keys:
#             pipeline.type(k)
#         pip_datas1 = pipeline.execute()
#         pipeline = conn.pipeline()
#         for i,p in enumerate(pip_datas1):
#             t = str(p, encoding='utf-8')
#             dd = keys[i]
#             pipeline.ttl(dd)
#             if t == "string":
#                 pipeline.get(dd)
#             elif t == "hash":
#                 pipeline.hgetall(dd)
#             elif t == "list":
#                 pipeline.lrange(dd, 0, -1)
#             else:
#                 pipeline.get("123456787654321")
#         pip_datas2 = pipeline.execute()
#         for i in range(0,len(pip_datas2),2):
#             dd = keys[i//2]
#             t = str(pip_datas1[i//2], encoding='utf-8')
#             ttl = pip_datas2[i]
#             v = pip_datas2[i+1]
#             if t == "string":
#                 if v:
#                     pipeline4.set(dd, v)
#                     if ttl:
#                         pipeline4.expire(dd, ttl)
#                     res["string"] += 1
#             if t == "hash":
#                 if v:
#                     pipeline4.hmset(dd, v)
#                     if ttl:
#                         pipeline4.expire(dd, ttl)
#                     res["hash"] += 1
#             if t == "list":
#                 if v:
#                     pipeline4.lpush(dd, v)
#                     if ttl:
#                         pipeline4.expire(dd, ttl)
#                     res["list"] += 1
#         pipeline4.execute()
#         key_sum += len(data)
#         print(cursor, len(data), key_sum)
#         if cursor == 0:
#             break
#
#
# if __name__ == '__main__':
#
#     conn = getRedisConn4()
#     num = 0
#     key_sum = 0
#     res = {}
#     cursor = 0
#     while True:
#         cursor, data = conn.scan(cursor=cursor, count=10000)
#         pipline = conn.pipeline()
#         keys = []
#         for d in data:
#             key = str(d, encoding='utf-8')
#             keys.append(key)
#             t = pipline.type(key)
#             idle = pipline.object('idletime', key)
#             ttl = pipline.ttl(key)
#         datass = pipline.execute()
#         for i in range(0, len(datass), 3):
#             t = str(datass[i], encoding='utf-8')
#             if idle and ttl and t=="set" and datass[i + 2] == -1:
#                 idle = datass[i + 1]
#                 ttl = datass[i + 2]
#                 key = keys[i // 3]
#                 add_d = res.setdefault(t, {})
#                 add_dd = add_d.setdefault(key, {})
#                 add_dd["ttl"] = ttl
#                 add_dd["idle"] = idle
#                 add_d[key] = add_dd
#                 res[t] = add_d
#                 num += 1
#         print("curor:{},key_sum:{},num:{},keys size:{},pipline size:{}".format(str(cursor),str(key_sum),str(num),str(len(data)),str(len(datass))))
#         key_sum += len(data)
#         if cursor == 0 or len(data) == 0:
#             break
#
#
#
#     # end = int(time.time())
#     # print("conn:{},size:{},cost:{}s".format(str(index+1),str(num), str(end - start)))
#     #
#     #
#     open('/tmp/redis_keys_hash_2.json', mode='w', encoding='utf-8').write(json.dumps(res, ensure_ascii=False))
#
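
# --- Hypothetical helpers, not part of the original script ---
# redis4() has two pieces of pure logic inside its Redis loop: selecting keys whose
# OBJECT IDLETIME exceeds a threshold, and splitting the candidates into fixed-size
# delete batches. The functions below are a minimal sketch of that logic, pulled out
# so it can be unit-tested without a live Redis server. The names select_idle_keys,
# iter_batches and sample_for_log are illustrative assumptions. They are not used
# by redis4() as written.
import random
from typing import Iterator, List, Optional, Sequence


def select_idle_keys(keys: Sequence[str],
                     idletimes: Sequence[Optional[int]],
                     idletime_max: int) -> List[str]:
    """Return the keys whose idle time (seconds) is strictly greater than idletime_max.

    A None idle time (key gone between SCAN and OBJECT IDLETIME) is skipped.
    """
    return [key for key, idle in zip(keys, idletimes)
            if idle is not None and idle > idletime_max]


def iter_batches(items: Sequence[str], batch_size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most batch_size items, e.g. one pipeline per slice."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def sample_for_log(items: Sequence[str], k: int) -> List[str]:
    """Random sample of up to k items; never raises when fewer than k items exist."""
    return random.sample(list(items), min(k, len(items)))

# With these helpers, the per-node filtering in redis4() could read
#     node_del_key_list += select_idle_keys(keys, idletime_list, idletime_max)
# and each delete pipeline could be built from one batch of iter_batches(node_del_key_list, 10000).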