Commit d9d0c4ba authored by ibuler's avatar ibuler

Merge with zhuima

parents bcfeaa2d 84c9153f
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
import os
import six
import logging
from io import IOBase
from . import utils
from .exception import LoadAccessKeyError
class AccessKeyAuth(object):
def __init__(self, access_key_id, access_key_secret):
self.id = access_key_id
self.secret = access_key_secret
def sign_request(self, req):
req.headers['Date'] = utils.http_date()
signature = utils.make_signature(self.secret)
req.headers['Authorization'] = "Sign {0}:{1}".format(self.id, signature)
return req
class AccessTokenAuth(object):
def __init__(self, token):
self.token = token
def sign_request(self, req):
req.headers['Authorization'] = 'Bearer {0}'.format(self.token)
return req
class SessionAuth(object):
def __init__(self, session_id, csrf_token):
self.session_id = session_id
self.csrf_token = csrf_token
def sign_request(self, req):
cookie = [v for v in req.headers.get('Cookie', '').split(';')
if v.strip()]
cookie.extend(['sessionid='+self.session_id,
'csrftoken='+self.csrf_token])
req.headers['Cookie'] = ';'.join(cookie)
req.headers['X-CSRFTOKEN'] = self.csrf_token
return req
class Auth(object):
def __init__(self, token=None, access_key_id=None, access_key_secret=None,
session_id=None, csrf_token=None):
if token is not None:
self.instance = AccessTokenAuth(token)
elif access_key_id and access_key_secret:
self.instance = AccessKeyAuth(access_key_id, access_key_secret)
elif session_id and csrf_token:
self.instance = SessionAuth(session_id, csrf_token)
else:
raise OSError('Need token or access_key_id, access_key_secret '
'or session_id, csrf_token')
def sign_request(self, req):
return self.instance.sign_request(req)
class AccessKey(object):
def __init__(self, id=None, secret=None):
self.id = id
self.secret = secret
def clean(self, value, delimiter=':', silent=False):
try:
self.id, self.secret = value.split(delimiter)
except (AttributeError, ValueError) as e:
if not silent:
raise LoadAccessKeyError(e)
else:
return ':'.join([self.id, self.secret])
def load_from_env(self, env, delimiter=':', silent=False):
value = os.environ.get(env)
return self.clean(value, delimiter, silent)
def load_from_f(self, f, delimiter=':', silent=False):
value = ''
if isinstance(f, six.string_types) and os.path.isfile(f):
f = open(f)
if hasattr(f, 'read'):
for line in f:
if line and not line.strip().startswith('#'):
value = line.strip()
break
f.close()
return self.clean(value, delimiter, silent)
def save_to_f(self, f, silent=False):
if isinstance(f, six.string_types):
f = open(f, 'w')
try:
f.write(str('{0}:{1}'.format(self.id, self.secret)))
except IOError as e:
logging.error('Save access key error: {}'.format(e))
if not silent:
raise
finally:
f.close()
def __nonzero__(self):
return bool(self.id and self.secret)
__bool__ = __nonzero__
def __str__(self):
return '{0}:{1}'.format(self.id, self.secret)
__repr__ = __str__
class ServiceAccessKey(AccessKey):
"""使用Access key来认证"""
# 默认从配置文件中读取的设置
# 配置文件中ACCESS_KEY值的名称
conf_attr_var = 'ACCESS_KEY'
# 配置文件中配置环境变量的名称
conf_env_var = 'ACCESS_KEY_ENV'
# 配置文件中定义Access key store的位置
conf_store_var = 'ACCESS_KEY_STORE'
# 如果用户配置中没有设置, 方法中也没填入, 使用下面默认
default_key_env = 'ACCESS_KEY_ENV'
default_key_store = os.path.join(os.environ.get('HOME', ''), '.access_key')
def __init__(self, id=None, secret=None, config=None):
super(ServiceAccessKey, self).__init__(id=id, secret=secret)
self.config = config or {}
self._key_store = None
self._key_env = None
# 获取key store位置
@property
def key_store(self):
if self._key_store:
return self._key_store
elif self.conf_store_var in self.config:
return self.config[self.conf_store_var]
else:
return self.default_key_store
@key_store.setter
def key_store(self, value):
self._key_store = value
# 获取access key的环境变量名
@property
def key_env(self):
if self._key_env:
return self._key_env
elif self.conf_env_var in self.config:
return self.config[self.conf_env_var]
else:
return self.default_key_env
@key_env.setter
def key_env(self, value):
self._key_env = value
def load_from_conf_env(self, env=None, delimiter=':'):
if env is None:
env = self.key_env
return super(ServiceAccessKey, self).\
load_from_env(env, delimiter=delimiter)
def load_from_conf_setting(self, attr=None, delimiter=':', silent=False):
value = ''
if attr is None:
attr = self.conf_attr_var
if attr in self.config:
value = self.config.get(attr)
return self.clean(value, delimiter, silent)
def load_from_key_store(self, f=None, delimiter=':', silent=False):
if f is None:
f = self.key_store
return super(ServiceAccessKey, self).load_from_f(f, delimiter, silent)
def load_from_conf_all(self, **kwargs):
"""Should return access_key_id, access_key_secret"""
for method in [self.load_from_conf_setting,
self.load_from_key_store,
self.load_from_conf_env]:
try:
return method(**kwargs)
except LoadAccessKeyError:
continue
if not (bool(self.id) and bool(self.secret)):
logging.error('Load access key failed')
def save_to_key_store(self, key_store=None, silent=True):
if key_store is None:
key_store = self.key_store
return super(ServiceAccessKey, self).save_to_f(key_store, silent)
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
兼容Python版本
"""
import sys
is_py2 = (sys.version_info[0] == 2)
is_py3 = (sys.version_info[0] == 3)
try:
import simplejson as json
except (ImportError, SyntaxError):
import json
if is_py2:
def to_bytes(data):
"""若输入为unicode, 则转为utf-8编码的bytes;其他则原样返回。"""
if isinstance(data, unicode):
return data.encode('utf-8')
else:
return data
def to_string(data):
"""把输入转换为str对象"""
return to_bytes(data)
def to_unicode(data):
"""把输入转换为unicode,要求输入是unicode或者utf-8编码的bytes。"""
if isinstance(data, bytes):
return data.decode('utf-8')
else:
return data
def stringify(input):
if isinstance(input, dict):
return dict([(stringify(key), stringify(value)) for key,value in input.iteritems()])
elif isinstance(input, list):
return [stringify(element) for element in input]
elif isinstance(input, unicode):
return input.encode('utf-8')
else:
return input
builtin_str = str
bytes = str
str = unicode
elif is_py3:
def to_bytes(data):
"""若输入为str(即unicode),则转为utf-8编码的bytes;其他则原样返回"""
if isinstance(data, str):
return data.encode(encoding='utf-8')
else:
return data
def to_string(data):
"""若输入为bytes,则认为是utf-8编码,并返回str"""
if isinstance(data, bytes):
return data.decode('utf-8')
else:
return data
def to_unicode(data):
"""把输入转换为unicode,要求输入是unicode或者utf-8编码的bytes。"""
return to_string(data)
def stringify(input):
return input
builtin_str = str
bytes = bytes
str = str
\ No newline at end of file
......@@ -261,3 +261,22 @@ class Config(dict):
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, dict.__repr__(self))
API_URL_MAPPING = {
'terminal-register': '/api/applications/v1/terminal/register/',
'terminal-heatbeat': '/api/applications/v1/terminal/heatbeat/',
'send-proxy-log': '/api/audits/v1/proxy-log/receive/',
'finish-proxy-log': '/api/audits/v1/proxy-log/%s/',
'send-command-log': '/api/audits/v1/command-log/',
'send-record-log': '/api/audits/v1/record-log/',
'user-auth': '/api/users/v1/auth/',
'user-assets': '/api/perms/v1/user/%s/assets/',
'user-asset-groups': '/api/perms/v1/user/%s/asset-groups/',
'user-asset-groups-assets': '/api/perms/v1/user/my/asset-groups-assets/',
'assets-of-group': '/api/perms/v1/user/my/asset-group/%s/assets/',
'my-profile': '/api/users/v1/profile/',
'system-user-auth-info': '/api/assets/v1/system-user/%s/auth-info/',
'validate-user-asset-permission':
'/api/perms/v1/asset-permission/user/validate/',
}
......@@ -4,3 +4,11 @@
class PermissionFailed(Exception):
pass
class LoadAccessKeyError(Exception):
pass
class RequestError(Exception):
pass
# -*- coding: utf-8 -*-
#
import os
import json
import base64
import logging
import paramiko
import requests
from requests.structures import CaseInsensitiveDict
from cachetools import cached, TTLCache
from .auth import Auth, ServiceAccessKey
from .utils import sort_assets, PKey, to_dotmap, timestamp_to_datetime_str
from .exception import RequestError, LoadAccessKeyError
from .config import API_URL_MAPPING
_USER_AGENT = 'jms-sdk-py'
CACHED_TTL = os.environ.get('CACHED_TTL', 30)
class FakeResponse(object):
def __init__(self):
self.status_code = 500
@staticmethod
def json():
return {}
class Request(object):
func_mapping = {
'get': requests.get,
'post': requests.post,
'patch': requests.patch,
'put': requests.put,
}
def __init__(self, url, method='get', data=None, params=None, headers=None,
content_type='application/json', app_name=''):
self.url = url
self.method = method
self.params = params or {}
self.result = None
if not isinstance(headers, dict):
headers = {}
self.headers = CaseInsensitiveDict(headers)
self.headers['Content-Type'] = content_type
if data is None:
data = {}
self.data = json.dumps(data)
if 'User-Agent' not in self.headers:
if app_name:
self.headers['User-Agent'] = _USER_AGENT + '/' + app_name
else:
self.headers['User-Agent'] = _USER_AGENT
def request(self):
self.result = self.func_mapping.get(self.method)(
url=self.url, headers=self.headers,
data=self.data,
params=self.params)
print(self.headers)
return self.result
class ApiRequest(object):
api_url_mapping = API_URL_MAPPING
def __init__(self, app_name, endpoint, auth=None):
self.app_name = app_name
self._auth = auth
self.req = None
self.endpoint = endpoint
@staticmethod
def parse_result(result):
try:
content = result.json()
except ValueError:
content = {'error': 'We only support json response'}
logging.warning(result.content)
logging.warning(content)
except AttributeError:
content = {'error': 'Request error'}
return result, content
def request(self, api_name=None, pk=None, method='get', use_auth=True,
data=None, params=None, content_type='application/json'):
if api_name in self.api_url_mapping:
path = self.api_url_mapping.get(api_name)
if pk and '%s' in path:
path = path % pk
else:
path = '/'
url = self.endpoint.rstrip('/') + path
print(url)
self.req = req = Request(url, method=method, data=data,
params=params, content_type=content_type,
app_name=self.app_name)
if use_auth:
if not self._auth:
raise RequestError('Authentication required')
else:
self._auth.sign_request(req)
try:
result = req.request()
if result.status_code > 500:
logging.warning('Server internal error')
except (requests.ConnectionError, requests.ConnectTimeout):
result = FakeResponse()
logging.warning('Connect endpoint: {} error'.format(self.endpoint))
return self.parse_result(result)
def get(self, *args, **kwargs):
kwargs['method'] = 'get'
print("+"* 10)
print(*args)
print("+"* 10)
# print(**kwargs)
print("+"* 10)
return self.request(*args, **kwargs)
def post(self, *args, **kwargs):
kwargs['method'] = 'post'
return self.request(*args, **kwargs)
def put(self, *args, **kwargs):
kwargs['method'] = 'put'
return self.request(*args, **kwargs)
def patch(self, *args, **kwargs):
kwargs['method'] = 'patch'
return self.request(*args, **kwargs)
class AppService(ApiRequest):
"""使用该类和Jumpserver api进行通信,将terminal用到的常用api进行了封装,
直接调用方法即可.
from jms import AppService
service = AppService(app_name='coco', endpoint='http://localhost:8080')
# 如果app是第一次启动, 注册一下,并得到 access key, 然后认真
service.register()
service.auth() # 直接使用注册得到的access key进行认证
# 如果已经启动过, 需要使用access key进行认证
service.auth(access_key_id, access_key_secret)
service.check_auth() # 检测一下是否认证有效
data = {
"username": "ibuler",
"name": "Guanghongwei",
"hostname": "localhost",
"ip": "127.0.0.1",
"system_user": "web",
"login_type": "ST",
"is_failed": 0,
"date_start": 1484206685,
}
service.send_proxy_log(data)
"""
access_key_class = ServiceAccessKey
def __init__(self, app_name, endpoint, auth=None, config=None):
super(AppService, self).__init__(app_name, endpoint, auth=auth)
self.config = config
self.access_key = self.access_key_class(config=config)
self.user = None
self.token = None
self.session_id = None
self.csrf_token = None
def auth(self, access_key_id=None, access_key_secret=None):
"""App认证, 请求api需要签名header
:param access_key_id: 注册时或新建app用户生成access key id
:param access_key_secret: 同上access key secret
"""
if None not in (access_key_id, access_key_secret):
self.access_key.id = access_key_id
self.access_key.secret = access_key_secret
self._auth = Auth(access_key_id=self.access_key.id,
access_key_secret=self.access_key.secret)
def auth_magic(self):
"""加载配置文件定义的变量,尝试从配置文件, Keystore, 环境变量加载
Access Key 然后进行认证
"""
self.access_key = self.access_key_class(config=self.config)
self.access_key.load_from_conf_all()
if self.access_key:
self._auth = Auth(access_key_id=self.access_key.id,
access_key_secret=self.access_key.secret)
else:
raise LoadAccessKeyError('Load access key all failed, auth ignore')
def register_terminal(self):
"""注册Terminal, 通常第一次启动需要向Jumpserver注册
content: {
'terminal': {'id': 1, 'name': 'terminal name', ...},
'user': {
'username': 'same as terminal name',
'name': 'same as username',
},
'access_key_id': 'ACCESS KEY ID',
'access_key_secret': 'ACCESS KEY SECRET',
}
"""
r, content = self.post('terminal-register',
data={'name': self.app_name},
use_auth=False)
if r.status_code == 201:
logging.info('Your can save access_key: %s somewhere '
'or set it in config' % content['access_key_id'])
return True, to_dotmap(content)
elif r.status_code == 200:
logging.error('Terminal {} exist already, register failed'
.format(self.app_name))
else:
logging.error('Register terminal {} failed'.format(self.app_name))
return False, None
def terminal_heatbeat(self):
"""和Jumpserver维持心跳, 当Terminal断线后,jumpserver可以知晓
Todo: Jumpserver发送的任务也随heatbeat返回, 并执行,如 断开某用户
"""
r, content = self.post('terminal-heatbeat', use_auth=True)
if r.status_code == 201:
return content
else:
return None
def is_authenticated(self):
"""执行auth后只是构造了请求头, 可以使用该方法连接Jumpserver测试认证"""
result = self.terminal_heatbeat()
return result
def validate_user_asset_permission(self, user_id, asset_id, system_user_id):
"""验证用户是否有登录该资产的权限"""
params = {
'user_id': user_id,
'asset_id': asset_id,
'system_user_id': system_user_id,
}
r, content = self.get('validate-user-asset-permission',
use_auth=True,
params=params)
if r.status_code == 200:
return True
else:
return False
def get_system_user_auth_info(self, system_user):
"""获取系统用户的认证信息: 密码, ssh私钥"""
r, content = self.get('system-user-auth-info', pk=system_user['id'])
if r.status_code == 200:
password = content['password'] or ''
private_key_string = content['private_key'] or ''
if private_key_string and private_key_string.find('PRIVATE KEY'):
private_key = PKey.from_string(private_key_string)
else:
private_key = None
if isinstance(private_key, paramiko.PKey) \
and len(private_key_string.split('\n')) > 2:
private_key_log_msg = private_key_string.split('\n')[1]
else:
private_key_log_msg = 'None'
logging.debug('Get system user %s password: %s*** key: %s***' %
(system_user['username'], password[:4],
private_key_log_msg))
return password, private_key
else:
logging.warning('Get system user %s password or private key failed'
% system_user['username'])
return None, None
def send_proxy_log(self, data):
"""
:param data: 格式如下
data = {
"user": "username",
"asset": "name",
"system_user": "web",
"login_type": "ST",
"was_failed": 0,
"date_start": timestamp,
}
"""
assert isinstance(data.get('date_start'), (int, float))
data['date_start'] = timestamp_to_datetime_str(data['date_start'])
data['is_failed'] = 1 if data.get('is_failed') else 0
r, content = self.post('send-proxy-log', data=data, use_auth=True)
if r.status_code != 201:
logging.warning('Send proxy log failed: %s' % content)
return None
else:
return content['id']
def finish_proxy_log(self, data):
""" 退出登录资产后, 需要汇报结束 时间等
:param data: 格式如下
data = {
"proxy_log_id": 123123,
"date_finished": timestamp,
}
"""
assert isinstance(data.get('date_finished'), (int, float))
data['date_finished'] = timestamp_to_datetime_str(data['date_finished'])
data['is_failed'] = 1 if data.get('is_failed') else 0
data['is_finished'] = 1
proxy_log_id = data.get('proxy_log_id') or 0
r, content = self.patch('finish-proxy-log', pk=proxy_log_id, data=data)
if r.status_code != 200:
logging.warning('Finish proxy log failed: %s' % proxy_log_id)
return False
return True
def send_command_log(self, data):
"""用户输入命令后发送到Jumpserver保存审计
:param data: 格式如下
data = [{
"proxy_log_id": 22,
"user": "admin",
"asset": "localhost",
"system_user": "web",
"command_no": 1,
"command": "ls",
"output": cmd_output, ## base64.b64encode(output),
"timestamp": timestamp,
},..]
"""
assert isinstance(data, (dict, list))
if isinstance(data, dict):
data = [data]
for d in data:
if not d.get('output'):
continue
output = d['output'].encode('utf-8', 'ignore')
d['output'] = base64.b64encode(output).decode("utf-8")
result, content = self.post('send-command-log', data=data)
if result.status_code != 201:
logging.warning('Send command log failed: %s' % content)
return False
return True
def send_record_log(self, data):
"""将输入输出发送给Jumpserver, 用来录像回放
:param data: 格式如下
data = [{
"proxy_log_id": 22,
"output": "backend server output, either input or output",
"timestamp": timestamp,
}, ...]
"""
assert isinstance(data, (dict, list))
if isinstance(data, dict):
data = [data]
for d in data:
if d.get('output') and isinstance(d['output'], str):
d['output'] = d['output'].encode('utf-8')
d['output'] = base64.b64encode(d['output'])
result, content = self.post('send-record-log', data=data)
if result.status_code != 201:
logging.warning('Send record log failed: %s' % content)
return False
return True
# Todo: 或许没什么用
# def check_user_authentication(self, token=None, session_id=None,
# csrf_token=None):
# """
# 用户登陆webterminal或其它网站时,检测用户cookie中的sessionid和csrf_token
# 是否合法, 如果合法返回用户,否则返回空
# :param session_id: cookie中的 sessionid
# :param csrf_token: cookie中的 csrftoken
# :return: user object or None
# """
# user_service = UserService(endpoint=self.endpoint)
# user_service.auth(token=token, session_id=session_id,
# csrf_token=csrf_token)
# user = user_service.is_authenticated()
# return user
def login(self, data):
"""用户登录Terminal时需要向Jumpserver进行认证, 登陆成功后返回用户和token
data = {
'username': 'admin',
'password': 'admin',
'public_key': 'public key string',
'login_type': 'ST', # (('ST', 'SSH Terminal'),
# ('WT', 'Web Terminal'))
'remote_addr': '2.2.2.2', # User ip address not app address
}
"""
r, content = self.post('user-auth', data=data, use_auth=False)
if r.status_code == 200:
self.token = content['token']
self.user = content['user']
self.auth(self.token)
return self.user, self.token
else:
return None, None
@cached(TTLCache(maxsize=100, ttl=60))
def get_user_assets(self, user):
"""获取用户被授权的资产列表
[{'hostname': 'x', 'ip': 'x', ...,
'system_users_granted': [{'id': 1, 'username': 'x',..}]
]
"""
r, content = self.get('user-assets', pk=user['id'], use_auth=True)
if r.status_code == 200:
assets = content
else:
assets = []
assets = sort_assets(assets)
for asset in assets:
asset['system_users'] = \
[system_user for system_user in asset.get('system_users_granted')]
return to_dotmap(assets)
@cached(TTLCache(maxsize=100, ttl=60))
def get_user_asset_groups(self, user):
"""获取用户授权的资产组列表
[{'name': 'x', 'comment': 'x', 'assets_amount': 2}, ..]
"""
r, content = self.get('user-asset-groups', pk=user['id'], uassetsse_auth=True)
if r.status_code == 200:
asset_groups = content
else:
asset_groups = []
asset_groups = [asset_group for asset_group in asset_groups]
return to_dotmap(asset_groups)
@cached(TTLCache(maxsize=100, ttl=60))
def get_user_asset_groups_assets(self, user):
"""获取用户授权的资产组列表及下面的资产
[{'name': 'x', 'comment': 'x', 'assets': []}, ..]
"""
r, content = self.get('user-asset-groups-assets', pk=user['id'], use_auth=True)
if r.status_code == 200:
asset_groups_assets = content
else:
asset_groups_assets = []
return to_dotmap(asset_groups_assets)
@cached(TTLCache(maxsize=100, ttl=60))
def get_assets_in_group(self, asset_group_id):
"""获取用户在该资产组下的资产, 并非该资产组下的所有资产,而是授权了的
返回资产列表, 和获取资产格式一致
:param asset_group_id: 资产组id
"""
r, content = self.get('assets-of-group', use_auth=True,
pk=asset_group_id)
if r.status_code == 200:
assets = content
else:
assets = []
for asset in assets:
asset['system_users'] = \
[system_user for system_user in asset.get('system_users_granted')]
assets = sort_assets(assets)
return to_dotmap([asset for asset in assets])
#!coding: utf-8
import base64
import calendar
import os
import re
import paramiko
from io import StringIO
from __future__ import unicode_literals
import hashlib
import re
import threading
import base64
import calendar
import time
import datetime
from io import StringIO
import pyte
import pytz
from email.utils import formatdate
import paramiko
from dotmap import DotMap
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty
from .compat import to_string, to_bytes
def ssh_key_string_to_obj(text):
......@@ -238,3 +252,109 @@ def wrap_with_primary(text, bolder=False):
def wrap_with_title(text):
return wrap_with_color(text, color='black', background='green')
def b64encode_as_string(data):
return to_string(base64.b64encode(data))
def make_signature(access_key_secret, date=None):
if isinstance(date, bytes):
date = date.decode("utf-8")
if isinstance(date, int):
date_gmt = http_date(date)
elif date is None:
date_gmt = http_date(int(time.time()))
else:
date_gmt = date
data = str(access_key_secret) + "\n" + date_gmt
return content_md5(data)
def split_string_int(s):
"""Split string or int
example: test-01-02-db => ['test-', '01', '-', '02', 'db']
"""
string_list = []
index = 0
pre_type = None
word = ''
for i in s:
if index == 0:
pre_type = int if i.isdigit() else str
word = i
else:
if pre_type is int and i.isdigit() or pre_type is str and not i.isdigit():
word += i
else:
string_list.append(word.lower() if not word.isdigit() else int(word))
word = i
pre_type = int if i.isdigit() else str
index += 1
string_list.append(word.lower() if not word.isdigit() else int(word))
return string_list
def sort_assets(assets, order_by='hostname'):
if order_by == 'hostname':
key = lambda asset: split_string_int(asset['hostname'])
# print(assets)
# assets = sorted(assets, key=key)
elif order_by == 'ip':
assets = sorted(assets, key=lambda asset: [int(d) for d in asset['ip'].split('.') if d.isdigit()])
else:
key = lambda asset: asset.__getitem__(order_by)
assets = sorted(assets, key=key)
return assets
class PKey(object):
@classmethod
def from_string(cls, key_string):
try:
pkey = paramiko.RSAKey(file_obj=StringIO(key_string))
return pkey
except paramiko.SSHException:
try:
pkey = paramiko.DSSKey(file_obj=StringIO(key_string))
return pkey
except paramiko.SSHException:
return None
def from_string(cls, key_string):
return cls(key_string=key_string).pkey
def timestamp_to_datetime_str(ts):
datetime_format = '%Y-%m-%dT%H:%M:%S.%fZ'
dt = datetime.datetime.fromtimestamp(ts, tz=pytz.timezone('UTC'))
return dt.strftime(datetime_format)
def to_dotmap(data):
"""将接受dict转换为DotMap"""
if isinstance(data, dict):
data = DotMap(data)
elif isinstance(data, list):
data = [DotMap(d) for d in data]
else:
raise ValueError('Dict or list type required...')
return data
class MultiQueue(Queue):
def mget(self, size=1, block=True, timeout=5):
items = []
for i in range(size):
try:
items.append(self.get(block=block, timeout=timeout))
except Empty:
break
return items
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment