1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# coding:utf-8
import hashlib
from lxml import html
from django.conf import settings
from urllib.parse import urljoin
from gm_upload.utils.image_utils import Picture
def convert_image(image, watermark=False):
    """Build the common dict of image variants for one image.

    All image handling is expected to go through this shared helper.

    :param image: image reference handed to the ``Picture`` path helpers
        (presumably a URL or storage path -- confirm against callers).
    :param watermark: when truthy, the ``image`` entry is
        ``Picture.get_w_path(image)``; otherwise it is ``image`` unchanged.
    :return: dict with the keys ``image_half``, ``image_thumb``,
        ``image_wide``, ``small_wide``, ``image_slimwidth`` and ``image``.
    """
    variants = {
        'image_half': Picture.get_half_path(image),
        'image_thumb': Picture.get_thumb_path(image),
        'image_wide': Picture.get_wide_path(image),
        'small_wide': Picture.get_smallwide_path(image),
        'image_slimwidth': Picture.get_slimwidth_path(image),
    }
    # Only the main entry depends on the watermark flag.
    if watermark:
        variants['image'] = Picture.get_w_path(image)
    else:
        variants['image'] = image
    return variants
def get_data_from_rich_text(rich_text, regex):
    """Parse rich text as HTML and collect the nodes matching a query.

    :param rich_text: HTML markup to parse; may be empty or ``None``.
    :param regex: despite its name, this value is passed to ``xpath()``,
        so it must be an XPath expression rather than a regular expression.
    :return: ``(root, matches)`` -- the parsed root element and the result
        of the XPath query; ``(None, [])`` when ``rich_text`` is falsy.
    """
    if rich_text:
        root = html.fromstring(rich_text)
        matches = root.xpath(regex)
        return root, matches
    # Nothing to parse: no root element and no matches.
    return None, []
def cleaned_video_url(video_url):
    """Remove the configured video host from a video address.

    :param video_url: video address; may be empty or ``None``.
    :return: ``video_url`` with every occurrence of
        ``settings.VIDEO_HOST`` removed, or ``''`` for a falsy input.
    """
    if video_url:
        return video_url.replace(settings.VIDEO_HOST, '')
    return ''
def replace_video_url_for_rich_text(rich_text, url_dict):
    """Point the ``<video>`` tags of a rich text at their new addresses.

    :param rich_text: rich-text (HTML) content.
    :param url_dict: mapping from a video's current address (with
        ``settings.VIDEO_HOST`` stripped) to its replacement address.
    :return: ``(rich_text, all_replaced)``. For empty input this is
        ``("", True)``; when no candidate ``<video>`` tag exists the input
        is returned unchanged with ``True``. Otherwise the re-serialized
        markup is returned together with a flag that is ``True`` only if
        every candidate tag was rewritten.
    """
    if not rich_text:
        return "", True

    # Selects every <video> whose name attribute is not "new_video", i.e.
    # tags without a name as well as tags carrying any other name.
    xpath = '//video[not(@name="new_video")]'
    root, pending_videos = get_data_from_rich_text(rich_text, xpath)
    if not pending_videos:
        return rich_text, True

    all_replaced = True
    for video in pending_videos:
        attrs = video.attrib
        old_url = cleaned_video_url(attrs.get("src", ""))
        new_url = url_dict.get(old_url, "")
        if new_url and new_url != old_url:
            # Marking the tag as "new_video" keeps it out of later passes.
            attrs.update({
                "src": urljoin(settings.VIDEO_HOST, new_url),
                "name": "new_video",
            })
        else:
            # No usable replacement for this tag; leave it as it is.
            all_replaced = False

    return html.tostring(root, encoding="unicode"), all_replaced
def get_new_video_name(raw_name):
    """Derive an ``.mp4`` file name from the MD5 hash of ``raw_name``.

    :param raw_name: original name, as ``bytes`` or as text; text is
        encoded to UTF-8 before hashing.
    :return: the hexadecimal MD5 digest followed by ``.mp4``.
    """
    if isinstance(raw_name, bytes):
        payload = raw_name
    else:
        payload = raw_name.encode("utf-8")
    digest = hashlib.md5(payload).hexdigest()
    return "{new_video_name}.mp4".format(new_video_name=digest)
def big_data_iter(qs, fetch_num=100):
    """Walk a large collection in consecutive fixed-size slices.

    :param qs: the data to split; it must support ``len()`` and slicing.
    :param fetch_num: number of items per slice.
    :return: generator yielding successive slices of ``qs``; iteration
        stops at the first empty slice.
    """
    start = 0
    while True:
        if start > len(qs):
            return
        batch = qs[start: start + fetch_num]
        if not batch:
            # An empty slice means the data is exhausted.
            return
        yield batch
        start += fetch_num