# coding:utf-8
import logging
import os.path
from functools import partial
from io import open as iopen
from multiprocessing import Pool
from urllib.parse import urlsplit

import requests

logger = logging.getLogger(__name__)


class IMGDownloader(object):
    """Download images from URLs and save them into a directory."""

    # File extensions (normalized image subtypes) that are accepted.
    IMG_TYPES = ['jpg', 'gif', 'png', 'tif', 'svg', 'jpeg']

    # Seconds to wait on the server before giving up; without a timeout a
    # stalled connection would block a worker forever.
    TIMEOUT = 10

    # Registered media subtypes that differ from the extension in IMG_TYPES.
    _SUBTYPE_ALIASES = {'svg+xml': 'svg', 'tiff': 'tif'}

    @classmethod
    def download(cls, urls, des='.'):
        """Download every URL in ``urls`` into directory ``des``.

        The directory is created when missing. Downloads run in a pool of
        8 worker processes; a URL that fails is skipped (see ``_download``).

        :param urls: iterable of image URLs
        :param des: destination directory
        """
        os.makedirs(des, exist_ok=True)
        urls = list(urls)
        if not urls:
            # Nothing to fetch; avoid spawning worker processes for no work.
            return
        # ``map`` blocks until every URL is processed, so leaving the
        # ``with`` block (which terminates the pool) loses no work and also
        # releases the workers when ``map`` raises.
        with Pool(processes=8) as pool:
            pool.map(partial(cls._download, des), urls)

    @classmethod
    def _download(cls, des, url):
        """Fetch one image and write it to ``des``; best-effort.

        Any failure while requesting the image or deriving its file name is
        logged and the URL is skipped, so a single bad URL does not abort
        the batch. A file that already exists is left untouched.

        :param des: destination directory
        :param url: image URL
        """
        try:
            img_type, content = cls.req_img(url)
            file_name = cls.get_file_name(url, True)
        except Exception:
            logger.warning("skipping %s", url, exc_info=True)
            return

        full_path = os.path.join(des, file_name + '.' + img_type)
        try:
            # 'x' creates the file exclusively, so the "already exists"
            # check and the creation are one atomic step across workers.
            with iopen(full_path, 'xb') as img:
                img.write(content)
        except FileExistsError:
            return

    @staticmethod
    def get_file_name(url, full_url_path=False):
        """Build a file name (without extension) from a URL.

        :param url: file URL
        :param full_url_path: when true, use the whole URL path with every
            ``/`` replaced by ``_`` (``/2018/08/03/67867`` ->
            ``_2018_08_03_67867``); otherwise use only the last path segment
        :return: the name with the last segment's extension removed
        :raises Exception: when the URL yields an empty name (e.g. a bare
            domain)
        """
        path = urlsplit(url).path
        directory, sep, last_segment = path.rpartition('/')
        # Emptiness is judged before the extension is removed.
        name = path if full_url_path else last_segment
        if not name:
            raise Exception("请求地址不能为一个跟域名")

        # Strip the extension from the last segment only, so that a dot in
        # a directory segment (e.g. "/v1.2/img/abc") cannot truncate the name.
        stem = last_segment.rsplit('.', 1)[0]
        if full_url_path:
            return (directory + sep).replace('/', '_') + stem
        return stem

    @classmethod
    def _parse_img_type(cls, content_type):
        """Extract the normalized image subtype from a Content-Type value.

        Parameters after ``;`` are dropped, the value is lower-cased and
        known subtype aliases are mapped to their file extension.

        :param content_type: raw header value, may be ``None``
        :return: the subtype string, or ``None`` when it cannot be determined
        """
        if not content_type:
            return None
        parts = content_type.split(';', 1)[0].strip().lower().split('/')
        if len(parts) < 2:
            return None
        subtype = parts[1]
        return cls._SUBTYPE_ALIASES.get(subtype, subtype)

    @classmethod
    def req_img(cls, url):
        """Request an image and return its type and raw bytes.

        :param url: image URL
        :return: tuple ``(img_type, img_content)``
        :raises requests.RequestException: when the request fails, the
            status is not OK, or the image type is not in ``IMG_TYPES``
        """
        res = requests.get(url, timeout=cls.TIMEOUT)
        if res.status_code != requests.codes.ok:
            raise requests.RequestException("请求错误")

        img_type = cls._parse_img_type(res.headers.get('Content-Type'))
        if img_type not in cls.IMG_TYPES:
            raise requests.RequestException(
                "{img_type}图片格式不允许错误".format(img_type=img_type))
        return img_type, res.content


if __name__ == '__main__':
    url = ["https://heras.igengmei.com/slide/2018/06/10/0c09d47759"]
    IMGDownloader.download(url)