# -*- coding:UTF-8 -*-
# @Time : 2020/9/9 10:07
# @File : portary_div_exposure.py
# @email : litao@igengmei.com
# @author : litao
"""Collect user-portrait tag counts and exposed-card tag counts per tag.

The script scans user-portrait entries in Redis, loads the tag catalogue from
MySQL, resolves the tags of exposed cards through Elasticsearch and prints one
summary row per tag.
"""

import datetime
import json
import traceback

import pandas as pd
import pymysql
import redis
from elasticsearch import Elasticsearch

from meta_base_code.utils.func_get_pv_card_id import get_card_id

# NOTE(review): connection credentials are hard-coded in source. They are kept
# unchanged so the script keeps working, but they should be moved to
# configuration / a secret store.
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN6@172.16.40.133:6379", decode_responses=True)
redis_client2 = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379", decode_responses=True)
redis_client3 = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN12@172.16.40.164:6379", decode_responses=True)
redis_client4 = redis.StrictRedis.from_url("redis://:XfkMCCdWDIU%ls$h@172.16.50.145:6379", decode_responses=True)

es = Elasticsearch([
    {
        'host': '172.16.31.17',
        'port': 9200,
    },
    {
        'host': '172.16.31.11',
        'port': 9200,
    }])

# Prefix of the Redis keys that hold one JSON portrait per device.
_PORTRAIT_KEY_PREFIX = "doris:user_portrait:tag3:device_id:"

# Portrait sections read from the stored JSON, in the order they are returned.
_PORTRAIT_FIELDS = (
    "first_demands",     # first-level demands
    "second_demands",    # second-level demands
    "first_solutions",   # first-level solutions
    "second_solutions",  # second-level solutions
    "first_positions",   # first-level positions
    "second_positions",
    "projects",          # projects
    "anecdote_tags",     # gossip / anecdotes
)

# Tag categories reported by get_channel_tags_info / get_es_article_num.
_TAG_CATEGORIES = (
    "first_demands",
    "second_demands",
    "first_solutions",
    "second_solutions",
    "first_positions",
    "second_positions",
    "projects",
)

# api_tag_3_0.tag_type value -> tag category.
_TAG_TYPE_TO_CATEGORY = {
    1: "projects",
    21: "first_positions",
    22: "second_positions",
    19: "first_demands",
    20: "second_demands",
    18: "first_solutions",
    16: "second_solutions",
}

# Exposed card type -> (Elasticsearch index, doc_type).
_CARD_TYPE_TO_ES = {
    "diary": ('gm-dbmw-diary-read', 'diary'),
    "qa": ('gm-dbmw-answer-read', 'answer'),
    "user_post": ('gm-dbmw-tractate-read', 'tractate'),
}


def user_portrait_scan_info():
    """Count how many device portraits contain each tag.

    Iterates every key matching the portrait prefix in ``redis_client2`` with
    SCAN and loads each portrait through get_user_portrait_tag3_from_redis.

    Returns:
        dict mapping tag -> (data_type, count), where ``data_type`` is the
        portrait section in which the tag was seen most recently and ``count``
        is the number of times the tag was seen. Returns ``{}`` if the scan
        itself fails.
    """
    tag_counts = {}
    try:
        pattern = _PORTRAIT_KEY_PREFIX + "*"
        batch_no = 0
        cursor = 0
        # The batch returned by every SCAN call is processed, including the
        # first one and the one that returns cursor 0 (end of iteration).
        # NOTE: SCAN may return the same key more than once, so counts can be
        # slightly inflated on a keyspace that changes during the scan.
        while True:
            cursor, keys = redis_client2.scan(cursor=cursor, match=pattern, count=3000)
            batch_no += 1
            print("round: " + str(batch_no))
            for key in keys:
                device_id = key.split(":")[-1]
                try:
                    portrait = get_user_portrait_tag3_from_redis(device_id)
                    for data_type, tags in portrait.items():
                        for tag in tags:
                            previous = tag_counts.get(tag)
                            count = previous[1] + 1 if previous else 1
                            tag_counts[tag] = (data_type, count)
                except Exception:
                    # Best effort: a single unreadable portrait must not stop
                    # the whole scan.
                    continue
            if cursor == 0:
                break
        return tag_counts
    except Exception as e:
        print(e)
        return {}


def get_user_portrait_tag3_redis_key(device_id):
    """Return the Redis key under which the portrait of ``device_id`` is stored."""
    return _PORTRAIT_KEY_PREFIX + str(device_id)


def get_user_portrait_tag3_from_redis(device_id, limit_score=0):
    """Load the portrait of one device and return its tags per section.

    Args:
        device_id: device identifier used to build the Redis key.
        limit_score: only tags whose score is >= this value are kept.

    Returns:
        dict mapping each portrait section name to a list of tag names sorted
        by score, highest first; ``{}`` when the device has no portrait.
    """

    def items_gt_score(d):
        # Sort on the numeric score so ordering agrees with the numeric filter
        # even when scores are stored as strings.
        ranked = sorted(d.items(), key=lambda item: float(item[1]), reverse=True)
        return [tag for tag, score in ranked if float(score) >= limit_score]

    # A single GET replaces EXISTS + GET: one round trip, and no failure if the
    # key disappears between the two calls.
    raw = redis_client2.get(get_user_portrait_tag3_redis_key(device_id))
    if raw is None:
        return {}

    user_portrait = json.loads(raw)
    return {
        field: items_gt_score(user_portrait.get(field) or {})
        for field in _PORTRAIT_FIELDS
    }


def get_channel_tags_info():
    """Load all tag names from ``api_tag_3_0`` grouped by tag category.

    Returns:
        dict mapping each category in ``_TAG_CATEGORIES`` to the list of tag
        names whose ``tag_type`` maps to it. Rows with any other ``tag_type``
        are ignored. All lists are empty if the query fails.
    """
    sql = "SELECT name, tag_type from api_tag_3_0"
    rows = get_data_by_mysql("172.16.30.141", 3306, "zx_str", "ZXueX58pStrage", "zhengxing", sql)
    if not isinstance(rows, (list, tuple)):
        # get_data_by_mysql returns the traceback text on failure; iterating it
        # would yield characters, not rows.
        rows = []

    tags_by_category = {category: [] for category in _TAG_CATEGORIES}
    for row in rows:
        category = _TAG_TYPE_TO_CATEGORY.get(row.get("tag_type", -1))
        if category is not None:
            tags_by_category[category].append(row.get("name", ""))
    return tags_by_category


def get_device_num_from_es(word):
    """Return ``hits.total`` of an aggregation query on ``gm-dbmw-device``.

    Args:
        word: currently unused by the query.
    """
    results = es.search(
        index='gm-dbmw-device',
        doc_type='doc',
        timeout='10s',
        size=0,
        body={
            "aggs": {
                "NAME": {
                    "nested": {"path": "projects"},
                    "aggs": {
                        "NAME1": {
                            "terms": {"field": "projects.name", "size": 10000}
                        }
                    }
                }
            }
        }
    )
    return results["hits"]["total"]


def _answer_filters():
    """Return the fixed ``must`` clauses used for answer documents."""
    return [
        {"term": {"is_online": True}},
        {"terms": {"content_level": [6, 5, 4, 3.5, 3]}},
        {"range": {"content_length": {"gte": 30}}},
    ]


def _tractate_filters():
    """Return the fixed ``must`` clauses used for tractate documents."""
    return [
        {"term": {"is_online": True}},
        {"terms": {"content_level": [6, 5, 4, 3.5, 3]}},
    ]


def _diary_filters():
    """Return the fixed ``must`` clauses used for diary documents."""
    return [
        {"term": {"is_online": True}},
        {"term": {"has_cover": True}},
        {"term": {"is_sink": False}},
        {"term": {"has_after_cover": True}},
        {"term": {"has_before_cover": True}},
        {"range": {"content_level": {"gte": "3"}}},
        {"term": {"content_simi_bol_show": 0}},
    ]


def _build_tag_query(filters, field, tag_name):
    """Wrap ``filters`` plus a term clause on ``field`` into a bool query body."""
    return {
        "query": {
            "bool": {
                "minimum_should_match": 1,
                "should": [],
                "must": filters + [{"term": {field: tag_name}}],
            }
        }
    }


def _count_es_hits(index, doc_type, body, failure_message):
    """Return ``hits.total`` for ``body``; print the message and return 0 on error."""
    try:
        results = es.search(
            index=index,
            doc_type=doc_type,
            timeout='10s',
            size=0,
            body=body
        )
        return results["hits"]["total"]
    except Exception:
        print(failure_message)
        return 0


def get_es_article_num(tag_dict):
    """Count answers, tractates and diaries matching every tag.

    Args:
        tag_dict: dict mapping tag category -> iterable of tag names.

    Returns:
        dict mapping tag category -> list of
        ``{tag_name: (answer_num, tractate_num, diary_num, total_num)}``.
    """
    article_dict = {category: [] for category in _TAG_CATEGORIES}
    for tag_type in tag_dict:
        for tag_name in tag_dict[tag_type]:
            if tag_type == "projects":
                temp_name = "tags_v3"
            elif tag_type == "first_positions":
                temp_name = "positions"
            else:
                # NOTE(review): the tag name itself is used as the field name
                # here; ``tag_type`` may have been intended - confirm before
                # relying on these counts.
                temp_name = tag_name

            answer_content_num = _count_es_hits(
                'gm-dbmw-answer-read', 'answer',
                _build_tag_query(_answer_filters(), temp_name, tag_name),
                "answer has no %s" % tag_type,
            )
            tractate_content_num = _count_es_hits(
                'gm-dbmw-tractate-read', 'tractate',
                _build_tag_query(_tractate_filters(), temp_name, tag_name),
                "gm-dbmw-tractate-read has no %s" % tag_type,
            )
            diary_content_num = _count_es_hits(
                'gm-dbmw-diary-read', 'diary',
                _build_tag_query(_diary_filters(), temp_name, tag_name),
                "gm-dbmw-diary-read has no %s" % tag_type,
            )

            total_num = answer_content_num + tractate_content_num + diary_content_num
            data_dic = {tag_name: (answer_content_num, tractate_content_num, diary_content_num, total_num)}
            print(data_dic)
            # setdefault keeps categories outside the predefined skeleton from
            # raising KeyError.
            article_dict.setdefault(tag_type, []).append(data_dic)
    return article_dict


def get_data_by_mysql(host, port, user, passwd, db, sql):
    """Run ``sql`` against MySQL and return all rows as dicts.

    Args:
        host, port, user, passwd, db: connection parameters.
        sql: statement to execute.

    Returns:
        The rows fetched with a ``DictCursor`` on success. On any error the
        formatted traceback (a ``str``) is printed and returned instead, so
        callers must check the type of the result.
    """
    conn = None
    try:
        conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db,
                               cursorclass=pymysql.cursors.DictCursor)
        cursor = conn.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    except Exception:
        error_text = traceback.format_exc()
        print("error2_user_portrait", error_text)
        return error_text
    finally:
        # Always release the connection, also when the query fails.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # The connection may already be closed or broken; nothing left
                # to release in that case.
                pass


def from_id_get_tag(card_id_dict):
    """Count tag occurrences over the exposed cards of each document type.

    Args:
        card_id_dict: dict mapping card type (``"diary"``, ``"qa"``,
            ``"user_post"``) -> iterable of card ids. Other card types are
            skipped.

    Returns:
        dict with keys ``"diary"``, ``"answer"`` and ``"tractate"``, each
        mapping tag -> (doc_type, count).
    """
    query_count = {
        "diary": {},
        "answer": {},
        "tractate": {}
    }
    for card_type, card_ids in card_id_dict.items():
        target = _CARD_TYPE_TO_ES.get(card_type)
        if target is None:
            # Unknown card type: there is no index to query and no bucket to
            # count into.
            continue
        index, doc_type = target
        counts = query_count[doc_type]
        for card_id in card_ids:
            res = es.get_source(index, doc_type, card_id)
            for field in _TAG_CATEGORIES:
                for word in res.get(field) or []:
                    previous = counts.get(word)
                    counts[word] = (doc_type, previous[1] + 1 if previous else 1)
    return query_count


def _iter_tag_names(all_tags):
    """Yield tag names from a category -> names dict, or from a plain iterable."""
    if isinstance(all_tags, dict):
        for names in all_tags.values():
            for name in names:
                yield name
    else:
        for name in all_tags:
            yield name


def _exposure_count(counts, tag):
    """Return the exposure count stored for ``tag`` in ``counts`` (0 if absent)."""
    entry = counts.get(tag)
    if not entry:
        return 0
    # from_id_get_tag stores (doc_type, count) tuples.
    return entry[1] if isinstance(entry, tuple) else entry


def save_data_to_csv(all_tags, user_portrait_dict, word_count_exposure):
    """Build and print one summary row per tag.

    Each row is ``(tag, data_type, data_count, diary_exposure,
    answer_exposure, tractate_exposure)``; ``data_type`` and ``data_count``
    are empty strings when the tag appears in no portrait.

    Args:
        all_tags: dict mapping tag category -> tag names (as returned by
            get_channel_tags_info), or a plain iterable of tag names.
        user_portrait_dict: dict mapping tag -> (data_type, count).
        word_count_exposure: dict with ``"diary"``, ``"answer"`` and
            ``"tractate"`` keys, each mapping tag -> (doc_type, count).

    TODO: the rows are only printed; writing them to a CSV file is not
    implemented yet.
    """
    diary_counts = word_count_exposure.get("diary", {})
    answer_counts = word_count_exposure.get("answer", {})
    tractate_counts = word_count_exposure.get("tractate", {})

    all_data = []
    for tag in _iter_tag_names(all_tags):
        data_type = ""
        data_count = ""
        user_portrait = user_portrait_dict.get(tag)
        if user_portrait:
            data_type, data_count = user_portrait
        all_data.append((
            tag,
            data_type,
            data_count,
            _exposure_count(diary_counts, tag),
            _exposure_count(answer_counts, tag),
            _exposure_count(tractate_counts, tag),
        ))
        print(all_data[-1])


def parse_data():
    """Run the whole pipeline and print every intermediate result."""
    # Count portraits per tag.
    user_portrait_dict = user_portrait_scan_info()
    print(user_portrait_dict)
    # Load every tag, grouped by category.
    all_tags = get_channel_tags_info()
    print(all_tags)
    # Fetch the ids of the exposed cards.
    card_id_dict = get_card_id()
    print(card_id_dict)
    # Resolve the tags attached to the exposed cards.
    word_count_exposure = from_id_get_tag(card_id_dict)
    print(word_count_exposure)
    save_data_to_csv(all_tags, user_portrait_dict, word_count_exposure)


if __name__ == "__main__":
    parse_data()