"""Copy de-duplicated releaser metadata from a scanned index into a target index.

Each ``*Meta`` class consumes an iterable of Elasticsearch hits, keeps the
first hit seen for every releaser id, and bulk-indexes a small metadata
document (id, optional url-like field, name, platform, timestamp) using the
releaser id as the document ``_id``.
"""
import datetime
import json

import elasticsearch
from elasticsearch.helpers import scan

# NOTE(review): the endpoint and credentials are hard-coded in source control.
# They are left untouched so the script keeps working, but they should be
# moved to configuration / environment and the password rotated.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)


class ReleaseMeta:
    """Write one metadata document per distinct releaser id.

    Attributes set in ``__init__`` select which ``_source`` fields are read
    (``releaser``, ``releaser_id``, ``releaserUrl``) and under which keys they
    are written (``write_releaser``, ``write_releaser_id``,
    ``write_releaserUrl``). Subclasses override them after calling
    ``super().__init__``.
    """

    # Number of new documents accumulated before a bulk request is sent.
    bulk_size = 1000

    def __init__(self, scan, index, doc_type):
        """Store the hit source and the bulk target.

        Args:
            scan: Iterable of hits; each hit is indexed with ``'_source'``.
            index: Name of the index the bulk requests are sent to.
            doc_type: Document type passed to the bulk requests.
        """
        self.scan = scan
        self.index = index
        self.doc_type = doc_type
        # Field names read from each hit's ``_source``.
        self.releaser = 'releaser'
        self.releaser_id = 'releaser_id_str'
        self.releaserUrl = 'releaserUrl'
        # Key names used in the written document.
        self.write_releaser = self.releaser
        self.write_releaser_id = self.releaser_id
        self.write_releaserUrl = self.releaserUrl

    def _send_bulk(self, actions):
        """Send the accumulated NDJSON lines and print any per-item failures."""
        response = es.bulk(
            index=self.index,
            doc_type=self.doc_type,
            body=''.join(actions),
            request_timeout=200,
        )
        if response['errors'] is True:
            print(response['items'])

    def func_write_release_meta_into_es(self):
        """Bulk-index one metadata document for every distinct releaser id.

        Hits whose id field is missing or equal to ``''`` are skipped, as are
        hits whose id was already seen during this call. The optional
        ``self.releaserUrl`` field is written only when the current hit
        actually has it.

        Raises:
            KeyError: If a hit with a new id lacks the ``self.releaser`` field
                or the ``'platform'`` field.
        """
        seen_ids = set()
        count = 0
        actions = []
        for hit in self.scan:
            line = hit['_source']
            try:
                releaser_id = line[self.releaser_id]
            except KeyError:
                continue
            if releaser_id == '' or releaser_id in seen_ids:
                continue
            seen_ids.add(releaser_id)
            count += 1

            releaser_name = line[self.releaser]
            platform = line['platform']
            # Epoch milliseconds taken from the local clock at write time.
            timestamp = int(datetime.datetime.now().timestamp() * 1000)

            doc = {self.write_releaser_id: releaser_id}
            # Only copy the optional field from *this* hit; a value left over
            # from an earlier hit must never be attached to another releaser.
            if self.releaserUrl in line:
                doc[self.write_releaserUrl] = line[self.releaserUrl]
            doc[self.write_releaser] = releaser_name
            doc['platform'] = platform
            doc['timestamp'] = timestamp

            # json.dumps escapes quotes/backslashes in the id, keeping the
            # action line valid JSON; the id is stringified as before.
            header = json.dumps(
                {"index": {"_id": '%s' % releaser_id}}, ensure_ascii=False
            )
            body = json.dumps(doc, ensure_ascii=False)
            actions.append(header + '\n' + body + '\n')

            if count % self.bulk_size == 0:
                print(count)
                self._send_bulk(actions)
                actions = []

        # Flush whatever is left after the last full batch.
        if actions:
            self._send_bulk(actions)


class FansMeta(ReleaseMeta):
    """Variant that reads ``releaser_followers_count`` as the optional field.

    NOTE(review): the ``write_*`` keys are assigned in the base ``__init__``
    before the overrides below, so the follower count is written under the
    key ``'releaserUrl'``. Confirm this is intended before changing it.
    """

    def __init__(self, scan, index, doc_type):
        super().__init__(scan, index, doc_type)
        self.releaser = 'releaser'
        self.releaser_id = 'releaser_id_str'
        self.releaserUrl = 'releaser_followers_count'


class WeiBoReleaserMeta(ReleaseMeta):
    """Variant that reads and writes the ``wb_name`` / ``UID`` fields."""

    def __init__(self, scan, index, doc_type):
        super().__init__(scan, index, doc_type)
        self.releaser = 'wb_name'
        self.releaser_id = 'UID'
        self.releaserUrl = 'releaserUrl'
        self.write_releaser = "wb_name"
        self.write_releaser_id = "UID"
        self.write_releaserUrl = self.releaserUrl


class WeiBoFansMeta(ReleaseMeta):
    """Variant that reads releaser fields and writes them as ``wb_name`` / ``UID``.

    The optional field is ``releaser_followers_count`` and is written under
    that same key.
    """

    def __init__(self, scan, index, doc_type):
        super().__init__(scan, index, doc_type)
        self.releaser = 'releaser'
        self.releaser_id = 'releaser_id_str'
        self.releaserUrl = 'releaser_followers_count'
        self.write_releaser = "wb_name"
        self.write_releaser_id = "UID"
        self.write_releaserUrl = self.releaserUrl


if __name__ == '__main__':
    # Short-video releaser test
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"data_month": 11}},
                    {"term": {"data_year": 2019}}
                ]
            }
        }
    }
    scan_re = scan(client=es, index='releaser', doc_type='releasers', query=search_body)
    test = ReleaseMeta(scan=scan_re, index='releaser_meta', doc_type='doc')
    test.func_write_release_meta_into_es()

    # # Short-video follower count test
    # search_body = {
    #     "query": {
    #         "bool": {
    #             "filter": [
    #                 {"term": {"data_month": 11}},
    #                 {"term": {"data_day": 28}},
    #                 {"term": {"data_year": 2019}},
    #             ],
    #             "must_not": [
    #                 {"term": {"platform.keyword": "weibo"}}
    #             ]
    #         }
    #     }
    # }
    # scan_re = scan(client=es, index='releaser_fans', doc_type='doc', query=search_body)
    # test2 = FansMeta(scan=scan_re, index='releaser_fans_latest-v1', doc_type='doc')
    # test2.func_write_release_meta_into_es()

    # Weibo releaser test
    # search_body = {
    #     "query": {
    #         "bool": {
    #             "filter": [
    #                 {"term": {"data_month": 11}},
    #                 {"term": {"data_year": 2019}}
    #             ]
    #         }
    #     }
    # }
    # scan_re = scan(client=es, index='ronghe_weibo_releaser', doc_type='doc', query=search_body)
    # test = WeiBoReleaserMeta(scan=scan_re, index='releaser_meta', doc_type='doc')
    # test.func_write_release_meta_into_es()

    # Weibo follower count test
    # search_body = {
    #     "query": {
    #         "bool": {
    #             "filter": [
    #                 {"term": {"data_month": 12}},
    #                 {"term": {"data_day": 1}},
    #                 {"term": {"data_year": 2019}},
    #                 {"term": {"platform.keyword": "weibo"}}
    #             ]
    #         }
    #     }
    # }
    # scan_re = scan(client=es, index='releaser_fans', doc_type='doc', query=search_body)
    # test = WeiBoFansMeta(scan=scan_re, index='releaser_fans_latest-v1', doc_type='doc')
    # test.func_write_release_meta_into_es()