# coding=utf-8 import json import requests from urllib.parse import urlparse, quote, urljoin from gm_types.mimas.enum import IMAGE_TYPE from django.conf import settings from gm_upload.utils.image_utils import Picture imgcdn = 'http://imgcdn.wanmeizhensuo.com/' qiniucdn = 'http://wanmeizhensuo.qiniudn.com/' SUFFIX_MAP = { '-half': 'imageView2/2/w/320/h/320/q/85', '-small': 'imageView2/2/w/640/q/85', '-thumb': 'imageView2/2/w/120/h/120/q/85', '-w': 'imageView2/2/w/640/h/640/q/100', '-web': 'imageView2/2/w/150/h/150/q/85', '-mid': 'imageView2/2/w/450/q/85', '-200': 'imageView2/2/w/200/h/200/q/85', '-wr': 'imageView2/2/w/640/q/85', } img_type = { "jpeg": IMAGE_TYPE.JPEG, "gif": IMAGE_TYPE.GIF, "jpg": IMAGE_TYPE.JPG, "png": IMAGE_TYPE.PNG, "bmp": IMAGE_TYPE.BMP, "webp": IMAGE_TYPE.WEBP, "tiff": IMAGE_TYPE.TIFF, } def _get_watermark_path(pathname, size): if not SUFFIX_MAP.get(size): return pathname path, watermark = pathname.split("?") return path + "?" + SUFFIX_MAP.get(size) + quote("|") + watermark def get_full_path(path_name, extra=''): if not path_name: return "" if not urlparse(path_name).hostname: path_name = urljoin(settings.QINIU_CDN_HOST, path_name) path_name = path_name.replace(imgcdn, settings.QINIU_CDN_HOST) path_name = path_name.replace(qiniucdn, settings.QINIU_CDN_HOST) if path_name and path_name.endswith(extra): extra = '' elif isinstance(extra, tuple): extra = extra[0] # 带水印标记的图片,特殊处理 if "watermark" in path_name and "?" in path_name: return _get_watermark_path(path_name, extra) return path_name + extra def get_temp_image_path(path_name): return urljoin(settings.QINIU_TEMP_CDN_HOST, path_name) def get_short_path(path_name, extra=''): if path_name: if imgcdn in path_name: short_path = path_name.replace(imgcdn, '') elif qiniucdn in path_name: short_path = path_name.replace(qiniucdn, '') elif settings.QINIU_CDN_HOST in path_name: short_path = path_name.replace(settings.QINIU_CDN_HOST, '') else: short_path = path_name else: short_path = '' return short_path + extra def get_w_path(image_name): return Picture.get_w_path(image_name) def get_thumb_path(image_name): return Picture.get_thumb_path(image_name) def get_half_path(image_name): return Picture.get_half_path(image_name) def get_image_base_info(url, suffix='-imageinfo?ref'): full_path = url + suffix try: resp = requests.get(full_path, timeout=1) if resp.status_code == 200: data = json.loads(resp.text) else: data = {} except: data = {} ret_data = { 'width': data.get('width', 0), 'height': data.get('height', 0), } return ret_data def handle_image_type(image_urls): """处理图片内容类型""" img_dic = {} import asyncio import aiohttp if not image_urls: return img_dic async def get_image_type(image_url): import json from gm_upload.consts import GmImageManager from gm_upload import IMG_TYPE img_domain = GmImageManager(IMG_TYPE.TOPICREPLY).get_domain().replace("https", 'http') request_url = img_domain + "/" + image_url + "-imageinfo" if image_url.startswith("https://heras.igengmei.com"): request_url = image_url.replace("https", 'http') + "-imageinfo" async with aiohttp.ClientSession() as session: async with session.get(request_url) as resp: img_res = await resp.text() img_res = json.loads(img_res) img_dic[image_url] = img_type.get(img_res.get("format"), IMAGE_TYPE.OTHER) or IMAGE_TYPE.OTHER task_list = [] for image_url in image_urls: if not image_url: continue task_list.append(get_image_type(image_url)) new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task_list)) return img_dic