backend / crawler · Commits · fd388d44

Commit fd388d44 authored 4 years ago by litaolemo
update

parent 5efa3f31
Showing 8 changed files with 110 additions and 376 deletions
crawler_sys/framework/es_crawler.py (+0 -16)
crawler_sys/framework/es_target_releasers.py (+3 -2)
crawler_sys/proxy_pool/__init__.py (+6 -0)
crawler_sys/proxy_pool/func_get_proxy_form_kuaidaili.py (+15 -10)
crawler_sys/site_crawler_test/crawler_weibo.py (+13 -12)
crawler_sys/utils/output_results.py (+6 -6)
write_data_into_es/func_get_releaser_id.py (+6 -6)
write_data_into_es/target_releaser_add.py (+61 -324)
crawler_sys/framework/es_crawler.py

@@ -67,19 +67,3 @@ def scan_index(index, doc_type, search_body):
    return (total_hit, scan_resp)


def construct_id_for_url_register(platform, url):
    if platform == 'new_tudou':
        vid_bare = calculate_newTudou_video_id(url)
        vid = 'new_tudou_%s' % vid_bare
    elif platform == 'toutiao':
        vid_bare = calculate_toutiao_video_id(url)
        vid = 'toutiao_%s' % vid_bare
    elif platform == '腾讯新闻':
        c_time = str(int(time.time()))
        vid = "tencent_news_%s_%s" % (url, c_time)
    elif platform == '网易新闻':
        vid = "163_news_%s" % calculate_wangyi_news_id(url)
    else:
        vid_bare = url
        vid = vid_bare
    return vid
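Note: the removed helper built the URL-register _id by prefixing a platform-specific video id and falling back to the raw URL. A minimal sketch of that dispatch, using a hypothetical prefix table in place of the project's calculate_*_video_id helpers:

import time

PREFIXES = {"new_tudou": "new_tudou_%s", "toutiao": "toutiao_%s"}  # illustrative only

def register_id(platform, url, vid_bare):
    # vid_bare stands in for the platform-specific video id the real helpers compute
    if platform in PREFIXES:
        return PREFIXES[platform] % vid_bare
    if platform == '腾讯新闻':
        return "tencent_news_%s_%s" % (url, int(time.time()))
    return url  # default branch: the URL itself becomes the id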
crawler_sys/framework/es_target_releasers.py

@@ -9,13 +9,14 @@ import random
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
#rds=redis.StrictRedis(host='192.168.17.26',port=6379,db=0)
# rds=redis.StrictRedis(host='192.168.17.26',port=6379,db=0)
es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
index_target_releaser = 'target_releasers'
doc_type_target_releaser = 'doc'


def bulk_write_target_releasers(dict_Lst,
                                index=index_target_releaser,
                                doc_type=doc_type_target_releaser):

@@ -74,7 +75,7 @@ def get_releaserUrls_from_es(platform,
                releaserUrl_Lst.append((releaserUrl, releaser))
            except:
                print('error in :', line)
                print('error in :', line)
                continue
    else:
        print('Got zero hits.')
crawler_sys/proxy_pool/__init__.py (new file, mode 100644)

# -*- coding:UTF-8 -*-
# @Time : 2020/7/24 10:51
# @File : __init__.py
# @email : litao@igengmei.com
# @author : litao
\ No newline at end of file
crawler_sys/proxy_pool/func_get_proxy_form_kuaidaili.py

@@ -8,8 +8,8 @@
The currently supported auth methods are "simple" and "hmacsha1"; "simple" is used by default.
Every method also accepts the keyword argument sign_type to change the auth method.
"""
import redis, random
import kdl, requests
import redis, random
import kdl, requests
# from redis.sentinel import Sentinel

@@ -25,6 +25,7 @@ import kdl,requests
# rds = sentinel.master_for('ida_redis_master', socket_timeout=0.5, db=7, decode_responses=True)
rds = redis.StrictRedis(host='154.8.190.251', port=6379, db=18, decode_responses=True)


def get_proxy_from_redis():
    try:
        one_proxy = rds.randomkey()

@@ -32,14 +33,15 @@ def get_proxy_from_redis():
        password = "i9mmu0a3"
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(ip)s/" % {'user': username, 'pwd': password, 'ip': one_proxy},
            "https": "http://%(user)s:%(pwd)s@%(ip)s/" % {'user': username, 'pwd': password, 'ip': one_proxy}
            "http": "http://%(user)s:%(pwd)s@%(ip)s/" % {'user': username, 'pwd': password, 'ip': one_proxy},
            "https": "http://%(user)s:%(pwd)s@%(ip)s/" % {'user': username, 'pwd': password, 'ip': one_proxy}
        }
        return proxies
    except Exception as e:
        print(e)
        return None


def func_get_proxy_to_redis():
    # chance = random.random()
    auth = kdl.Auth("990866563045611", "quxguz4hwm9cxnx6wpjhkokx04klpr8v")

@@ -68,14 +70,13 @@ def func_get_proxy_to_redis():
    # ips = client.get_dps(1, sign_type='simple', format='json', pt=2, area='北京,上海,广东')
    # print("dps proxy: ", ips)
    # Check validity of private proxies: returns a dict of ip: true/false
    #ips = client.get_dps(1, sign_type='simple', format='json')
    # ips = client.get_dps(1, sign_type='simple', format='json')
    # valids = client.check_dps_valid(ips)
    # print("valids: ", valids)
    # Get remaining lifetime of private proxies: returns a dict of ip: seconds left
    ips = client.get_dps(1, format='json', dedup=1)
    ips = client.get_dps(1, format='json', dedup=1)
    seconds = client.get_dps_valid_time(ips)
    # print("seconds: ", seconds)
    for key in seconds:

@@ -84,10 +85,12 @@ def func_get_proxy_to_redis():
    # Get remaining IP balance (counting-plan private proxies only)
    # balance = client.get_ip_balance(sign_type='hmacsha1')
    # print("balance: ", balance)


def proxy_test(proxies):
    page_url = "http://dev.kdlapi.com/testproxy/"
    headers = {
        "Accept-Encoding": "Gzip",  # use gzip compression to speed up transfers
        "Accept-Encoding": "Gzip",  # use gzip compression to speed up transfers
    }
    res = requests.get(url=page_url, proxies=proxies, headers=headers)

@@ -95,6 +98,7 @@ def proxy_test(proxies):
    if res.status_code == 200:
        print(res.content.decode('utf-8'))  # print the page content


def get_proxy_dic(max_proxies=None):
    if not max_proxies:
        max_proxies = 8

@@ -111,6 +115,7 @@ def get_proxy_dic(max_proxies=None):
    else:
        return get_proxy_from_redis()


def get_proxy(proxies_num=None):
    if proxies_num:
        proxies = get_proxy_dic(max_proxies=proxies_num)

@@ -119,8 +124,9 @@ def get_proxy(proxies_num=None):
    else:
        return None


if __name__ == "__main__":
    proxy_pool_dic = get_proxy(11)
    print(proxy_pool_dic)
    proxy_test(proxy_pool_dic)
    print(get_proxy_from_redis())
\ No newline at end of file
    print(get_proxy_from_redis())
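Note: the proxies dict assembled in get_proxy_from_redis follows the scheme-to-URL mapping that the requests library expects. A minimal usage sketch, with placeholder credentials and proxy address rather than the real values stored in Redis:

import requests

# Hypothetical placeholder values; in the module above they come from Redis and the kdl account.
proxy_entry = {"user": "demo_user", "pwd": "demo_pwd", "ip": "1.2.3.4:15818"}
proxies = {
    "http": "http://%(user)s:%(pwd)s@%(ip)s/" % proxy_entry,
    "https": "http://%(user)s:%(pwd)s@%(ip)s/" % proxy_entry,
}
res = requests.get("http://dev.kdlapi.com/testproxy/", proxies=proxies, timeout=10)
print(res.status_code)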
crawler_sys/site_crawler_test/crawler_weibo.py

@@ -227,17 +227,17 @@ class Crawler_weibo():
                        pass
                    data_lis.append(res)
                    # if len(data_lis) >= 100:
                    #     output_result(result_Lst=data_lis,
                    #                   platform=self.platform,
                    #                   output_to_file=output_to_file,
                    #                   filepath=filepath,
                    #                   push_to_redis=push_to_redis,
                    #                   output_to_es_register=output_to_es_register,
                    #                   output_to_es_raw=output_to_es_raw,
                    #                   es_index=es_index,
                    #                   )
                    #     data_lis.clear()
                    if len(data_lis) >= 100:
                        output_result(result_Lst=data_lis,
                                      platform=self.platform,
                                      output_to_file=output_to_file,
                                      filepath=filepath,
                                      push_to_redis=push_to_redis,
                                      output_to_es_register=output_to_es_register,
                                      output_to_es_raw=output_to_es_raw,
                                      es_index=es_index,
                                      )
                        data_lis.clear()
                else:
                    count_false += 1
                    if count_false > 10:

@@ -297,7 +297,7 @@ if __name__ == '__main__':
    # for r in res:
    #     print(r)
    for u in url_list:
        test.releaser_page_by_time(1590940800000, 1595468554268, u, output_to_es_raw=False,
        test.releaser_page_by_time(1590940800000, 1595468554268, u, output_to_es_register=True,
                                   es_index='crawler-data-raw', doc_type='doc', releaser_page_num_max=4000)
    # test.get_single_page(4524055937468233)
\ No newline at end of file
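Note: the change above replaces the commented-out batching with a live one: results accumulate in data_lis and are flushed through output_result() once 100 items are buffered. A stripped-down sketch of that pattern, with a generic flush callable standing in for the project's output_result():

def crawl_in_batches(results, flush, batch_size=100):
    # Buffer items and flush them in fixed-size batches, mirroring data_lis above.
    buffer = []
    for res in results:
        buffer.append(res)
        if len(buffer) >= batch_size:
            flush(buffer)
            buffer.clear()
    if buffer:
        flush(buffer)  # final partial batch

# e.g. crawl_in_batches(parsed_posts, flush=lambda batch: print("flushed", len(batch)))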
crawler_sys/utils/output_results.py

@@ -17,7 +17,7 @@ from crawler_sys.framework.es_ccr_index_defination import es_framework as es_sit
from crawler_sys.framework.es_ccr_index_defination import index_url_register
from crawler_sys.framework.es_ccr_index_defination import doc_type_url_register
from crawler_sys.framework.es_ccr_index_defination import fields_url_register
from crawler_sys.framework.es_crawler import construct_id_for_url_register
from write_data_into_es.func_cal_doc_id import cal_doc_id
from crawler_sys.utils.write_into_file import write_str_into_file
from crawler.crawler_sys.proxy_pool.func_get_proxy_form_kuaidaili import get_proxy

@@ -82,11 +82,11 @@ def output_result(result_Lst, platform,
    # write data into es crawler-url-register index
    if output_to_es_register:
        data_Lst_reg = form_data_Lst_for_url_register(result_Lst)
        bulk_write_into_es(data_Lst_reg,
                           index=index_url_register,
        # data_Lst_reg = form_data_Lst_for_url_register(result_Lst)
        bulk_write_into_es(result_Lst,
                           index=es_index,
                           construct_id=True,
                           platform=platform
                           platform=platform,
                           )

    # feed url into redis

@@ -182,7 +182,7 @@ def bulk_write_into_es(dict_Lst,
    for line in dict_Lst:
        write_counter += 1
        if construct_id and platform is not None:
            doc_id = construct_id_for_url_register(platform, line['url'])
            doc_id = cal_doc_id(platform, url=line["url"], doc_id_type='all-time-url', data_dict=line)
            action_str = ('{ "index" : { "_index" : "%s", "_id" : "%s" } }'
                          % (index, doc_id))
        else:
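Note: the action_str built in bulk_write_into_es is the metadata half of an Elasticsearch bulk request, one action line plus one JSON source line per document, newline-terminated. A minimal sketch of assembling such a body, assuming an "index" action and that each doc carries the field used as its _id:

import json

def build_bulk_body(docs, index, id_field="url"):
    lines = []
    for doc in docs:
        # action/metadata line, then the document source line
        lines.append('{ "index" : { "_index" : "%s", "_id" : "%s" } }' % (index, doc[id_field]))
        lines.append(json.dumps(doc, ensure_ascii=False))
    return "\n".join(lines) + "\n"

# es.bulk(body=build_bulk_body(result_Lst, index=es_index)) would then submit the whole batch.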
write_data_into_es/func_get_releaser_id.py

@@ -256,21 +256,21 @@ def pearvideo(releaserUrl,**kwargs):
def weibo(releaserUrl, **kwargs):
    try:
        containerid = ""
        if "/u/" in releaserUrl:
            releaser_id = containerid = re.findall("/u/(\d+)", releaserUrl)[0]
            releaser_id = re.findall("/u/(\d+)", releaserUrl)[0]
        elif "/p/" in releaserUrl:
            releaser_id = containerid = re.findall("/p/(\d+)", releaserUrl)[0]
            releaser_id = re.findall("/p/(\d+)", releaserUrl)[0]
            if len(releaser_id) >= 15:
                releaser_id = releaser_id[6:]
        elif "/" in releaserUrl:
            releaser_id = containerid = re.findall("(\d+)", releaserUrl)[0]
            releaser_id = re.findall("(\d+)", releaserUrl)[0]
        else:
            try:
                releaserid = int(releaserUrl)
                releaser_id = int(releaserUrl)
            except:
                return None
        return releaser_id, containerid
        return releaser_id
    except:
        return None
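Note: as a sanity check on the regexes above, a /u/<digits> profile URL yields the numeric releaser id, and this commit changes weibo() to return only that id instead of the (releaser_id, containerid) tuple. A small example, using a sample URL taken from the data_list further down in this commit:

import re

url = "https://weibo.com/u/1764615662"
print(re.findall(r"/u/(\d+)", url)[0])  # -> 1764615662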
write_data_into_es/target_releaser_add.py
@@ -21,16 +21,11 @@ except:
import redis
import hashlib

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
index = 'short-video-production'
doc_type = 'daily-url'
pool = redis.ConnectionPool(host='192.168.17.60', port=6379, db=2, decode_responses=True)
rds = redis.Redis(connection_pool=pool)
hosts = '172.16.32.37'
port = 9200
es = Elasticsearch(hosts=hosts, port=port)
# pool = redis.ConnectionPool(host='192.168.17.60', port=6379, db=2, decode_responses=True)
# rds = redis.Redis(connection_pool=pool)

today = datetime.datetime.now()
first_day = datetime.datetime(today.year, today.month, 1)
@@ -52,14 +47,6 @@ def parse_line_dict(line, line_dict, blank_space_error, new_line_error, err_id_l
            if "\t" in line_dict[k]:
                new_line_error = new_line_error + str(line + 2) + ","
            line_dict[k] = line_dict[k].replace("\r", "").replace("\n", "").replace("\t", "").replace(" ", "")
            try:
                if k == "releaserUrl":
                    line_dict[k] = \
                        re.findall(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+~]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", line_dict[k])[0]
            except Exception as e:
                # print(e)
                err_id_line = err_id_line + str(line + 2) + ","
        except Exception as e:
            # print(e)
            continue
@@ -124,10 +111,6 @@ def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, updat
                line_dict.pop("")
            try:
                releaserUrl = line_dict['releaserUrl']
                if platform == 'new_tudou':
                    if releaserUrl[-2:] == '==':
                        releaserUrl = releaserUrl + '/videos'
                        line_dict['releaserUrl'] = releaserUrl
            except:
                releaserUrl = line_dict['releaserUrl']
@@ -153,31 +136,31 @@ def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, updat
            if not extra_dic.get("department_tags"):
                extra_dic.pop("department_tags", 0)
            search_re = es.search(index='target_releasers', doc_type='doc', body=find_exist)
            if search_re['hits']['total'] > 0:
                search_source = search_re['hits']['hits'][0]['_source']
                # print(search_source)
                if search_source.get("project_tags"):
                    try:
                        # print(kwargs.get("extra_dic"))
                        line_dict["project_tags"].extend(search_source.get("project_tags"))
                        line_dict["project_tags"] = list(set(line_dict["project_tags"]))
                        search_source.pop("project_tags", 0)
                    except Exception as e:
                        pass
                        # print("project_tags error", e)
                if search_source.get("department_tags"):
                    try:
                        # print(kwargs.get("extra_dic"))
                        line_dict["department_tags"].extend(search_source.get("department_tags"))
                        line_dict["department_tags"] = list(set(line_dict["department_tags"]))
                        search_source.pop("department_tags", 0)
                    except Exception as e:
                        pass
                        # print("project_tags error", e)
                if update:
                    line_dict.update(search_source)
                    line_dict["post_time"] = search_source.get("post_time")
            # search_re = es.search(index='target_releasers', doc_type='doc', body=find_exist)
            # if search_re['hits']['total'] > 0:
            #     search_source = search_re['hits']['hits'][0]['_source']
            #     # print(search_source)
            #     if search_source.get("project_tags"):
            #         try:
            #             # print(kwargs.get("extra_dic"))
            #             line_dict["project_tags"].extend(search_source.get("project_tags"))
            #             line_dict["project_tags"] = list(set(line_dict["project_tags"]))
            #             search_source.pop("project_tags", 0)
            #         except Exception as e:
            #             pass
            #             # print("project_tags error", e)
            #     if search_source.get("department_tags"):
            #         try:
            #             # print(kwargs.get("extra_dic"))
            #             line_dict["department_tags"].extend(search_source.get("department_tags"))
            #             line_dict["department_tags"] = list(set(line_dict["department_tags"]))
            #             search_source.pop("department_tags", 0)
            #         except Exception as e:
            #             pass
            #             # print("project_tags error", e)
            #     if update:
            #         line_dict.update(search_source)
            #         line_dict["post_time"] = search_source.get("post_time")
            if line_dict.get("post_time"):
                pass
@@ -186,24 +169,14 @@ def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, updat
            try:
                line_dict["releaser_id"] = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
                if platform != "weixin" and platform != "weibo":
                    line_dict["releaser_id_str"] = platform + "_" + line_dict["releaser_id"]
                else:
                    line_dict["releaser_id_str"] = line_dict["releaser_id"]
                line_dict["releaser_id_str"] = platform + "_" + line_dict["releaser_id"]
                line_dict["is_valid"] = "true"
            except:
                line_dict["releaser_id"] = ""
                line_dict["releaser_id_str"] = ""
                line_dict["is_valid"] = "false"
            if kwargs.get("post_by"):
                line_dict["post_by"] = kwargs.get("post_by")
            # try:
            #     line_dict.pop("平台账号主页URL")
            # except:
            #     pass
            if platform in ["weixin"] and not line_dict.get("del_departments"):
                line_dict["is_purchased"] = True
            if not line_dict.get("project_tags"):
                line_dict["project_tags"] = []
            if not line_dict.get("department_tags"):
@@ -233,45 +206,22 @@ def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, updat
                "releaser_id": line_dict.get("releaser_id"),
                "releaser_id_str": line_dict.get("releaser_id_str"),
                "post_by": line_dict.get("post_by"),
                "Nov_2018": line_dict.get("Nov_2018"),
                "post_time": line_dict.get("post_time"),
                "frequency": 3 if line_dict.get("project_tags") else 1,
                "key_releaser": line_dict.get("key_releaser"),
                "is_valid": line_dict.get("is_valid"),
                "systematic": line_dict.get("platform") if line_dict.get("platform") == "weixin" or line_dict.get("platform") == "weibo" or line_dict.get("platform") == "app" or line_dict.get("platform") == "tv" else "short_video",
                "has_data": line_dict.get("has_data") if line_dict.get("has_data") else 0,
                "project_tags": line_dict.get("project_tags"),
                "department_tags": line_dict.get("department_tags"),
                'timestamp': int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000),
                'media_type': line_dict.get("media_type") if line_dict.get("media_type") else "",
                'tv_station': line_dict.get("tv_station") if line_dict.get("tv_station") else "",
                'releaser_type': line_dict.get("releaser_type") if line_dict.get("releaser_type") else "",
                'channel': line_dict.get("channel") if line_dict.get("channel") else "",
                'channel_type': line_dict.get("channel_type") if line_dict.get("channel_type") else "",
                'program': line_dict.get("program") if line_dict.get("program") else "",
                'tv_type': line_dict.get("tv_type") if line_dict.get("tv_type") else "",
            }
            if line_dict.get("is_purchased"):
                bulk_dic["is_purchased"] = True
            if type(line_dict.get("purchase_end_time")) == str:
                if "-" in line_dict.get("purchase_end_time"):
                    bulk_dic["purchase_end_time"] = int(datetime.datetime.strptime(line_dict.get("purchase_end_time"), '%Y-%m-%d').timestamp() * 1e3)
                elif "/" in line_dict.get("purchase_end_time"):
                    bulk_dic["purchase_end_time"] = int(datetime.datetime.strptime(line_dict.get("purchase_end_time"), '%Y/%m/%d').timestamp() * 1e3)
                else:
                    error_msg_list.append("第%s行 日期格式错误,请修改后重试" % new_line_error[:-1])
            else:
                bulk_dic["purchase_end_time"] = 7258089600000
            bluk_purchase_list.append(bulk_dic)
            bulk_head = '{"index": {"_id":"%s"}}' % doc_id
            if push_to_redis:
                rds.lpush("releaser_doc_id_list", doc_id)
            # if push_to_redis:
            #     rds.lpush("releaser_doc_id_list", doc_id)
            data_str = json.dumps(bulk_dic, ensure_ascii=False)
            bulk_one_body = bulk_head + '\n' + data_str + '\n'
            # print(bulk_one_body)
@@ -290,10 +240,6 @@ def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, updat
                                  )
            if eror_dic['errors'] is True:
                print(eror_dic)
    if line_dict.get("add_departments") and not push_to_redis:
        purchase_releaser_add(bluk_purchase_list, line_dict.get("add_departments"))
    if line_dict.get("del_departments") and not push_to_redis:
        purchase_releaser_add(bluk_purchase_list, line_dict.get("del_departments"), if_add=False)
    error_msg_list.append("%s条 写入成功" % count)
    if err_id_line:
        error_msg_list.append("第%s行 releaserUrl错误" % err_id_line[:-1])
@@ -304,245 +250,36 @@ def write_to_es(file, push_to_redis=True, update=True, key_releaser=False, updat
    return error_msg_list


def purchase_releaser_add(bluk_dic, departments, if_add=True):
    start_purchase_time = int(datetime.datetime.now().timestamp() * 1e3)
    bulk_all_body = ""
    count = 0
    for department in departments:
        if if_add:
            for dic in bluk_dic:
                _id = department + "_" + dic["releaser_id_str"]
                try:
                    res = es.get_source(index="department_purchase_log", doc_type="doc", id=_id, timeout="1m", )
                    start_purchase_time = res["purchase_start_time"]
                    dic.update(res)
                except:
                    pass
                dic["department"] = department
                dic["is_purchased"] = True
                dic["purchase_start_time"] = start_purchase_time
                dic["timestamp"] = start_purchase_time
                bulk_head = '{"index": {"_id":"%s"}}' % _id
                data_str = json.dumps(dic, ensure_ascii=False)
                bulk_one_body = bulk_head + '\n' + data_str + '\n'
                bulk_all_body += bulk_one_body
                count = count + 1
                if count % 500 == 0:
                    eror_dic = es.bulk(index='department_purchase_log', doc_type='doc', body=bulk_all_body)
                    bulk_all_body = ''
                    if eror_dic['errors'] is True:
                        print(eror_dic)
        else:
            for dic in bluk_dic:
                _id = department + "_" + dic["releaser_id_str"]
                try:
                    res = es.get_source(index="department_purchase_log", doc_type="doc", id=_id)
                    start_purchase_time = res["purchase_start_time"]
                    dic.update(res)
                except:
                    pass
                dic["department"] = department
                dic["is_purchased"] = False
                dic["purchase_end_time"] = start_purchase_time
                dic["timestamp"] = start_purchase_time
                bulk_head = '{"index": {"_id":"%s"}}' % _id
                data_str = json.dumps(dic, ensure_ascii=False)
                bulk_one_body = bulk_head + '\n' + data_str + '\n'
                bulk_all_body += bulk_one_body
                count = count + 1
                if count % 500 == 0:
                    eror_dic = es.bulk(index='department_purchase_log', doc_type='doc', body=bulk_all_body)
                    bulk_all_body = ''
                    if eror_dic['errors'] is True:
                        print(eror_dic)
    if not departments:
        for dic in bluk_dic:
            try:
                for department in dic["department_tags"]:
                    if not dic.get("is_purchased"):
                        continue
                    _id = department + "_" + dic["releaser_id_str"]
                    try:
                        res = es.get_source(index="department_purchase_log", doc_type="doc", id=_id)
                        start_purchase_time = res["purchase_start_time"]
                        dic.update(res)
                    except:
                        pass
                    dic["department"] = department
                    dic["is_purchased"] = True
                    dic["purchase_start_time"] = start_purchase_time
                    dic["timestamp"] = start_purchase_time
                    bulk_head = '{"index": {"_id":"%s"}}' % _id
                    data_str = json.dumps(dic, ensure_ascii=False)
                    bulk_one_body = bulk_head + '\n' + data_str + '\n'
                    bulk_all_body += bulk_one_body
                    count = count + 1
                    if count % 500 == 0:
                        eror_dic = es.bulk(index='department_purchase_log', doc_type='doc', body=bulk_all_body)
                        bulk_all_body = ''
                        if eror_dic['errors'] is True:
                            print(eror_dic)
            except:
                continue
    if bulk_all_body != '':
        eror_dic = es.bulk(body=bulk_all_body, index='department_purchase_log', doc_type='doc', )
        if eror_dic['errors'] is True:
            print(eror_dic)
def ronghe_releaser_write_es(target_file, extra_dic={}, post_by=None):
    target_index = 'target_releasers'
    target_type = 'doc'
    bulk_all_body = ""
    err_id_line = ""
    blank_space_error = ""
    new_line_error = ""
    error_msg_list = []
    short_video_list = []
    try:
        f = open(target_file, 'r', encoding="gb18030")
        head = f.readline()
        head_list = head.strip().split(',')
    except:
        f = target_file
    for line, i in enumerate(f):
        if type(target_file) != list:
            try:
                line_list = i.strip().split(',')
                line_dict = dict(zip(head_list, line_list))
            except:
                line_dict = f
        else:
            line_dict = i
        dic, blank_space_error, new_line_error, err_id_line = parse_line_dict(line, line_dict, blank_space_error, new_line_error, err_id_line)
        print("line %s" % line)
        # dic["timestamp"] = int(datetime.datetime.now().timestamp() * 1e3)
        # dic["timestamp"] = 1580976928032
        if dic["channel_id"]:
            dic["channel_id"] = dic["channel_id"].zfill(4)
        if dic["live_type_id"]:
            dic["live_type_id"] = dic["live_type_id"].zfill(3)
        if dic.get("time_shift_type_id"):
            dic["time_shift_type_id"] = dic["time_shift_type_id"].zfill(3)
        try:
            if dic.get("releaserUrl") and dic["systematic"] == "short_video":
                # print(get_releaser_id(platform=dic["releaser_platform"], releaserUrl=dic["releaserUrl"]))
                dic["releaser_id_str"] = dic["platform"] + "_" + get_releaser_id(platform=dic["platform"], releaserUrl=dic["releaserUrl"])
            else:
                dic["releaser_id_str"] = get_releaser_id(platform=dic["platform"], releaserUrl=dic["releaserUrl"])
        except:
            print("error_url", dic.get("releaserUrl"))
        # The _id is the hash of the following fields:
        # TV_station + platform + channel + releaser_platform + releaser + releaser_platform + releaserUrl
        # if dic["platform"] != "weibo":
        #     continue
        if dic["releaser_type"] == "电视新闻类":
            dic["media_type"] = ["traditional_media", "tv_news"]
        if dic["systematic"] == "short_video":
            _id = dic["releaser_id_str"]
        elif dic["systematic"] == "weibo":
            _id = "weibo_%s" % dic["releaser_id_str"]
        elif dic["systematic"] == "weixin":
            _id = "weixin_%s" % dic["releaser_id_str"]
        elif dic["systematic"] == "tv":
            eid = dic["tv_station"] + dic["systematic"] + dic["channel"] + dic["program"] + dic.get("releaser") + dic.get("platform") + dic["releaserUrl"]
            print(eid)
            sha1 = hashlib.sha1()
            sha1.update(eid.encode("utf8"))
            _id = sha1.hexdigest()
        elif dic["systematic"] == "app":
            _id = "app_%s" % dic["releaser"]
        else:
            continue
        find_exist = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"_id": _id}}
                    ]
                }
            }
        }
        search_re = es.search(index='target_releasers', doc_type='doc', body=find_exist)
        if search_re['hits']['total'] > 0:
            search_source = search_re['hits']['hits'][0]['_source']
            search_source.update(dic)
            dic = search_source
        dic["is_ronghe_target_releaser"] = 1
        dic["is_removed"] = 0
        dic["is_valid"] = "true"
        dic["ronghe"] = "true"
        if not dic.get("department_tags"):
            dic["department_tags"] = []
        dic["department_tags"].append("CCR")
        dic["department_tags"] = list(set(dic["department_tags"]))
        if dic.get("project_tags"):
            try:
                dic["project_tags"].append("融合指数-CCR")
                dic["project_tags"] = list(set(dic["project_tags"]))
            except Exception as e:
                print(e)
                print(dic["project_tags"])
        else:
            dic["project_tags"] = ["融合指数-CCR"]
        # dic["timestamp"] = 1580976928032
        bulk_head = '{"index": {"_id":"%s"}}' % _id
        data_str = json.dumps(dic, ensure_ascii=False)
        if dic["platform"] in ["weixin", "weibo", "short_video"]:
            short_video_list.append(dic)
        bulk_one_body = bulk_head + '\n' + data_str + '\n'
        bulk_all_body += bulk_one_body
        if (line + 1) % 500 == 0:
            eror_dic = es.bulk(index=target_index, doc_type=target_type, body=bulk_all_body, request_timeout=200)
            bulk_all_body = ''
            if eror_dic['errors'] is True:
                print(eror_dic['items'])
                print(bulk_all_body)
            print(line)
    if bulk_all_body != '':
        eror_dic = es.bulk(body=bulk_all_body, index=target_index, doc_type=target_type, request_timeout=200)
        if eror_dic['errors'] is True:
            print(eror_dic)
        bulk_all_body = ''
    # print(platform, releaser, 'end_have:', len(wirte_set), 'add:', len(set_url))
    print(line)
    error_msg_list.append("%s条 写入成功" % count)
    # write_to_es(short_video_list, extra_dic=extra_dic, post_by=post_by)
if __name__ == "__main__":
    file = r'D:\work_file\发布者账号\融媒账号列表\【3月使用的】电视新闻融合指数(仅省级台)对照表0401 的副本.csv'
    file = r'D:\work_file\发布者账号\一次性需求附件\【1季度】电视新闻融合指数(仅省级台)对照表-微博.csv'
    # file = r'D:\wxfile\WeChat Files\litaolemo\FileStorage\File\2020-04\导入账号模板-客服测试(1).csv'
    # file = r'D:\work_file\发布者账号\一次性需求附件\【2月使用的】电视新闻融合指数(仅省级台)对照表0309 的副本.csv'
    data_list = [{"releaserUrl": "https://weibo.com/u/1764615662", "releaser": "娱乐圈贵妃", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/3662247177", "releaser": "捞娱君", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/2378564111", "releaser": "娱乐扒皮", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/2983578965", "releaser": "娱乐圈小青年", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/3938976579", "releaser": "娱乐捞饭", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "小组吃瓜蜀黍", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/6343916471", "releaser": "圈内老顽童", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/6511177474", "releaser": "八组吃瓜蜀黍", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/2921603920", "releaser": "娱乐圈新鲜事", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/6470919752", "releaser": "伊丽莎白骨精啊", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/2653906910?refer_flag=1001030103_&is_hot=1", "releaser": "娱乐榜姐", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/3115996363?is_hot=1", "releaser": "娱乐星事", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/p/1005053212093237/home?from=page_100505&mod=TAB#place", "releaser": "星探扒皮", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/3926129482", "releaser": "星闻追踪", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/5509337969?is_hot=1", "releaser": "卦哥娱乐", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/5477320351", "releaser": "圈内扒爷", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/p/1005055634795408/home?from=page_100505&mod=TAB#place", "releaser": "圈八戒 ", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/u/6511173721", "releaser": "圈内课代表", "platform": "weibo"},
                 {"releaserUrl": "https://weibo.com/p/1005055471534537/home?from=page_100505&mod=TAB&is_hot=1#place", "releaser": "娱闻少女", "platform": "weibo"}]
    extra_dic = {
        # "department_tags": ["客服部"],
        # 'key_releaser': "True",
        # 'frequency': 3,
        # "project_tags": ["城市媒体融合"],
        # "is_purchased": True,
        # "del_departments": ["客服部"],
        # "purchase_end_time": "2020-04-28"
        "department_tags": ["策略组"],
        'key_releaser': True,
        'frequency': 3,
    }
    # csv_type = {"SMG": [], "an_hui": [], "ronghe": [], "su_zhou": []}
    # ronghe_releaser_write_es(file, post_by="litao")
    write_to_es(file, post_by="litao", extra_dic=extra_dic, push_to_redis=True)
    write_to_es(data_list, post_by="litao", extra_dic=extra_dic, push_to_redis=False)
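Note: the purchase_end_time handling in write_to_es accepts either '%Y-%m-%d' or '%Y/%m/%d' date strings and stores them as epoch milliseconds, with 7258089600000 (1 Jan 2200, UTC+8) as the open-ended default. A small sketch of that conversion, using local time as the original strptime().timestamp() call does:

import datetime

def purchase_end_ms(value):
    # String dates are parsed in either format; anything else falls back to the far-future sentinel.
    if isinstance(value, str):
        fmt = '%Y-%m-%d' if '-' in value else '%Y/%m/%d'
        return int(datetime.datetime.strptime(value, fmt).timestamp() * 1e3)
    return 7258089600000

print(purchase_end_ms("2020-04-28"))  # epoch ms of local midnight, 28 Apr 2020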