# coding:utf-8

import hashlib
from urllib.parse import urljoin

from django.conf import settings
from lxml import html

from gm_upload.utils.image_utils import Picture


def convert_image(image, watermark=False):
    """Build the dict of image path variants; all images go through this helper.

    :param image: the original image value, handed to the ``Picture`` helpers.
    :param watermark: when truthy, the ``'image'`` entry is
        ``Picture.get_w_path(image)``; otherwise it is ``image`` unchanged.
    :return: dict with the keys ``image_half``, ``image_thumb``,
        ``image_wide``, ``small_wide``, ``image_slimwidth`` and ``image``.
    """
    variants = {
        'image_half': Picture.get_half_path(image),
        'image_thumb': Picture.get_thumb_path(image),
        'image_wide': Picture.get_wide_path(image),
        'small_wide': Picture.get_smallwide_path(image),
        'image_slimwidth': Picture.get_slimwidth_path(image),
    }
    # Filled in last so the Picture helpers run in the same order as the keys.
    variants['image'] = Picture.get_w_path(image) if watermark else image
    return variants


def get_data_from_rich_text(rich_text, regex):
    """Parse rich text and pull the requested nodes out of it.

    :param rich_text: HTML string to parse with ``lxml.html.fromstring``.
    :param regex: query expression; despite the name it is passed to the
        parsed element's ``.xpath()`` method.
    :return: ``(parsed_element, xpath_result)``, or ``(None, [])`` when
        ``rich_text`` is falsy.
    """
    if rich_text:
        root = html.fromstring(rich_text)
        return root, root.xpath(regex)
    return None, []


def cleaned_video_url(video_url):
    """Return ``video_url`` with every ``settings.VIDEO_HOST`` occurrence removed.

    :param video_url: URL string; a falsy value yields ``''``.
    :return: the stripped URL string.
    """
    return video_url.replace(settings.VIDEO_HOST, '') if video_url else ''


def replace_video_url_for_rich_text(rich_text, url_dict):
    """Swap the ``src`` of ``<video>`` tags in rich text for new addresses.

    :param rich_text: rich text (HTML) content.
    :param url_dict: mapping looked up with the host-stripped ``src`` of each
        video; the value is the replacement address.
    :return: ``(rich_text, all_replaced)``. ``all_replaced`` is True when the
        text is empty, when no candidate video tag is found, or when every
        candidate tag was rewritten.
    """
    if not rich_text:
        return "", True

    # Select every <video> whose name attribute is not "new_video"
    # (this includes tags that carry no name attribute at all).
    pending_xpath = u'//video[not(@name="new_video")]'
    root, videos = get_data_from_rich_text(rich_text, pending_xpath)
    if not videos:
        return rich_text, True

    replaced = 0
    for video in videos:
        attrs = video.attrib
        old_url = cleaned_video_url(attrs.get("src", ""))
        new_url = url_dict.get(old_url, "")
        # Only rewrite when the mapping offers a different, non-empty address.
        if new_url and new_url != old_url:
            attrs.update({
                "src": urljoin(settings.VIDEO_HOST, new_url),
                "name": "new_video",
            })
            replaced += 1

    # The tree is serialized again even when nothing was rewritten.
    serialized = html.tostring(root, encoding="unicode")
    return serialized, replaced == len(videos)


def get_new_video_name(raw_name):
    """Derive an ``.mp4`` file name from the MD5 hex digest of ``raw_name``.

    :param raw_name: ``bytes``, or an object with ``.encode`` (encoded as
        UTF-8 before hashing).
    :return: ``"<md5 hex digest>.mp4"``.
    """
    if isinstance(raw_name, bytes):
        payload = raw_name
    else:
        payload = raw_name.encode("utf-8")
    digest = hashlib.md5(payload).hexdigest()
    return "{new_video_name}.mp4".format(new_video_name=digest)


def big_data_iter(qs, fetch_num=100):
    """Yield ``qs`` in consecutive slices so large data can be handled in batches.

    :param qs: the data; it must support ``len()`` and slicing.
    :param fetch_num: size of each slice.
    :return: generator yielding ``qs[offset:offset + fetch_num]`` slices,
        stopping at the first empty slice.
    """
    offset = 0
    while len(qs) >= offset:
        chunk = qs[offset:offset + fetch_num]
        if chunk:
            yield chunk
            offset += fetch_num
        else:
            # An empty slice means the data is exhausted.
            return