Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
crawler
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
backend
crawler
Commits
eb68175b
Commit
eb68175b
authored
Aug 11, 2020
by
litaolemo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update
parent
dbbc805d
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
326 additions
and
1 deletion
+326
-1
cal_ni_and_put_to_backend.py
crawler_sys/scheduler/cal_ni_and_put_to_backend.py
+1
-1
push_crawler_data_to_mysql.py
crawler_sys/scheduler/push_crawler_data_to_mysql.py
+77
-0
trans_qiniu_img.py
crawler_sys/utils/trans_qiniu_img.py
+248
-0
No files found.
crawler_sys/scheduler/cal_ni_and_put_to_backend.py
View file @
eb68175b
...
...
@@ -476,7 +476,7 @@ def task_main():
{content}
"""
.
format
(
tractate_id
=
tractate_id
,
content
=
res_data
[
"content"
],
level
=
res_data
[
"level"
])
send_file_email
(
""
,
""
,
email_group
=
[
"<hongxu@igengmei
@igengmei
.com>"
,
"<yangjiayue@igengmei.com>"
,
"<zhangweiwei@igengmei.com>"
,
"<liuyiting@igengmei.com>"
],
cc_group
=
[
"<duanyingrong@igengmei.com>"
,
"<litao@igengmei.com>"
],
send_file_email
(
""
,
""
,
email_group
=
[
"<hongxu@igengmei.com>"
,
"<yangjiayue@igengmei.com>"
,
"<zhangweiwei@igengmei.com>"
,
"<liuyiting@igengmei.com>"
],
cc_group
=
[
"<duanyingrong@igengmei.com>"
,
"<litao@igengmei.com>"
],
email_msg_body_str
=
body_str
,
title_str
=
title_str
)
print
(
"send to mysql"
)
except
Exception
as
e
:
...
...
crawler_sys/scheduler/push_crawler_data_to_mysql.py
0 → 100644
View file @
eb68175b
# -*- coding:UTF-8 -*-
# @Time : 2020/8/11 9:54
# @File : push_crawler_data_to_mysql.py
# @email : litao@igengmei.com
# @author : litao
import
redis
from
maintenance.func_send_email_with_file
import
send_file_email
from
typing
import
Dict
,
List
from
elasticsearch
import
Elasticsearch
from
elasticsearch.helpers
import
scan
from
crawler.crawler_sys.utils.trans_qiniu_img
import
write_data_into_mysql
# Elasticsearch cluster holding the raw crawler output (index "crawler-data-raw").
es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
# Redis set "article_id_list" records documents that were already pushed to
# MySQL, so a rerun does not insert duplicates.
rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
# rds = redis.StrictRedis(host='172.16.40.164', port=6379, db=19, password='ReDis!GmTx*0aN12')
# Pool of account ids used as the author of inserted posts; one is picked at
# random per post (see write_data_into_mysql).  Presumably shadow/bot accounts
# — TODO confirm with the backend team.
user_id_list = [
    33745191, 33745202, 33745231, 33745286, 33745295, 33745266, 33745315,
    33745333, 33745346, 33745353, 33745327, 33745340, 33745355, 33745359,
    33745364, 33745371, 33745395, 33745421, 33745433, 33745457,
]
def send_email(query_id_dict: Dict):
    """Send one review-request email per search keyword.

    :param query_id_dict: mapping of search_keyword -> {tractate_id: 1, ...}
        as built by scan_es_to_mysql.  Empty/None means nothing to report.
    """
    try:
        if query_id_dict:
            for search_keyword in query_id_dict:
                title_str = "关键词%s帖子内容审核" % search_keyword
                body_str = """
                问好:
                新的query:{search_keyword}抓取内容需要审核,帖子号为\n""".format(search_keyword=search_keyword, )
                for tractate_id in query_id_dict[search_keyword]:
                    # Bug fix: tractate ids are ints (MySQL primary keys) —
                    # concatenating them to a str raised TypeError.
                    body_str += str(tractate_id) + ", "
                    print("line25", tractate_id)
                # Bug fix: first recipient was "<hongxu@igengmei@igengmei.com>"
                # (doubled domain) — an undeliverable address.
                send_file_email("", "",
                                email_group=["<hongxu@igengmei.com>",
                                             "<yangjiayue@igengmei.com>",
                                             "<zhangweiwei@igengmei.com>",
                                             "<liuyiting@igengmei.com>"],
                                cc_group=["<duanyingrong@igengmei.com>",
                                          "<litao@igengmei.com>"],
                                email_msg_body_str=body_str,
                                title_str=title_str)
                print("send to mysql")
    except Exception as e:
        # Best-effort: a failed notification must not abort the sync run.
        print("send email error %s" % e)
def scan_es_to_mysql():
    """Scan ES for crawled docs carrying a search_word, insert new ones into
    MySQL, then email the reviewers a per-keyword list of new post ids.

    A document is skipped when its ES _id is already in the Redis set
    "article_id_list".  NOTE(review): nothing here ever *adds* ids to that
    set — verify the writer lives elsewhere, otherwise reruns re-insert.
    """
    query_id_dict = {}
    search_query = {
        "query": {
            "bool": {
                "filter": [],
                "must": [
                    {"exists": {"field": "search_word"}}
                ]
            }
        }
    }
    scan_res = scan(client=es_framework, query=search_query, index="crawler-data-raw")
    for res in scan_res:
        if_exists = rds.sismember("article_id_list", res["_id"])
        if if_exists:
            continue  # already pushed in a previous run
        data = res["_source"]
        data["doc_id"] = res["_id"]
        tractate_id = None
        try:
            tractate_id = write_data_into_mysql(data, user_id_list)
        except Exception as e:
            print("send to mysql error %s" % e)
        if tractate_id:
            search_word = data["search_word"]
            # Bug fix: the original indexed query_id_dict[search_word][...]
            # directly, which raised KeyError on the first id of every
            # keyword; setdefault creates the inner dict on demand.
            query_id_dict.setdefault(search_word, {})[tractate_id] = 1
    send_email(query_id_dict)
# Entry point: run one full ES -> MySQL sync pass.
if __name__ == "__main__":
    scan_es_to_mysql()
crawler_sys/utils/trans_qiniu_img.py
0 → 100644
View file @
eb68175b
# -*- coding:UTF-8 -*-
# @Time : 2020/8/11 10:03
# @File : trans_qiniu_img.py
# @email : litao@igengmei.com
# @author : litao
import
datetime
from
maintenance.func_send_email_with_file
import
send_file_email
from
typing
import
Dict
,
List
from
crawler.gm_upload.gm_upload
import
upload
,
upload_file
import
os
import
copy
import
re
# import HTMLParser
import
pymysql
from
crawler.crawler_sys.utils.output_results
import
retry_get_url
from
lxml
import
html
from
lxml.html.clean
import
Cleaner
import
random
class TRACTATE_PLATFORM():
    """Publish sources for new tractates (posts): (id, display label) pairs.

    The numeric string in each tuple is the value stored in
    api_tractate.platform (see write_data_into_mysql, which hard-codes
    17/12/14/13 for the strategy platforms below).
    """
    GM = ("1", u"更美")                       # Gengmei app itself
    HERA = ("2", u"HERA后台")                 # HERA admin backend
    DOCTOR = ("3", u"医生端")                 # doctor-side client
    XIAOHONGSHU = ("4", u"小红书")            # Xiaohongshu / RED
    WEIBO = ("5", u"微博")                    # Weibo
    SOYOUNG = ("6", u"新氧")                  # SoYoung
    MARK = ("7", u"站内打卡活动")             # in-app check-in campaign
    VARIETY_SHOW_YOUNG = ("8", "选秀节目(少年之名)打榜活动")  # talent-show voting event
    GROUP_DETAIL = ("9", "普通小组")          # ordinary group
    GROUP_TOPIC_DETAIL = ("10", "普通小组话题")  # ordinary group topic
    STRATEGY_WEIBO_HOTSPOT = ("11", "策略微博热点")      # strategy: Weibo hotspot
    STRATEGY_DOUBAN_HOTSPOT = ("12", "策略豆瓣鹅组热点")  # strategy: Douban "goose group" hotspot
    STRATEGY_TOUTIAO = ("13", "策略头条文章")            # strategy: Toutiao article
    STRATEGY_ZHIHU = ("14", "策略知乎文章")              # strategy: Zhihu article
    STRATEGY_XIAOHONGSHU = ("15", "策略小红书文章")      # strategy: Xiaohongshu article
    STRATEGY_SOYOUNG = ("16", "策略新氧文章")            # strategy: SoYoung article
    STRATEGY_WEIBO = ("17", "策略微博文章")              # strategy: Weibo article
# Maps the qiniu "-imageinfo" format string (upper-cased) to the
# api_tractate_images.image_type enum id.
img_type = {
    "OTHER": 1,  # other image type
    "GIF": 2,    # GIF animation
    "JPG": 3,    # JPG image
    "JPEG": 4,   # JPEG image
    "PNG": 5,    # PNG image
    "BMP": 6,    # BMP bitmap
    "WEBP": 7,   # WEBP image
    "TIFF": 8,   # TIFF image
}
# HTML tag whitelists consumed by gm_convert_html_tags.
WHITE_TAGS = {
    # tentatively used for the mini-program and crawled data
    "basic": ["div", "p", "span", "img", "br", "video", 'a'],
    # every whitelisted tag that may be rendered
    "all": [
        "div", "p", "span", "img", "br", "video", "audio", "a", "b",
        "strong", "i", "ul", "ol", "li", "em",
        "h1", "h2", "h3", "h4", "h5", "h6",
        "iframe",
    ]
}
def gm_convert_html_tags(rich_text, all_tags=False, remove_tags=None):
    """Re-clean rich-text content, stripping unwanted tags and styles.

    :param rich_text: the rich-text (HTML) string; falsy input yields ""
    :param all_tags: whether to allow every tag in the "all" whitelist
        (otherwise only the "basic" whitelist is kept)
    :param remove_tags: whitelist tags to additionally strip, e.g. ["a"]
    :return: the cleaned and re-serialised HTML string
    """
    if not rich_text:
        return ""
    # rich_text = _get_rich_text(rich_text)
    # Tag cleaning: pick the whitelist, then drop any explicitly removed tags.
    tags = WHITE_TAGS["all"] if all_tags else WHITE_TAGS["basic"]
    if remove_tags:
        tags = [tag for tag in tags if tag not in remove_tags]
    kw = {
        "remove_unknown_tags": False,
        "allow_tags": tags,
        "safe_attrs": ["src", ],
    }
    if "a" in tags:
        kw["safe_attrs"].append("href")
    elif all_tags:
        # NOTE(review): because "a" is in both whitelists, this branch only
        # runs when remove_tags stripped "a" — so class/style survive the
        # cleaner only in that case.  Looks intentional but worth confirming.
        kw["safe_attrs"].extend(["class", "style"])
    if "iframe" in kw["allow_tags"]:
        # embedded=False stops Cleaner from dropping iframes/embedded media.
        kw["embedded"] = False
    clear = Cleaner(**kw)
    rich_text = clear.clean_html(rich_text)
    # Add styling to the cleaned tree.
    element_obj = html.fromstring(rich_text)
    for element in element_obj.xpath(u"//img|//video"):
        if not all_tags:
            # Mini-program / ordinary users / crawled data:
            # widen images and videos to 100%.
            element.attrib["width"] = "100%"
        if element.tag == "video" and all_tags:
            element.attrib["class"] = "js_richtext_video"
    # Remove <a> tags whose href does not start with the gengmei:// scheme.
    for item in element_obj.xpath("//a[not(starts-with(@href, 'gengmei://'))]"):
        item.getparent().remove(item)
    # Append style to the remaining <a> tags (link colour).
    for item in element_obj.xpath("//a"):
        item.attrib["style"] = 'color:#3FB5AF'
    rich_text = html.tostring(element_obj, encoding="unicode")
    return rich_text
def push_data_to_user(res_data: Dict) -> Dict:
    """Prepare a crawled document for insertion into MySQL.

    Re-hosts every image from ``img_list`` on qiniu, rewrites ``content`` to
    reference the new URLs, and normalises the HTML via gm_convert_html_tags.

    :param res_data: crawled document; may carry "img_list" and "content"
    :return: the enriched document (mutated in place and returned), or {}
        when any single image fails to transfer (deliberate all-or-nothing)
    """
    qiniu_img_list = []
    # Bug fix: the original hard-indexed res_data["img_list"] and raised
    # KeyError for documents without that field; treat missing as empty.
    img_list = res_data.get("img_list") or []
    for img_url in img_list:
        try:
            img_wb = retry_get_url(img_url).content
            res = upload(img_wb)
            print(res)
            img_info = retry_get_url(res + "-imageinfo")
            img_info_json = img_info.json()
            qiniu_img_list.append((res + "-w", img_info_json))
        except Exception as e:
            # One failed image discards the whole post — keeps qiniu_img_list
            # index-aligned with img_list for the replacement loop below.
            print("down load img error %s" % e)
            return {}
    # Replace the original image URLs inside the body with the qiniu ones.
    content = res_data.get("content")
    if content:
        for count, img_url in enumerate(img_list):
            # print(qiniu_img_list[count][0])
            content = content.replace(img_url, qiniu_img_list[count][0])
    res_data["qiniu_img_list"] = qiniu_img_list
    res_data["content"] = content
    # Normalise the HTML formatting.
    res_data["content"] = gm_convert_html_tags(res_data["content"], all_tags=True)
    return res_data
def write_data_into_mysql(res_data: Dict, user_id_list: List):
    """Clean a crawled document and insert it, plus its images, into MySQL.

    :param res_data: raw crawler document; must carry "platform", "level",
        "doc_id", "title", and usually "content"/"img_list"
    :param user_id_list: pool of author account ids; one is picked at random
    :return: the new api_tractate primary key, or None when nothing usable
        was inserted
    """
    # Clean the data into an insertable shape first, and bail out BEFORE
    # opening any DB connection (bug fix: the original connected first and
    # leaked the connection/cursor on these early returns).
    data = push_data_to_user(res_data)
    if not data.get("content"):
        return None
    if not data.get("qiniu_img_list"):
        return None

    # Map crawler platform name -> TRACTATE_PLATFORM id.
    platform_map = {"weibo": 17, "douban": 12, "zhihu": 14, "toutiao": 13}
    platform_value = platform_map.get(data["platform"])
    if platform_value is None:
        # Bug fix: an unknown platform previously left platform_value
        # undefined; the NameError was swallowed by the broad except and the
        # image loop then inserted orphan rows with tractate_id=0.
        print("unknown platform %s" % data["platform"])
        return None

    conn = pymysql.connect(host='172.16.30.138', port=3306, user='mimas',
                           passwd='GJL3UJe1Ck9ggL6aKnZCq4cRvM', db='mimas_prod',
                           charset='utf8mb4')
    cur = conn.cursor()
    now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    tractate_id = 0
    try:
        try:
            # NOTE(review): string-formatted SQL is injection-prone and
            # content/title come from crawled pages — should move to
            # parameterised cur.execute(sql, params).
            sql_query = """insert into api_tractate
            (user_id,content,is_online,status,platform,content_level,is_excellent,create_time,last_modified,user_del,low_quality,low_quality_deal,platform_id,pgc_type,title)
            values ({user_id},'{content}',{is_online},{status},{platform},{content_level},{is_excellent},'{create_time}','{last_modified}',{user_del},{low_quality},{low_quality_deal},'{platform_id}',{pgc_type},'{title}');""".format(
                user_id=random.choice(user_id_list), content=data["content"],
                is_online=0, status=2, platform=platform_value,
                content_level=data["level"], is_excellent=0,
                create_time=now_str, last_modified=now_str, user_del=0,
                low_quality=0, low_quality_deal=0, platform_id=data["doc_id"],
                pgc_type=0, title=data["title"])
            res = cur.execute(sql_query)
            tractate_id = int(conn.insert_id())
            if res:
                conn.commit()
        except Exception as e:
            print("commit error %s" % e)
            print(data)
            conn.rollback()

        for img_info in data.get("qiniu_img_list"):
            # Images referenced inside the body are source 2, detached ones 3.
            if img_info[0] in data.get("content"):
                image_url_source = 2
            else:
                image_url_source = 3
            # Unknown/missing format falls back to 1 ("other").
            try:
                image_type = img_type.get(img_info[1]["format"].upper(), 1)
            except Exception:
                image_type = 1
            try:
                width = img_info[1]["width"]
                height = img_info[1]["height"]
            except Exception:
                width = 0
                height = 0
            try:
                # Bug fix: the original tested `img_type == 7` — comparing the
                # module-level DICT to 7, which is never true — so the WEBP
                # branch was dead.  Its SQL also listed image_webp twice and
                # misaligned columns vs values; both are corrected here.
                if image_type == 7:
                    sql_query = """
                    insert into api_tractate_images (tractate_id,image_url,width,height,image_webp,image_url_source,image_type,create_time,update_time)
                    values ({tractate_id},'{image_url}',{width},{height},'{image_webp}',{image_url_source},{image_type},'{create_time}','{update_time}')
                    """.format(tractate_id=tractate_id, image_url=img_info[0],
                               width=width, height=height, image_webp=img_info[0],
                               image_url_source=image_url_source,
                               image_type=image_type, create_time=now_str,
                               update_time=now_str)
                else:
                    sql_query = """
                    insert into api_tractate_images (tractate_id,image_url,width,height,image_url_source,image_type,create_time,update_time)
                    values ({tractate_id},'{image_url}',{width},{height},{image_url_source},{image_type},'{create_time}','{update_time}')
                    """.format(tractate_id=tractate_id, image_url=img_info[0],
                               width=width, height=height,
                               image_url_source=image_url_source,
                               image_type=image_type, create_time=now_str,
                               update_time=now_str)
                res = cur.execute(sql_query)
                if res:
                    conn.commit()
            except Exception as e:
                print("commit error %s" % e)
                conn.rollback()
    finally:
        # Always release the DB resources, whatever happened above.
        cur.close()
        conn.close()
    if tractate_id:
        return tractate_id
    else:
        return None
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment