# -*- coding:utf-8 -*-
# @Time : 2019/7/26 14:33
# @Author : litao
"""
Created on Mon Dec 17 10:05:18 2018

@author: zhouyujiang

Write releaser records from a CSV file (or an iterable of dicts) into the
``target_releasers`` Elasticsearch index.
"""
import copy
import datetime
import hashlib
import json
import os
import re

import redis
from elasticsearch import Elasticsearch

from write_data_into_es.func_get_releaser_id import get_releaser_id

hosts = '172.16.32.37'
port = 9200
es = Elasticsearch(hosts=hosts, port=port)
# pool = redis.ConnectionPool(host='192.168.17.60', port=6379, db=2, decode_responses=True)
# rds = redis.Redis(connection_pool=pool)
today = datetime.datetime.now()
first_day = datetime.datetime(today.year, today.month, 1)
day_before_first_day = first_day - datetime.timedelta(1)
l_month = day_before_first_day.month
l_year = day_before_first_day.year
count = 0

# Name of the index every bulk request is sent to.
_BULK_INDEX = 'target_releasers'
# Number of queued documents that triggers a bulk request.
_BULK_BATCH_SIZE = 500


def parse_line_dict(line, line_dict, blank_space_error, new_line_error, err_id_line):
    """Strip whitespace from every string value of a row and log what was found.

    Spaces, carriage returns, newlines and tabs are removed from each string
    value of ``line_dict`` (the dict is modified in place). Non-string values
    are left untouched.

    :param line: zero-based row index; ``line + 2`` is what gets recorded
        (presumably the 1-based row number in a CSV that has a header row).
    :param line_dict: the row, mapping column name to value.
    :param blank_space_error: comma-terminated row numbers that contained a
        space; one entry is appended per offending value.
    :param new_line_error: comma-terminated row numbers that contained
        ``\\r``, ``\\n`` or ``\\t``; one entry is appended per character kind
        found in each value.
    :param err_id_line: passed through unchanged.
    :return: ``(line_dict, blank_space_error, new_line_error, err_id_line)``.
    """
    row_label = str(line + 2) + ","
    # Snapshot the items so values can be reassigned while looping.
    for key, value in list(line_dict.items()):
        if not isinstance(value, str):
            continue
        if " " in value:
            blank_space_error += row_label
        for control_char in ("\r", "\n", "\t"):
            if control_char in value:
                new_line_error += row_label
        line_dict[key] = (
            value.replace("\r", "").replace("\n", "").replace("\t", "").replace(" ", "")
        )
    return line_dict, blank_space_error, new_line_error, err_id_line


def _now_ms():
    """Return the current local time as integer milliseconds since the epoch."""
    return int(datetime.datetime.now().timestamp() * 1000)


def _iter_rows(file):
    """Yield ``(raw, row_dict)`` pairs from a CSV path or an iterable of dicts.

    For a path, the file is read as gb18030; the first line is the header and
    every following line is split on commas. For any other iterable, each
    dict item is shallow-copied so the caller's data is never modified; items
    that are not dicts yield ``row_dict=None``.
    """
    if isinstance(file, (str, bytes, os.PathLike)):
        with open(file, 'r', encoding="gb18030") as handle:
            head_list = handle.readline().strip().split(',')
            for raw_line in handle:
                yield raw_line, dict(zip(head_list, raw_line.strip().split(',')))
    else:
        for item in file:
            yield item, (dict(item) if isinstance(item, dict) else None)


def _as_tag_list(value):
    """Return ``value`` as a new list; a scalar such as a str becomes one tag."""
    if not value:
        return []
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return [value]


def _merge_tags(current, additions, removals):
    """Return ``current`` plus ``additions`` (de-duplicated) minus ``removals``."""
    tags = _as_tag_list(current)
    extra_tags = _as_tag_list(additions)
    if extra_tags:
        # De-duplicate only when something is added; keep first-seen order.
        tags = list(dict.fromkeys(tags + extra_tags))
    for tag in _as_tag_list(removals):
        try:
            tags.remove(tag)
        except ValueError:
            # Removing a tag that is not present is not an error.
            pass
    return tags


def _flush_bulk(actions):
    """Send the queued bulk lines to Elasticsearch and print any error report."""
    result = es.bulk(index=_BULK_INDEX, body="".join(actions))
    if result['errors'] is True:
        print(result)


def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, update_dic=None, extra_dic=None,
                **kwargs):
    """Index releaser rows into ``target_releasers`` and report row problems.

    Each row needs ``platform`` and ``releaserUrl``. When ``platform`` is
    ``"short_video"`` the value of ``releaser_platform`` is used instead.
    The document id is ``<platform>_<releaser_id>``; when no releaser id can
    be derived from the URL the row is reported, marked ``is_valid="false"``
    and indexed under ``<platform>_<releaser>`` instead.

    Recognised row keys (they may also be supplied through ``extra_dic``):
    ``add_departments`` / ``del_departments`` and ``add_project_tags`` /
    ``del_project_tags`` add to or remove from ``department_tags`` /
    ``project_tags``; ``key_releaser``, ``post_time``, ``post_by``,
    ``has_data``, ``media_type`` and ``releaser_type`` are copied into the
    document.

    :param file: path of a gb18030-encoded CSV with a header row, or an
        iterable of dicts (one per row). Input dicts are not modified.
    :param push_to_redis: currently unused (the redis push is disabled).
    :param update: currently unused (merging with the stored document is
        disabled).
    :param key_releaser: currently unused; the document's ``key_releaser``
        is taken from the row / ``extra_dic``.
    :param update_dic: currently unused.
    :param extra_dic: extra key/values applied on top of every row. Empty
        ``project_tags`` / ``department_tags`` entries are ignored so they
        do not wipe a row's own tags. The dict is not modified.
    :param kwargs: ``post_by`` - if truthy, overrides the row's ``post_by``.
    :return: list of human-readable messages: the number of rows queued for
        writing, followed by the row numbers with releaserUrl problems,
        spaces, and line-break characters (when any).
    :raises OSError: if ``file`` is a path that cannot be opened.

    NOTE(review): the original docstring also mentioned ``department``,
    ``purchase_end_time`` and ``is_purchased`` fields; nothing in this
    function reads them.
    """
    # Normalise the extras once, on a copy, so every row is treated the same
    # and the caller's dict is left alone.
    extras = dict(extra_dic) if extra_dic else {}
    for tag_key in ("project_tags", "department_tags"):
        if not extras.get(tag_key):
            extras.pop(tag_key, None)
    post_by = kwargs.get("post_by")

    pending = []
    err_id_line = ""
    blank_space_error = ""
    new_line_error = ""
    count = 0

    rows = _iter_rows(file)
    try:
        for line, (raw, line_dict) in enumerate(rows):
            print(raw)
            row_label = str(line + 2) + ","

            if line_dict is None:
                # Not a usable row at all.
                new_line_error += row_label
                continue
            try:
                if line_dict['platform'] == "short_video":
                    line_dict['platform'] = line_dict['releaser_platform']
            except KeyError:
                new_line_error += row_label
                continue

            line_dict, blank_space_error, new_line_error, err_id_line = parse_line_dict(
                line, line_dict, blank_space_error, new_line_error, err_id_line)
            line_dict.pop("", None)

            # Read these after cleaning so stray whitespace cannot leak into
            # the document id.
            platform = line_dict['platform']
            if 'releaserUrl' not in line_dict:
                err_id_line += row_label
                continue
            releaserUrl = line_dict['releaserUrl']

            line_dict.update(extras)

            releaser_id = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
            if releaser_id:
                doc_id = platform + '_' + releaser_id
                releaser_id_str = doc_id
                is_valid = "true"
            else:
                err_id_line += row_label
                releaser = line_dict.get('releaser')
                if not releaser or not isinstance(releaser, str):
                    # Nothing to build a fallback document id from.
                    continue
                doc_id = platform + '_' + releaser
                releaser_id = ""
                releaser_id_str = ""
                is_valid = "false"

            # NOTE: merging with the document already stored under doc_id is
            # disabled; the row always replaces the stored document.

            department_tags = _merge_tags(line_dict.get("department_tags"),
                                          line_dict.get("add_departments"),
                                          line_dict.get("del_departments"))
            project_tags = _merge_tags(line_dict.get("project_tags"),
                                       line_dict.get("add_project_tags"),
                                       line_dict.get("del_project_tags"))

            now_ms = _now_ms()
            bulk_dic = {
                "releaser": line_dict.get("releaser"),
                "releaserUrl": line_dict.get("releaserUrl"),
                "platform": line_dict.get("platform"),
                "releaser_id": releaser_id,
                "releaser_id_str": releaser_id_str,
                "post_by": post_by or line_dict.get("post_by"),
                "post_time": line_dict.get("post_time") or now_ms,
                "frequency": 3 if project_tags else 1,
                "key_releaser": line_dict.get("key_releaser"),
                "is_valid": is_valid,
                "has_data": line_dict.get("has_data") or 0,
                "project_tags": project_tags,
                "department_tags": department_tags,
                'timestamp': now_ms,
                'media_type': line_dict.get("media_type") or "",
                'releaser_type': line_dict.get("releaser_type") or "",
            }
            # json.dumps escapes quotes/backslashes that a name-based id may hold.
            bulk_head = json.dumps({"index": {"_id": doc_id}}, ensure_ascii=False)
            data_str = json.dumps(bulk_dic, ensure_ascii=False)
            pending.append(bulk_head + '\n' + data_str + '\n')
            count += 1
            if count % _BULK_BATCH_SIZE == 0:
                _flush_bulk(pending)
                pending = []
    finally:
        # Closes the CSV handle even when a row raises.
        rows.close()

    if pending:
        _flush_bulk(pending)

    error_msg_list = ["%s条 写入成功" % count]
    if err_id_line:
        error_msg_list.append("第%s行 releaserUrl错误" % err_id_line[:-1])
    if blank_space_error:
        error_msg_list.append("第%s行 发现存在空格" % blank_space_error[:-1])
    if new_line_error:
        error_msg_list.append("第%s行 发现存在换行符" % new_line_error[:-1])
    return error_msg_list


if __name__ == "__main__":
    data_list = [
        {"releaserUrl": "https://weibo.com/u/1764615662", "releaser": "娱乐圈贵妃", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3662247177", "releaser": "捞娱君", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2378564111", "releaser": "娱乐扒皮", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2983578965", "releaser": "娱乐圈小青年", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3938976579", "releaser": "娱乐捞饭", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "小组吃瓜蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6343916471", "releaser": "圈内老顽童", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "八组吃瓜蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2921603920", "releaser": "娱乐圈新鲜事", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6470919752", "releaser": "伊丽莎白骨精啊", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2653906910?refer_flag=1001030103_&is_hot=1", "releaser": "娱乐榜姐",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3115996363?is_hot=1", "releaser": "娱乐星事", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005053212093237/home?from=page_100505&mod=TAB#place",
         "releaser": "星探扒皮", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3926129482", "releaser": "星闻追踪", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5509337969?is_hot=1", "releaser": "卦哥娱乐", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5477320351", "releaser": "圈内扒爷", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005055634795408/home?from=page_100505&mod=TAB#place",
         "releaser": "圈八戒 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6511173721", "releaser": "圈内课代表", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005055471534537/home?from=page_100505&mod=TAB#place",
         "releaser": "娱闻少女", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3193443435", "releaser": "圈太妹", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2022990945", "releaser": "圈内狙击手", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1809782810?is_all=1", "releaser": "全娱乐爆料", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5157190426?is_all=1", "releaser": "娱乐扒少", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2125613987?is_all=1", "releaser": "圈内一把手 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005051948622644/home?from=page_100505&mod=TAB#place",
         "releaser": "影视圈扒姐 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2611791490", "releaser": "娱评八公", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1652840683", "releaser": "追星", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5086098727?is_hot=1", "releaser": "闻娱教主", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5101787982?is_all=1", "releaser": "扒婆说", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5101844765?is_hot=1", "releaser": "星娱客 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052115034114/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐明星团 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6473952993?is_hot=1", "releaser": "偶像日报", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5106602573?is_hot=1", "releaser": "八哥", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5909342713?", "releaser": "圈内教父", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3200673035?", "releaser": "扒圈老鬼", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005055965621313/home?from=page_100505&mod=TAB#place",
         "releaser": "圈内师爷", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1915749764?is_hot=1", "releaser": "迷妹速报", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1002061836328652/home?from=page_100206&mod=TAB#place",
         "releaser": "前线娱乐", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5896207859?is_hot=1", "releaser": "娱记者", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5717515328?is_hot=1", "releaser": "娱老汉", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005051795994180/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐News", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5978818414?is_hot=1", "releaser": "娱圈蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2489917511?is_hot=1", "releaser": "芒果捞扒婆 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5279487569?is_hot=1", "releaser": "娱姐速报 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5106602573?is_hot=1", "releaser": "八哥 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5323541229?profile_ftype=1&is_all=1#_0", "releaser": "国内外白富美揭秘 ",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1003062512591982/home?from=page_100306&mod=TAB#place",
         "releaser": "圈少爷", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2821843050?profile_ftype=1&is_all=1#_0", "releaser": "圈内老鬼",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3028215832?profile_ftype=1&is_all=1#_0", "releaser": "娱扒爷",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5336756846?profile_ftype=1&is_all=1#_0", "releaser": "兔兔热议",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005051844235935/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐圈外汉", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052586409491/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐圈吃瓜指南 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5255814135", "releaser": "八组兔区爆料", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2871033210?is_hot=1", "releaser": "八组兔区热议 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052813285937/home?from=page_100505&mod=TAB#place",
         "releaser": "八组兔区娱乐圈", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052831749482/home?from=page_100505&mod=TAB#place",
         "releaser": "八组兔区揭秘", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2709814831", "releaser": "娱大蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5634795408", "releaser": "圈八戒", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5176743404", "releaser": "瓜瓜搬运机", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5039775130", "releaser": "娱乐揭秘蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/7123521074", "releaser": "饭圈日报", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1746658980", "releaser": "饭圈阿姨", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052453653365/home?from=page_100505&mod=TAB#place",
         "releaser": "圈内星探", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6311417880?profile_ftype=1&is_all=1#_0", "releaser": "星扒婆 ",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1420816495?profile_ftype=1&is_all=1#_0", "releaser": "娱尾纹",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1974754790", "releaser": "教父娱乐", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1818950785?refer_flag=1028035010_&is_hot=1", "releaser": "扒圈有鱼",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1893711543", "releaser": "娱乐有饭", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1002061653255165/home?from=page_100206&mod=TAB#place",
         "releaser": "娱乐日爆社", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052391322817/home?from=page_100505&mod=TAB#place",
         "releaser": "小娱乐家", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1003061994712500/home?from=page_100306&mod=TAB#place",
         "releaser": "星扒客push", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5700087877", "releaser": "毒舌八卦", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3779202361", "releaser": "西皮娱乐", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1632619962", "releaser": "瓜组新鲜事", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052103460752/home?from=page_100505&mod=TAB#place",
         "releaser": "娱嬷嬷 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5874584452", "releaser": "吃瓜鹅每日搬", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052397961280/home?from=page_100505&mod=TAB#place",
         "releaser": "娱大白", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005053246379064/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐圈扒姐 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1830483711", "releaser": "娱乐女记", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005053847401640/home?from=page_100505&mod=TAB#place",
         "releaser": "吃瓜爆料每日搬 ", "platform": "weibo"},
        {"releaserUrl": "https://www.douban.com/people/hot_tag", "releaser": "hot_tag", "platform": "douban"},
        {"releaserUrl": "https://www.douban.com/people/new_tag", "releaser": "new_tag", "platform": "douban"},
    ]
    extra_dic = {
        "department_tags": ["策略组"],
        'key_releaser': True,
        'frequency': 3,
    }
    write_to_es(data_list, post_by="litao", extra_dic=extra_dic, push_to_redis=False)