Commit a18cb3b7 authored by zhanglu's avatar zhanglu

Merge branch 'feature/create_image_zy' into 'dev'

Feature/create image zy

See merge request !56
parents dbe08dcb 075bdd53
......@@ -37,6 +37,8 @@ urlpatterns = [
url(r'^user/get$', UserUpdateOrCreate.as_view()),
url(r'^user/create$', UserUpdateOrCreate.as_view()),
url(r'^user/group/list$', UserGroupView.as_view()),
url(r'^user/images/get$', UserImage.as_view()),
url(r'^user/images/create$', UserImage.as_view()),
# group相关
url(r'^group/list$', GroupListView.as_view()),
......
......@@ -5,9 +5,14 @@
import json
from django.conf import settings
from gm_upload import upload, IMG_TYPE, upload_with_short
from utils.base import APIView
from django.contrib.auth.hashers import make_password
from utils.download import download_img_func
from utils.logger import error_logger
from utils.pic_tools import tailor_image
class UserListView(APIView):
......@@ -90,3 +95,60 @@ class UserGroupView(APIView):
error_logger.error(u'获取小组用户详情失败%s' , e)
raise
return data
class UserImage(APIView):
def post(self, request):
image_id = request.POST.get('image_id', '')
image_url = request.POST.get('image_url', '')
user_id = request.POST.get('user_id', '')
if not all([image_id, image_url, user_id]):
error_logger.error(u'参数不完整')
return
err, data = self.rpc['saber/face/detect'](url=image_url).unwrap()
if err:
error_logger.error(u'获取人脸失败')
faces = [
{
"point": [
item["rect"]["left"],
item["rect"]["top"],
item["rect"]["right"]-item["rect"]["left"],
item["rect"]["bottom"]-item["rect"]["top"],
]
}
for item in data.get("faces", [])
]
try:
point = faces[0].get('point', [])
file_path = download_img_func(image_url, user_id, suffix="png")
images = tailor_image(file_path, point)
header_pic_path = images[0]
with open(header_pic_path, 'rb') as f:
image_url = upload_with_short(f, img_type=IMG_TYPE.USEREXTRA)[0]
except IndexError:
error_logger.error(u'获取人脸失败')
return
except:
error_logger.error(u'上传失败')
return
result = self.rpc['venus/sun/user/create_update_image'](
url=image_url, user_id=user_id).unwrap()
return {
'message': '创建成功' if result else '创建失败'
}
def get(self, request):
user_id = request.GET.get('user_id')
offset = int(request.GET.get('page', 1))
count = int(request.GET.get('limit', 10))
try:
data = self.rpc['venus/sun/user/user_image_list'](
user_id=user_id, offset=(offset - 1) * count, limit=count).unwrap()
except Exception as e:
error_logger.error(u'获取%s用户头像失败%s' % (id, e))
raise
return data
......@@ -13,3 +13,5 @@ git+ssh://git@git.wanmeizhensuo.com/backend/gm-logging.git@v0.7.8
git+ssh://git@git.wanmeizhensuo.com/backend/gm-dataquery.git@v0.2.10
git+ssh://git@git.wanmeizhensuo.com/system/gm-tracer.git@v0.1.2
git+ssh://git@git.wanmeizhensuo.com/alpha/alpha-types.git@master
filetype==1.0.2
Pillow==5.4.1
\ No newline at end of file
......@@ -154,3 +154,6 @@ PUPPET_PASSWORD = 123456
# 重置运营管理登陆密码
OPERATOR_PASSWORD = 123456
# 图片下载的存储路径
DOWNLOAD_IMAGE_PATH = u'/header-images/'
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import requests
from django.conf import settings
class DownloadFunc(object):
base_path = settings.DOWNLOAD_IMAGE_PATH
@staticmethod
def get_file_base_path(sole_id=None):
dir_path = os.path.join(DownloadFunc.base_path, sole_id)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
return dir_path
@staticmethod
def format_file_abs_path_and_name(dir_path, file_name):
"""
拼接文件绝对地址
:param dir_path: 地址
:param file_name: 文件名
:return:
"""
return os.path.join(dir_path, file_name)
@staticmethod
def get_url_content(url):
"""
:param url:
:return:
"""
content = b''
for _ in range(3):
try:
response = requests.get(url, timeout=10)
return response.content
except:
raise Exception('download failed')
return content
@classmethod
def download_resources(cls, url, sole_id, suffix=""):
"""
下载文件资源
:param url: 下载地址
:param sole_id: 唯一id
:param suffix: 后缀
:return:
"""
_content = cls.get_url_content(url)
if _content:
dir_path = cls.get_file_base_path(sole_id)
name = url.rsplit("/", 1)[-1]
if suffix:
name = "{name}.{suffix}".format(name=name, suffix=suffix)
file_path = cls.format_file_abs_path_and_name(dir_path, name)
# 写入本地
with open(file_path, "wb") as w_:
w_.write(_content)
return file_path
raise Exception("write Failed")
download_img_func = DownloadFunc.download_resources
get_dir_path = DownloadFunc.get_file_base_path
import os.path
import mimetypes
import filetype
from PIL import Image
from django.conf import settings
def get_file_type(filepath, ft=None):
ft = filetype.guess(filepath)
ext = ft.extension
return ext if ext else 'txt'
def get_filename(name):
h = hash(name)
if h < 0:
h = -h
return "{}".format(h)
def tailor_image(image_path, points, base_path='.'):
"""以左上角为原点 向右建立x轴 向下建立y轴, 同时截取多张
:param image: Image obj
:param point: [(x, y, delta_x, delta_y)]
:return:
"""
image = Image.open(image_path)
heiget = image.height
width = image.width
images = []
for idx, point in enumerate(points):
if not (width >= point[0] and width >= point[0] + point[2]) or \
not (heiget >= point[1] and heiget >= point[1] + point[3]):
continue
border = (point[0], point[1], point[0] + point[2], point[1] + point[3])
choice_image = image.crop(border)
filename = "{}_{}.{}".format(get_filename(image_path), idx, get_file_type(image_path))
base_dir = os.path.join(settings.DOWNLOAD_IMAGE_PATH)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
file_path = os.path.join(base_dir, filename)
print(file_path)
choice_image.save(file_path)
images.append(file_path)
return images
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment