# common.py 2.63 KB
# coding:utf-8

import hashlib

from lxml import html
from django.conf import settings
from urllib.parse import urljoin

from gm_upload.utils.image_utils import Picture


def convert_image(image, watermark=False):
    """
    Build the dict of derived paths for one image (shared entry point for images).

    :param image: image value handed unchanged to every ``Picture.get_*_path`` helper
    :param watermark: when truthy, the ``'image'`` entry is ``Picture.get_w_path(image)``
        (presumably the watermarked variant -- confirm against ``Picture``);
        otherwise it is ``image`` itself
    :return: dict with keys ``image_half``, ``image_thumb``, ``image_wide``,
        ``small_wide``, ``image_slimwidth`` and ``image``
    """
    variants = {}
    variants['image_half'] = Picture.get_half_path(image)
    variants['image_thumb'] = Picture.get_thumb_path(image)
    variants['image_wide'] = Picture.get_wide_path(image)
    variants['small_wide'] = Picture.get_smallwide_path(image)
    variants['image_slimwidth'] = Picture.get_slimwidth_path(image)

    # The main entry is only passed through Picture when a watermark is requested.
    if watermark:
        variants['image'] = Picture.get_w_path(image)
    else:
        variants['image'] = image

    return variants


def get_data_from_rich_text(rich_text, regex):
    """
    Parse rich text as HTML and run a query against the parsed tree.

    :param rich_text: HTML markup; a falsy value short-circuits without parsing
    :param regex: expression passed to the element's ``xpath()`` method
        (despite the name it is used as an XPath query, not a regular expression)
    :return: ``(root_element, xpath_result)``; ``(None, [])`` when ``rich_text`` is falsy
    """
    if rich_text:
        root = html.fromstring(rich_text)
        matches = root.xpath(regex)
        return root, matches

    return None, []


def cleaned_video_url(video_url):
    """
    Remove every occurrence of ``settings.VIDEO_HOST`` from a video URL.

    :param video_url: URL string; may be empty or None
    :return: the URL with the host text removed, or ``''`` when ``video_url`` is falsy
    """
    if video_url:
        host = settings.VIDEO_HOST
        return video_url.replace(host, '')

    return ''


def replace_video_url_for_rich_text(rich_text, url_dict):
    """
    Rewrite the ``src`` of ``<video>`` tags in rich text using a URL mapping.

    Each ``<video>`` whose ``name`` attribute is not ``"new_video"`` has its
    ``src`` cleaned with ``cleaned_video_url`` and looked up in ``url_dict``.
    When a different, non-empty replacement is found, ``src`` is set to that
    replacement joined onto ``settings.VIDEO_HOST`` and the tag is marked with
    ``name="new_video"``.

    :param rich_text: rich-text (HTML) content
    :param url_dict: mapping of cleaned old video URL -> new video URL
    :return: ``(text, all_replaced)``. ``("", True)`` for falsy input; the
        untouched input and ``True`` when no candidate tag exists; otherwise
        the re-serialized tree and whether every candidate tag was replaced
    """
    if not rich_text:
        return "", True

    # Select every <video> that has not already been tagged name="new_video".
    xpath = u'//video[not(@name="new_video")]'
    root, videos = get_data_from_rich_text(rich_text, xpath)
    if not videos:
        return rich_text, True

    replaced = 0
    for video in videos:
        attrs = video.attrib
        old_url = cleaned_video_url(attrs.get("src", ""))
        new_url = url_dict.get(old_url, "")

        # Only touch tags that have a real replacement different from the current URL.
        if new_url and new_url != old_url:
            joined = urljoin(settings.VIDEO_HOST, new_url)
            attrs["src"] = joined
            attrs["name"] = "new_video"
            replaced += 1

    serialized = html.tostring(root, encoding="unicode")
    return serialized, replaced == len(videos)


def get_new_video_name(raw_name):
    """
    Derive an ``.mp4`` file name from the MD5 hex digest of ``raw_name``.

    :param raw_name: ``bytes``, or a value with ``.encode`` (e.g. ``str``) that
        is encoded as UTF-8 before hashing
    :return: ``"<32-char md5 hex digest>.mp4"``
    """
    payload = raw_name if isinstance(raw_name, bytes) else raw_name.encode("utf-8")
    digest = hashlib.md5(payload).hexdigest()
    return "{new_video_name}.mp4".format(new_video_name=digest)


def big_data_iter(qs, fetch_num=100):
    """
    Yield consecutive slices of a large sliceable collection.

    :param qs: collection supporting ``len()`` and slicing
    :param fetch_num: size of each slice
    :return: generator producing ``qs[start:start + fetch_num]`` slices, stopping
        at the first empty slice
    """
    offset = 0
    while True:
        # len() is re-read on every pass, so a collection that changes size is honoured.
        if offset > len(qs):
            return

        chunk = qs[offset: offset + fetch_num]
        if not chunk:
            return

        yield chunk
        offset += fetch_num