1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# coding:utf-8
import logging
import os.path
from functools import partial
from io import open as iopen
from multiprocessing import Pool
from urllib.parse import urlsplit

import requests
class IMGDownloader(object):
    """Download images over HTTP and save them into a local directory.

    All entry points are class/static methods; the class carries no
    instance state. The file extension of a saved image is derived from
    the response's ``Content-Type`` header, the base name from the URL path.
    """

    # Extensions accepted for saving (compared against the normalized
    # MIME subtype of the response).
    IMG_TYPES = ['jpg', 'gif', 'png', 'tif', 'svg', 'jpeg']

    # Registered MIME subtypes whose spelling differs from the extension
    # listed in IMG_TYPES (``image/tiff``, ``image/svg+xml``).
    _SUBTYPE_ALIASES = {'tiff': 'tif', 'svg+xml': 'svg'}

    # Timeout in seconds handed to ``requests.get`` so that a stalled
    # server cannot block a worker process forever.
    REQUEST_TIMEOUT = 10

    @classmethod
    def download(cls, urls, des='.'):
        """Download every URL in ``urls`` into directory ``des``.

        The work is fanned out over a pool of 8 worker processes. URLs that
        cannot be fetched or are not an accepted image type are skipped.

        :param urls: iterable of image URLs
        :param des: destination directory; created if it does not exist
        """
        os.makedirs(des, exist_ok=True)
        urls = list(urls)
        if not urls:
            # Nothing to do: avoid spawning worker processes for no work.
            return
        # The context manager releases the workers even when map() raises.
        with Pool(processes=8) as pool:
            pool.map(partial(cls._download, des), urls)

    @classmethod
    def _download(cls, des, url):
        """Fetch one URL and write it to ``des``; existing files are kept.

        :param des: destination directory (must already exist)
        :param url: image URL
        """
        try:
            img_type, content = cls.req_img(url)
        except Exception as exc:
            # Best effort: one bad URL must not abort the whole batch.
            # ``Exception`` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            logging.getLogger(__name__).warning(
                "Skipping %s: %s", url, exc)
            return
        file_name = cls.get_file_name(url, True)
        full_path = os.path.join(des, file_name + '.' + img_type)
        try:
            # 'x' creates the file atomically and fails if it already
            # exists, so two workers mapping to the same name cannot
            # overwrite each other.
            with iopen(full_path, 'xb') as img:
                img.write(content)
        except FileExistsError:
            return

    @staticmethod
    def get_file_name(url, full_url_path=False):
        """Build a file name (without extension) from the path of ``url``.

        :param url: URL of the file
        :param full_url_path: when true, use the whole URL path as the name
            with ``/`` replaced by ``_`` (e.g. ``/2018/08/03/67867`` ->
            ``_2018_08_03_67867``); otherwise use only the last path segment
        :return: the name with the extension of the last segment removed
        :raises ValueError: if the URL path yields no usable name
            (e.g. a bare domain)
        """
        segments = urlsplit(url).path.split('/')
        if full_url_path:
            # A path made only of slashes (or empty) has no name at all.
            nameless = not any(segments)
        else:
            nameless = not segments[-1]
        if nameless:
            raise ValueError("请求地址不能为一个跟域名")
        # Strip the extension from the last segment only, so that a dot in
        # a directory name (e.g. "/v1.2/img/a.png") does not truncate the
        # result and make different URLs collide.
        segments[-1] = segments[-1].rsplit('.', 1)[0]
        if full_url_path:
            return '_'.join(segments)
        return segments[-1]

    @classmethod
    def _img_type_from_header(cls, content_type):
        """Return the normalized image subtype of a ``Content-Type`` value.

        Parameters after ``;`` are dropped, the value is lower-cased and
        subtypes listed in ``_SUBTYPE_ALIASES`` are mapped to their
        extension.

        :param content_type: raw header value, or ``None`` if absent
        :return: subtype string, or ``None`` if the header is missing or
            has no ``type/subtype`` form
        """
        if not content_type:
            return None
        mime = content_type.split(';', 1)[0].strip().lower()
        _, slash, subtype = mime.partition('/')
        if not slash or not subtype:
            return None
        return cls._SUBTYPE_ALIASES.get(subtype, subtype)

    @classmethod
    def req_img(cls, url):
        """Request an image.

        :param url: image URL
        :return: tuple ``(img_type, img_content)`` where ``img_type`` is one
            of ``IMG_TYPES`` and ``img_content`` is the raw response body
        :raises requests.RequestException: on a transport error or timeout,
            a non-OK status code, or a content type not in ``IMG_TYPES``
        """
        res = requests.get(url, timeout=cls.REQUEST_TIMEOUT)
        if res.status_code != requests.codes.ok:
            raise requests.RequestException("请求错误")
        img_type = cls._img_type_from_header(res.headers.get('Content-Type'))
        if img_type not in cls.IMG_TYPES:
            raise requests.RequestException(
                "{img_type}图片格式不允许错误".format(img_type=img_type))
        return img_type, res.content
if __name__ == '__main__':
    # Script entry point: fetch a single sample image into the current
    # directory (the default destination of ``download``).
    sample_urls = [
        "https://heras.igengmei.com/slide/2018/06/10/0c09d47759",
    ]
    IMGDownloader.download(sample_urls)