1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# -*- coding:utf-8 -*-
# @Time : 2019/7/26 14:33
# @Author : litao
# -*- coding: utf-8 -*-
"""
Created on Mon Dec 17 10:05:18 2018
@author: zhouyujiang
Write releaser records from a CSV file (or a list of dicts) into the target_releasers index.
"""
import json, re
import datetime, copy
from elasticsearch import Elasticsearch
from write_data_into_es.func_get_releaser_id import get_releaser_id
import redis
import hashlib
# Elasticsearch connection shared by the functions in this module.
# The host and port are hard-coded.
hosts = '172.16.32.37'
port = 9200
es = Elasticsearch(hosts=hosts, port=port)
# Redis connection is disabled; the two lines below are kept for reference only.
# pool = redis.ConnectionPool(host='192.168.17.60', port=6379, db=2, decode_responses=True)
# rds = redis.Redis(connection_pool=pool)
# Date helpers evaluated once at import time:
#   first_day            - midnight on the first day of the current month
#   day_before_first_day - the last day of the previous month
#   l_month / l_year     - month and year of the previous month
today = datetime.datetime.now()
first_day = datetime.datetime(today.year, today.month, 1)
day_before_first_day = first_day - datetime.timedelta(1)
l_month = day_before_first_day.month
l_year = day_before_first_day.year
# Module-level counter; write_to_es keeps its own local ``count`` and does not read this one.
count = 0
def parse_line_dict(line, line_dict, blank_space_error, new_line_error, err_id_line):
    """Strip whitespace from a row's string values and record the row if any was found.

    Spaces, carriage returns, line feeds and tabs are removed from every
    string value of ``line_dict`` (the dict is modified in place).  Values
    that are not strings are left untouched.

    The row's reported line number is ``line + 2``.  It is appended, followed
    by a comma, at most once per call to each accumulator, no matter how many
    fields or how many kinds of whitespace were involved, so the resulting
    messages do not repeat the same line number.

    :param line: zero-based index of the row.
    :param line_dict: mapping of column name to value for the row.
    :param blank_space_error: comma-terminated line numbers of rows that
        contained a space so far.
    :param new_line_error: comma-terminated line numbers of rows that
        contained a carriage return, line feed or tab so far.
    :param err_id_line: passed through unchanged.
    :return: ``(line_dict, blank_space_error, new_line_error, err_id_line)``.
    """
    line_mark = str(line + 2) + ","
    # One translation table removes all four characters in a single pass.
    strip_table = str.maketrans("", "", " \r\n\t")
    found_space = False
    found_break = False
    # Iterate over a snapshot so assigning cleaned values back is always safe.
    for key, value in list(line_dict.items()):
        if not isinstance(value, str):
            continue
        if " " in value:
            found_space = True
        if "\r" in value or "\n" in value or "\t" in value:
            found_break = True
        line_dict[key] = value.translate(strip_table)
    if found_space:
        blank_space_error = blank_space_error + line_mark
    if found_break:
        new_line_error = new_line_error + line_mark
    return line_dict, blank_space_error, new_line_error, err_id_line
def _iter_input_rows(file):
    """Yield ``(index, row_dict)`` pairs from a list of dicts or a gb18030 CSV path."""
    if isinstance(file, list):
        yield from enumerate(file)
        return
    # The context manager closes the handle once iteration finishes.
    with open(file, 'r', encoding="gb18030") as handle:
        head_list = handle.readline().strip().split(',')
        for index, raw_line in enumerate(handle):
            yield index, dict(zip(head_list, raw_line.strip().split(',')))


def _merge_tags(row, field, add_key, del_key):
    """Return the row's ``field`` tags after applying its ``add_key`` / ``del_key`` lists."""
    tags = row.get(field)
    if not tags:
        tags = []
    elif isinstance(tags, list):
        # Copy: the list may be shared with extra_dic and therefore with other rows.
        tags = list(tags)
    additions = row.get(add_key)
    if additions:
        tags.extend(additions)
        # De-duplicate while keeping first-seen order.
        tags = list(dict.fromkeys(tags))
    removals = row.get(del_key)
    if removals and isinstance(tags, list):
        for tag in removals:
            if tag in tags:
                tags.remove(tag)
    return tags


def _send_bulk(bulk_lines):
    """Send queued action/source lines to ``target_releasers``; print the response on errors."""
    response = es.bulk(index='target_releasers', body="\n".join(bulk_lines) + "\n")
    if response['errors'] is True:
        print(response)


def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, update_dic=None, extra_dic=None, **kwargs):
    """Index releaser rows from a CSV file or a list of dicts into ``target_releasers``.

    Each accepted row is cleaned with ``parse_line_dict``, given a document id
    of ``<platform>_<releaser_id>`` (``releaser_id`` comes from
    ``get_releaser_id``; when that is empty the releaser name is used instead
    and the row is stored with ``is_valid="false"``), and queued for
    ``es.bulk``.  The queue is flushed every 500 rows and once more at the end.

    A row may carry ``add_departments`` / ``del_departments`` and
    ``add_project_tags`` / ``del_project_tags`` lists; they are applied to the
    row's ``department_tags`` / ``project_tags`` before indexing.

    :param file: a list of row dicts, or the path of a gb18030-encoded CSV
        whose first line is the comma-separated header.  Dicts passed in a
        list are updated in place.
    :param push_to_redis: currently unused (the redis push is disabled).
    :param update: currently unused (merging with the already indexed
        document is disabled).
    :param key_releaser: currently unused; the stored ``key_releaser`` value
        is read from the row itself (CSV column or ``extra_dic``).
    :param update_dic: currently unused.
    :param extra_dic: extra fields merged into every row.  Empty
        ``project_tags`` / ``department_tags`` entries are ignored.  The
        caller's dict is not modified.
    :param kwargs: ``post_by`` - when truthy, stored as ``post_by`` on every row.
    :return: list of messages: the number of rows queued for indexing,
        followed by the line numbers (row index + 2) of rows with an
        unusable releaserUrl, rows that contained spaces, and rows that
        contained line breaks/tabs or lacked a platform.
    :raises OSError: if ``file`` is a path that cannot be opened.
    """
    err_id_line = ""
    blank_space_error = ""
    new_line_error = ""
    error_msg_list = []
    bulk_lines = []
    count = 0

    # Work on a private copy so the caller's dict (or a shared default) is never
    # mutated, and drop empty tag entries up front so every row is treated alike.
    extra_fields = dict(extra_dic) if extra_dic else {}
    for tag_field in ("project_tags", "department_tags"):
        if not extra_fields.get(tag_field):
            extra_fields.pop(tag_field, None)

    for line, line_dict in _iter_input_rows(file):
        line_mark = str(line + 2) + ","
        try:
            platform = line_dict['platform']
            if platform == "short_video":
                platform = line_dict['releaser_platform']
                line_dict['platform'] = platform
        except (KeyError, TypeError):
            # NOTE(review): a missing platform (or a non-dict row) is reported
            # under the line-break message, as before; a dedicated message
            # might be clearer.
            new_line_error += line_mark
            continue

        line_dict, blank_space_error, new_line_error, err_id_line = parse_line_dict(
            line, line_dict, blank_space_error, new_line_error, err_id_line)
        # Re-read after cleaning so the document id uses the same platform
        # string that is stored in the document.
        platform = line_dict['platform']
        line_dict.pop("", None)

        if 'releaserUrl' not in line_dict:
            # Without a URL no id can be derived; report the row and skip it.
            err_id_line += line_mark
            continue
        releaserUrl = line_dict['releaserUrl']

        line_dict.update(extra_fields)

        releaser_id = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
        if releaser_id:
            doc_id = platform + '_' + releaser_id
            line_dict["releaser_id"] = releaser_id
            line_dict["releaser_id_str"] = doc_id
            line_dict["is_valid"] = "true"
        else:
            err_id_line += line_mark
            releaser = line_dict.get('releaser')
            if releaser is None:
                # Neither an id nor a name: nothing to build a document id from.
                continue
            doc_id = platform + '_' + releaser
            line_dict["releaser_id"] = ""
            line_dict["releaser_id_str"] = ""
            line_dict["is_valid"] = "false"

        if not line_dict.get("post_time"):
            line_dict['post_time'] = int(datetime.datetime.now().timestamp() * 1000)
        if kwargs.get("post_by"):
            line_dict["post_by"] = kwargs.get("post_by")

        line_dict["project_tags"] = _merge_tags(
            line_dict, "project_tags", "add_project_tags", "del_project_tags")
        line_dict["department_tags"] = _merge_tags(
            line_dict, "department_tags", "add_departments", "del_departments")

        bulk_dic = {
            "releaser": line_dict.get("releaser"),
            "releaserUrl": line_dict.get("releaserUrl"),
            "platform": line_dict.get("platform"),
            "releaser_id": line_dict.get("releaser_id"),
            "releaser_id_str": line_dict.get("releaser_id_str"),
            "post_by": line_dict.get("post_by"),
            "post_time": line_dict.get("post_time"),
            # NOTE(review): a "frequency" supplied by the row or extra_dic is
            # not used here; confirm that is intended.
            "frequency": 3 if line_dict.get("project_tags") else 1,
            "key_releaser": line_dict.get("key_releaser"),
            "is_valid": line_dict.get("is_valid"),
            "has_data": line_dict.get("has_data") if line_dict.get("has_data") else 0,
            "project_tags": line_dict.get("project_tags"),
            "department_tags": line_dict.get("department_tags"),
            'timestamp': int(datetime.datetime.now().timestamp() * 1000),
            'media_type': line_dict.get("media_type") if line_dict.get("media_type") else "",
            'releaser_type': line_dict.get("releaser_type") if line_dict.get("releaser_type") else "",
        }
        # Serialise the action line with json.dumps so quotes or backslashes in
        # the id cannot produce an invalid bulk request.
        bulk_lines.append(json.dumps({"index": {"_id": doc_id}}, ensure_ascii=False))
        bulk_lines.append(json.dumps(bulk_dic, ensure_ascii=False))
        count += 1
        if count % 500 == 0:
            _send_bulk(bulk_lines)
            bulk_lines = []

    if bulk_lines:
        _send_bulk(bulk_lines)

    # NOTE(review): ``count`` is the number of rows queued; bulk errors are only
    # printed and are not subtracted from it.
    error_msg_list.append("%s条 写入成功" % count)
    if err_id_line:
        error_msg_list.append("第%s行 releaserUrl错误" % err_id_line[:-1])
    if blank_space_error:
        error_msg_list.append("第%s行 发现存在空格" % blank_space_error[:-1])
    if new_line_error:
        error_msg_list.append("第%s行 发现存在换行符" % new_line_error[:-1])
    return error_msg_list
# Ad-hoc entry point: indexes the hard-coded releaser list below through
# write_to_es, tagging every row with the fields in ``extra_dic``.
if __name__ == "__main__":
    # Rows to index.  Notes on the data as written:
    #   * some releaser names end with a space; write_to_es strips spaces via
    #     parse_line_dict and lists those rows in its "space" message;
    #   * a few URLs occur more than once (e.g. .../u/6511177474 and
    #     .../u/5106602573), each time with its own releaser name.
    data_list = [
        {"releaserUrl": "https://weibo.com/u/1764615662", "releaser": "娱乐圈贵妃", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3662247177", "releaser": "捞娱君", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2378564111", "releaser": "娱乐扒皮", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2983578965", "releaser": "娱乐圈小青年", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3938976579", "releaser": "娱乐捞饭", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "小组吃瓜蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6343916471", "releaser": "圈内老顽童", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "八组吃瓜蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2921603920", "releaser": "娱乐圈新鲜事", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6470919752", "releaser": "伊丽莎白骨精啊", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2653906910?refer_flag=1001030103_&is_hot=1", "releaser": "娱乐榜姐",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3115996363?is_hot=1", "releaser": "娱乐星事", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005053212093237/home?from=page_100505&mod=TAB#place", "releaser": "星探扒皮",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3926129482", "releaser": "星闻追踪", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5509337969?is_hot=1", "releaser": "卦哥娱乐", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5477320351", "releaser": "圈内扒爷", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005055634795408/home?from=page_100505&mod=TAB#place", "releaser": "圈八戒 ",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6511173721", "releaser": "圈内课代表", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005055471534537/home?from=page_100505&mod=TAB#place", "releaser": "娱闻少女",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3193443435", "releaser": "圈太妹", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2022990945", "releaser": "圈内狙击手", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1809782810?is_all=1", "releaser": "全娱乐爆料", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5157190426?is_all=1", "releaser": "娱乐扒少", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2125613987?is_all=1", "releaser": "圈内一把手 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005051948622644/home?from=page_100505&mod=TAB#place",
         "releaser": "影视圈扒姐 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2611791490", "releaser": "娱评八公", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1652840683", "releaser": "追星", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5086098727?is_hot=1", "releaser": "闻娱教主", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5101787982?is_all=1", "releaser": "扒婆说", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5101844765?is_hot=1", "releaser": "星娱客 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052115034114/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐明星团 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6473952993?is_hot=1", "releaser": "偶像日报", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5106602573?is_hot=1", "releaser": "八哥", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5909342713?", "releaser": "圈内教父", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3200673035?", "releaser": "扒圈老鬼", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005055965621313/home?from=page_100505&mod=TAB#place", "releaser": "圈内师爷",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1915749764?is_hot=1", "releaser": "迷妹速报", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1002061836328652/home?from=page_100206&mod=TAB#place", "releaser": "前线娱乐",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5896207859?is_hot=1", "releaser": "娱记者", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5717515328?is_hot=1", "releaser": "娱老汉", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005051795994180/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐News", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5978818414?is_hot=1", "releaser": "娱圈蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2489917511?is_hot=1", "releaser": "芒果捞扒婆 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5279487569?is_hot=1", "releaser": "娱姐速报 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5106602573?is_hot=1", "releaser": "八哥 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5323541229?profile_ftype=1&is_all=1#_0", "releaser": "国内外白富美揭秘 ",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1003062512591982/home?from=page_100306&mod=TAB#place", "releaser": "圈少爷",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2821843050?profile_ftype=1&is_all=1#_0", "releaser": "圈内老鬼",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3028215832?profile_ftype=1&is_all=1#_0", "releaser": "娱扒爷",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5336756846?profile_ftype=1&is_all=1#_0", "releaser": "兔兔热议",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005051844235935/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐圈外汉", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052586409491/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐圈吃瓜指南 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5255814135", "releaser": "八组兔区爆料", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2871033210?is_hot=1", "releaser": "八组兔区热议 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052813285937/home?from=page_100505&mod=TAB#place",
         "releaser": "八组兔区娱乐圈", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052831749482/home?from=page_100505&mod=TAB#place",
         "releaser": "八组兔区揭秘", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/2709814831", "releaser": "娱大蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5634795408", "releaser": "圈八戒", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5176743404", "releaser": "瓜瓜搬运机", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5039775130", "releaser": "娱乐揭秘蜀黍", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/7123521074", "releaser": "饭圈日报", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1746658980", "releaser": "饭圈阿姨", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052453653365/home?from=page_100505&mod=TAB#place", "releaser": "圈内星探",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/6311417880?profile_ftype=1&is_all=1#_0", "releaser": "星扒婆 ",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1420816495?profile_ftype=1&is_all=1#_0", "releaser": "娱尾纹",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1974754790", "releaser": "教父娱乐", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1818950785?refer_flag=1028035010_&is_hot=1", "releaser": "扒圈有鱼",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1893711543", "releaser": "娱乐有饭", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1002061653255165/home?from=page_100206&mod=TAB#place",
         "releaser": "娱乐日爆社", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052391322817/home?from=page_100505&mod=TAB#place", "releaser": "小娱乐家",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1003061994712500/home?from=page_100306&mod=TAB#place",
         "releaser": "星扒客push", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5700087877", "releaser": "毒舌八卦", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/3779202361", "releaser": "西皮娱乐", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1632619962", "releaser": "瓜组新鲜事", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052103460752/home?from=page_100505&mod=TAB#place", "releaser": "娱嬷嬷 ",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/5874584452", "releaser": "吃瓜鹅每日搬", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005052397961280/home?from=page_100505&mod=TAB#place", "releaser": "娱大白",
         "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005053246379064/home?from=page_100505&mod=TAB#place",
         "releaser": "娱乐圈扒姐 ", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/u/1830483711", "releaser": "娱乐女记", "platform": "weibo"},
        {"releaserUrl": "https://weibo.com/p/1005053847401640/home?from=page_100505&mod=TAB#place",
         "releaser": "吃瓜爆料每日搬 ", "platform": "weibo"},
        {"releaserUrl": "https://www.douban.com/people/hot_tag",
         "releaser": "hot_tag", "platform": "douban"},
        {"releaserUrl": "https://www.douban.com/people/new_tag",
         "releaser": "new_tag", "platform": "douban"}
    ]
    # Fields merged into every row by write_to_es.
    # NOTE(review): write_to_es derives "frequency" from project_tags, so the
    # 'frequency' value given here does not reach the indexed document.
    extra_dic = {
        "department_tags":["策略组"],
        'key_releaser': True,
        'frequency': 3,
    }
    # Leftovers from earlier runs, kept for reference:
    # csv_type = {"SMG": [], "an_hui": [], "ronghe": [], "su_zhou": []}
    #ronghe_releaser_write_es(file, post_by="litao")
    # The returned message list is not captured or printed here.
    write_to_es(data_list, post_by="litao", extra_dic=extra_dic, push_to_redis=False)