# target_releaser_add.py 20.9 KB
# -*- coding:utf-8 -*-
# @Time : 2019/7/26 14:33 
# @Author : litao
# -*- coding: utf-8 -*-
"""
Created on Mon Dec 17 10:05:18 2018

@author: zhouyujiang

从csv 写入 target_releaser索引

"""
import json, re
import datetime, copy
from elasticsearch import Elasticsearch

from write_data_into_es.func_get_releaser_id import get_releaser_id
import redis
import hashlib

# Connection settings of the Elasticsearch node used by this module.
hosts = '172.16.32.37'
port = 9200
es = Elasticsearch(hosts=hosts, port=port)
# The redis connection is disabled; the two lines below show how it was set up.
# pool = redis.ConnectionPool(host='192.168.17.60', port=6379, db=2, decode_responses=True)
# rds = redis.Redis(connection_pool=pool)

# Date helpers evaluated once at import time: the first day of the current
# month, the day before it, and that day's month/year (i.e. last month).
today = datetime.datetime.now()
first_day = datetime.datetime(year=today.year, month=today.month, day=1)
day_before_first_day = first_day - datetime.timedelta(days=1)
l_month, l_year = day_before_first_day.month, day_before_first_day.year
count = 0


def parse_line_dict(line, line_dict, blank_space_error, new_line_error, err_id_line):
    """Strip whitespace characters from every string value of one input row.

    Spaces, carriage returns, line feeds and tabs are removed from each string
    value of ``line_dict`` (the dict is modified in place).  When at least one
    value contained a space, the row number is appended once to
    ``blank_space_error``; when at least one value contained ``\\r``, ``\\n``
    or ``\\t``, it is appended once to ``new_line_error``.  Non-string values
    are left untouched.

    :param line: zero-based index of the data row; it is reported as
                 ``line + 2`` (row 1 being the header of the source file).
    :param line_dict: mapping of column name to value for the row.
    :param blank_space_error: comma-terminated row numbers collected so far
                              for values containing spaces.
    :param new_line_error: comma-terminated row numbers collected so far for
                           values containing ``\\r``, ``\\n`` or ``\\t``.
    :param err_id_line: passed through unchanged.
    :return: ``(line_dict, blank_space_error, new_line_error, err_id_line)``.
    """
    found_space = False
    found_control_char = False
    for key, value in line_dict.items():
        # Only text can carry stray whitespace; numbers, lists, None, ... are
        # skipped explicitly instead of relying on a swallowed exception.
        if not isinstance(value, str):
            continue
        if " " in value:
            found_space = True
        if "\r" in value or "\n" in value or "\t" in value:
            found_control_char = True
        cleaned = value
        for char in ("\r", "\n", "\t", " "):
            cleaned = cleaned.replace(char, "")
        # Re-assigning an existing key does not resize the dict, so this is
        # safe while iterating over it.
        line_dict[key] = cleaned

    # Report each row at most once per category, however many fields of the
    # row were affected.
    if found_space:
        blank_space_error = blank_space_error + str(line + 2) + ","
    if found_control_char:
        new_line_error = new_line_error + str(line + 2) + ","
    return line_dict, blank_space_error, new_line_error, err_id_line


def _iter_input_rows(file):
    """Yield ``(index, raw, row)`` for every data row of ``file``.

    ``file`` is either a list/tuple of dicts or the path of a gb18030 encoded
    CSV file whose first line is the header.  ``row`` is a fresh dict (so the
    caller's data is never modified) or ``None`` when a list item is not a dict.
    """
    if isinstance(file, (list, tuple)):
        for index, item in enumerate(file):
            yield index, item, (dict(item) if isinstance(item, dict) else None)
        return
    # The context manager guarantees the handle is closed once all rows are read.
    with open(file, 'r', encoding="gb18030") as f:
        head_list = f.readline().strip().split(',')
        for index, raw_line in enumerate(f):
            yield index, raw_line, dict(zip(head_list, raw_line.strip().split(',')))


def _as_tag_list(value):
    """Return ``value`` as a new list: empty if falsy, wrapped if it is a scalar."""
    if not value:
        return []
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return [value]


def _resolve_tags(line_dict, field, add_field, del_field):
    """Build the final tag list of ``field`` after applying the add/del fields of the row."""
    tags = _as_tag_list(line_dict.get(field))
    additions = _as_tag_list(line_dict.get(add_field))
    if additions:
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        tags = list(dict.fromkeys(tags + additions))
    removals = _as_tag_list(line_dict.get(del_field))
    if removals:
        tags = [tag for tag in tags if tag not in removals]
    return tags


def _send_bulk(bulk_lines):
    """Send the collected NDJSON lines to ``target_releasers`` and print the response on errors."""
    result = es.bulk(index='target_releasers', body="\n".join(bulk_lines) + "\n")
    if result['errors'] is True:
        print(result)


def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, update_dic=None, extra_dic=None, **kwargs):
    """Index releaser rows from a CSV file or a list of dicts into ``target_releasers``.

    Every row needs a ``platform`` and should carry ``releaserUrl`` and
    ``releaser``.  The document id is ``<platform>_<releaser_id>`` where the id
    comes from ``get_releaser_id``; when no id can be resolved the row is
    reported as a releaserUrl error, flagged ``is_valid = "false"`` and stored
    under ``<platform>_<releaser>`` (rows that have neither are skipped).
    Documents are sent with the bulk ``index`` action in batches of 500.

    The following optional keys are read from each row (after ``extra_dic``
    has been merged into it):
        project_tags / department_tags: current tags of the releaser
        add_departments / del_departments: department tags to add / remove
        add_project_tags / del_project_tags: project tags to add / remove
        post_time, post_by, key_releaser, has_data, media_type, releaser_type
    A CSV may also carry ``purchase_end_time`` (%Y-%m-%d) and ``is_purchased``
    (0/1) columns; they are not part of the document built here.

    :param file: path of a gb18030 encoded CSV file with a header line, or a
                 list/tuple of row dicts.  The given dicts are not modified.
    :param push_to_redis: accepted for backward compatibility; the redis push
                          is disabled, so the value is not used.
    :param update: accepted for backward compatibility; merging with an
                   already indexed document is disabled, so it is not used.
    :param key_releaser: accepted for backward compatibility and not used; the
                         stored value is taken from the row / ``extra_dic``.
    :param update_dic: accepted for backward compatibility and not used.
    :param extra_dic: extra fields merged into every row.  Empty
                      ``project_tags`` / ``department_tags`` entries are
                      ignored.  The dict itself is never modified.
    :param kwargs: ``post_by`` (str) is stored on every document when given.
    :return: list of human readable messages: the number of queued rows
             followed by the row numbers that had problems.
    :raises OSError: when ``file`` is a path that cannot be opened.
    """
    # Work on a filtered copy so neither the caller's dict nor a shared default
    # is mutated, and so empty tag lists never override the tags of a row.
    extra_fields = {
        key: value for key, value in (extra_dic or {}).items()
        if value or key not in ("project_tags", "department_tags")
    }

    bulk_lines = []
    err_id_line = ""
    blank_space_error = ""
    new_line_error = ""
    error_msg_list = []
    count = 0

    for line, raw, line_dict in _iter_input_rows(file):
        print(raw)
        if line_dict is None:
            new_line_error += str(line + 2) + ","
            continue

        line_dict, blank_space_error, new_line_error, err_id_line = parse_line_dict(
            line, line_dict, blank_space_error, new_line_error, err_id_line)
        line_dict.pop("", None)

        # Resolve the platform from the cleaned values so the document id and
        # the stored "platform" field always agree.
        platform = line_dict.get("platform")
        if platform == "short_video":
            platform = line_dict.get("releaser_platform")
            line_dict["platform"] = platform
        if not platform:
            # Same category the original used for rows without a platform.
            new_line_error += str(line + 2) + ","
            continue

        releaserUrl = line_dict.get("releaserUrl")
        if extra_fields:
            line_dict.update(extra_fields)

        # Resolve the id once; a missing URL is handled like an unparsable one.
        releaser_id = get_releaser_id(platform=platform, releaserUrl=releaserUrl) if releaserUrl else None
        if releaser_id:
            doc_id = "%s_%s" % (platform, releaser_id)
            line_dict["releaser_id"] = releaser_id
            line_dict["releaser_id_str"] = doc_id
            line_dict["is_valid"] = "true"
        else:
            err_id_line += str(line + 2) + ","
            releaser_name = line_dict.get("releaser")
            if not releaser_name:
                # Without id and name there is nothing to build a document id from.
                continue
            doc_id = "%s_%s" % (platform, releaser_name)
            line_dict["releaser_id"] = ""
            line_dict["releaser_id_str"] = ""
            line_dict["is_valid"] = "false"

        # NOTE: looking up an already indexed document and merging its tags and
        # post_time into the row is disabled; every row is indexed as built here.

        now_ms = int(datetime.datetime.now().timestamp() * 1000)
        if not line_dict.get("post_time"):
            line_dict['post_time'] = now_ms
        if kwargs.get("post_by"):
            line_dict["post_by"] = kwargs.get("post_by")

        line_dict["department_tags"] = _resolve_tags(
            line_dict, "department_tags", "add_departments", "del_departments")
        line_dict["project_tags"] = _resolve_tags(
            line_dict, "project_tags", "add_project_tags", "del_project_tags")

        bulk_dic = {
                "releaser": line_dict.get("releaser"),
                "releaserUrl": line_dict.get("releaserUrl"),
                "platform": line_dict.get("platform"),
                "releaser_id": line_dict.get("releaser_id"),
                "releaser_id_str": line_dict.get("releaser_id_str"),
                "post_by": line_dict.get("post_by"),
                "post_time": line_dict.get("post_time"),
                # Releasers that belong to a project get frequency 3, others 1.
                "frequency": 3 if line_dict.get("project_tags") else 1,
                "key_releaser": line_dict.get("key_releaser"),
                "is_valid": line_dict.get("is_valid"),
                "has_data": line_dict.get("has_data") or 0,
                "project_tags": line_dict.get("project_tags"),
                "department_tags": line_dict.get("department_tags"),
                'timestamp': now_ms,
                'media_type': line_dict.get("media_type") or "",
                'releaser_type': line_dict.get("releaser_type") or "",
        }

        # json.dumps keeps the action line valid even when the id contains
        # quotes or backslashes (possible with the releaser-name fallback).
        bulk_lines.append(json.dumps({"index": {"_id": doc_id}}, ensure_ascii=False))
        bulk_lines.append(json.dumps(bulk_dic, ensure_ascii=False))
        count += 1
        if count % 500 == 0:
            _send_bulk(bulk_lines)
            bulk_lines = []

    if bulk_lines:
        _send_bulk(bulk_lines)

    error_msg_list.append("%s条 写入成功" % count)
    if err_id_line:
        error_msg_list.append("第%s行 releaserUrl错误" % err_id_line[:-1])
    if blank_space_error:
        error_msg_list.append("第%s行 发现存在空格" % blank_space_error[:-1])
    if new_line_error:
        error_msg_list.append("第%s行 发现存在换行符" % new_line_error[:-1])
    return error_msg_list


# Script entry point: index a hard-coded list of Weibo / Douban releasers.
if __name__ == "__main__":
    # Rows to import; each dict carries the releaser page URL, its display
    # name and the platform.
    # NOTE(review): some URLs occur more than once (e.g. u/6511177474 and
    # u/5106602573, with different or space-padded names); presumably they map
    # to the same document id - confirm whether the duplicates are intended.
    data_list =  [
        {"releaserUrl": "https://weibo.com/u/1764615662", "releaser": "娱乐圈贵妃", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3662247177", "releaser": "捞娱君", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2378564111", "releaser": "娱乐扒皮", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2983578965", "releaser": "娱乐圈小青年", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3938976579", "releaser": "娱乐捞饭", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "小组吃瓜蜀黍", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/6343916471", "releaser": "圈内老顽童", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "八组吃瓜蜀黍", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2921603920", "releaser": "娱乐圈新鲜事", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/6470919752", "releaser": "伊丽莎白骨精啊", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2653906910?refer_flag=1001030103_&is_hot=1", "releaser": "娱乐榜姐",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3115996363?is_hot=1", "releaser": "娱乐星事", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005053212093237/home?from=page_100505&mod=TAB#place", "releaser": "星探扒皮",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3926129482", "releaser": "星闻追踪", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5509337969?is_hot=1", "releaser": "卦哥娱乐", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5477320351", "releaser": "圈内扒爷", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005055634795408/home?from=page_100505&mod=TAB#place", "releaser": "圈八戒 ",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/6511173721", "releaser": "圈内课代表", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005055471534537/home?from=page_100505&mod=TAB#place", "releaser": "娱闻少女",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3193443435", "releaser": "圈太妹", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2022990945", "releaser": "圈内狙击手", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1809782810?is_all=1", "releaser": "全娱乐爆料", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5157190426?is_all=1", "releaser": "娱乐扒少", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2125613987?is_all=1", "releaser": "圈内一把手 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005051948622644/home?from=page_100505&mod=TAB#place",
          "releaser": "影视圈扒姐 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2611791490", "releaser": "娱评八公", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1652840683", "releaser": "追星", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5086098727?is_hot=1", "releaser": "闻娱教主", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5101787982?is_all=1", "releaser": "扒婆说", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5101844765?is_hot=1", "releaser": "星娱客 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052115034114/home?from=page_100505&mod=TAB#place",
          "releaser": "娱乐明星团 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/6473952993?is_hot=1", "releaser": "偶像日报", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5106602573?is_hot=1", "releaser": "八哥", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5909342713?", "releaser": "圈内教父", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3200673035?", "releaser": "扒圈老鬼", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005055965621313/home?from=page_100505&mod=TAB#place", "releaser": "圈内师爷",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1915749764?is_hot=1", "releaser": "迷妹速报", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1002061836328652/home?from=page_100206&mod=TAB#place", "releaser": "前线娱乐",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5896207859?is_hot=1", "releaser": "娱记者", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5717515328?is_hot=1", "releaser": "娱老汉", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005051795994180/home?from=page_100505&mod=TAB#place",
          "releaser": "娱乐News", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5978818414?is_hot=1", "releaser": "娱圈蜀黍", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2489917511?is_hot=1", "releaser": "芒果捞扒婆 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5279487569?is_hot=1", "releaser": "娱姐速报 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5106602573?is_hot=1", "releaser": "八哥 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5323541229?profile_ftype=1&is_all=1#_0", "releaser": "国内外白富美揭秘 ",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1003062512591982/home?from=page_100306&mod=TAB#place", "releaser": "圈少爷",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2821843050?profile_ftype=1&is_all=1#_0", "releaser": "圈内老鬼",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3028215832?profile_ftype=1&is_all=1#_0", "releaser": "娱扒爷",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5336756846?profile_ftype=1&is_all=1#_0", "releaser": "兔兔热议",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005051844235935/home?from=page_100505&mod=TAB#place",
          "releaser": "娱乐圈外汉", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052586409491/home?from=page_100505&mod=TAB#place",
          "releaser": "娱乐圈吃瓜指南 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5255814135", "releaser": "八组兔区爆料", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2871033210?is_hot=1", "releaser": "八组兔区热议 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052813285937/home?from=page_100505&mod=TAB#place",
          "releaser": "八组兔区娱乐圈", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052831749482/home?from=page_100505&mod=TAB#place",
          "releaser": "八组兔区揭秘", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/2709814831", "releaser": "娱大蜀黍", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5634795408", "releaser": "圈八戒", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5176743404", "releaser": "瓜瓜搬运机", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5039775130", "releaser": "娱乐揭秘蜀黍", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/7123521074", "releaser": "饭圈日报", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1746658980", "releaser": "饭圈阿姨", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052453653365/home?from=page_100505&mod=TAB#place", "releaser": "圈内星探",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/6311417880?profile_ftype=1&is_all=1#_0", "releaser": "星扒婆 ",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1420816495?profile_ftype=1&is_all=1#_0", "releaser": "娱尾纹",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1974754790", "releaser": "教父娱乐", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1818950785?refer_flag=1028035010_&is_hot=1", "releaser": "扒圈有鱼",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1893711543", "releaser": "娱乐有饭", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1002061653255165/home?from=page_100206&mod=TAB#place",
          "releaser": "娱乐日爆社", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052391322817/home?from=page_100505&mod=TAB#place", "releaser": "小娱乐家",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1003061994712500/home?from=page_100306&mod=TAB#place",
          "releaser": "星扒客push", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5700087877", "releaser": "毒舌八卦", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/3779202361", "releaser": "西皮娱乐", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1632619962", "releaser": "瓜组新鲜事", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052103460752/home?from=page_100505&mod=TAB#place", "releaser": "娱嬷嬷 ",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/5874584452", "releaser": "吃瓜鹅每日搬", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005052397961280/home?from=page_100505&mod=TAB#place", "releaser": "娱大白",
          "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005053246379064/home?from=page_100505&mod=TAB#place",
          "releaser": "娱乐圈扒姐 ", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/u/1830483711", "releaser": "娱乐女记", "platform": "weibo"},
         {"releaserUrl": "https://weibo.com/p/1005053847401640/home?from=page_100505&mod=TAB#place",
          "releaser": "吃瓜爆料每日搬 ", "platform": "weibo"},
        {"releaserUrl": "https://www.douban.com/people/hot_tag",
         "releaser": "hot_tag", "platform": "douban"},
        {"releaserUrl": "https://www.douban.com/people/new_tag",
         "releaser": "new_tag", "platform": "douban"}
    ]
    # Fields merged into every row by write_to_es.
    # NOTE(review): write_to_es derives "frequency" from project_tags when it
    # builds the document, so the 'frequency' value given here does not appear
    # to be stored as-is - confirm.
    extra_dic = {
            "department_tags":["策略组"],
             'key_releaser': True,
             'frequency': 3,
    }

    # Earlier invocations, kept for reference.
    # csv_type = {"SMG": [], "an_hui": [], "ronghe": [], "su_zhou": []}
    #ronghe_releaser_write_es(file, post_by="litao")
    # The redis push inside write_to_es is commented out, so push_to_redis
    # currently has no effect either way.
    write_to_es(data_list, post_by="litao", extra_dic=extra_dic, push_to_redis=False)