    Dev2 (#1766) · fe45d839
    老广 authored
tasks.py 14.9 KB
# ~*~ coding: utf-8 ~*~
import json
import re
import os

from celery import shared_task
from django.core.cache import cache
from django.utils.translation import ugettext as _

from common.utils import get_object_or_none, capacity_convert, \
    sum_capacity, encrypt_password, get_logger
from ops.celery.utils import register_as_period_task, after_app_shutdown_clean, \
    after_app_ready_start
from ops.celery import app as celery_app

from .models import SystemUser, AdminUser, Asset
from . import const


FORKS = 10
TIMEOUT = 60
logger = get_logger(__file__)
CACHE_MAX_TIME = 60*60*60
disk_pattern = re.compile(r'^(?:hd|sd|xvd|vd)')
PERIOD_TASK = os.environ.get("PERIOD_TASK", "off")


@shared_task
def set_assets_hardware_info(result, **kwargs):
    """
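    Use the result of an ops task run to update asset hardware info.

    The @shared_task decorator must be present: this function is used as a
    task callback, so it must itself be a celery task.
    :param result: task run result; result[0] holds the raw per-host output
    :param kwargs: {task_name: ""}
    :return: list of updated assets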
    """
    result_raw = result[0]
    assets_updated = []
    for hostname, info in result_raw.get('ok', {}).items():
        info = info.get('setup', {}).get('ansible_facts', {})
        if not info:
            logger.error("Get asset info failed: {}".format(hostname))
            continue

        asset = Asset.objects.get_object_by_fullname(hostname)
        if not asset:
            continue

        ___vendor = info.get('ansible_system_vendor', 'Unknown')
        ___model = info.get('ansible_product_name', 'Unknown')
        ___sn = info.get('ansible_product_serial', 'Unknown')

        for ___cpu_model in info.get('ansible_processor', []):
            if ___cpu_model.endswith('GHz') or ___cpu_model.startswith("Intel"):
                break
        else:
            ___cpu_model = 'Unknown'
        ___cpu_model = ___cpu_model[:64]
        ___cpu_count = info.get('ansible_processor_count', 0)
        ___cpu_cores = info.get('ansible_processor_cores', None) or len(info.get('ansible_processor', []))
        ___cpu_vcpus = info.get('ansible_processor_vcpus', 0)
        ___memory = '%s %s' % capacity_convert('{} MB'.format(info.get('ansible_memtotal_mb')))
        disk_info = {}
        for dev, dev_info in info.get('ansible_devices', {}).items():
            if disk_pattern.match(dev) and dev_info['removable'] == '0':
                disk_info[dev] = dev_info['size']
        ___disk_total = '%s %s' % sum_capacity(disk_info.values())
        ___disk_info = json.dumps(disk_info)

        ___platform = info.get('ansible_system', 'Unknown')
        ___os = info.get('ansible_distribution', 'Unknown')
        ___os_version = info.get('ansible_distribution_version', 'Unknown')
        ___os_arch = info.get('ansible_architecture', 'Unknown')
        ___hostname_raw = info.get('ansible_hostname', 'Unknown')

        for k, v in locals().items():
            if k.startswith('___'):
                setattr(asset, k.strip('_'), v)
        asset.save()
        assets_updated.append(asset)
    return assets_updated
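

# Illustration only, not called by the tasks in this module: a minimal sketch
# of how the loop over locals() in set_assets_hardware_info() turns
# `___`-prefixed variables into asset attribute names. The helper name
# `_collect_prefixed` is hypothetical.
def _collect_prefixed(namespace, prefix='___'):
    """Return {attribute_name: value} for every key that starts with prefix."""
    # str.strip('_') removes underscores from both ends, so '___cpu_count'
    # becomes 'cpu_count'; a name with trailing underscores would lose those too.
    return {
        key.strip('_'): value
        for key, value in namespace.items()
        if key.startswith(prefix)
    }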


@shared_task
def update_assets_hardware_info_util(assets, task_name=None):
    """
    Use the Ansible API to update asset hardware info
    :param assets: sequence of assets
    :param task_name: name of the task to run
    :return: the task run result, whose summary looks like
        {'contacted': {}, 'dark': {}}; {} if no host qualifies
    """
    from ops.utils import update_or_create_ansible_task
    if task_name is None:
        task_name = _("Update some assets hardware info")
        # task_name = _("更新资产硬件信息")
    tasks = const.UPDATE_ASSETS_HARDWARE_TASKS
    hostname_list = [asset.fullname for asset in assets if asset.is_active and asset.is_unixlike()]
    if not hostname_list:
        logger.info("No hosts found; assets may be inactive or not unix-like")
        return {}
    task, created = update_or_create_ansible_task(
        task_name, hosts=hostname_list, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
    )
    result = task.run()
    # Todo: this may be used somewhere else
    # Run the callback function manually
    set_assets_hardware_info(result)
    return result
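

# Illustration only: the same "active and unix-like" host filter is repeated
# in several tasks in this module. A minimal sketch of factoring it out; the
# helper name `_runnable_hostnames` is hypothetical and nothing here calls it.
def _runnable_hostnames(assets):
    """Return the fullname of every asset that is active and unix-like."""
    return [
        asset.fullname for asset in assets
        if asset.is_active and asset.is_unixlike()
    ]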


@shared_task
def update_asset_hardware_info_manual(asset):
    task_name = _("Update asset hardware info")
    # task_name = _("更新资产硬件信息")
    return update_assets_hardware_info_util([asset], task_name=task_name)


@celery_app.task
@register_as_period_task(interval=3600)
@after_app_ready_start
@after_app_shutdown_clean
def update_assets_hardware_info_period():
    """
    Periodic task that updates asset hardware info
    :return:
    """
    if PERIOD_TASK != "on":
        logger.debug("Period task disabled, update assets hardware info pass")
        return

    from ops.utils import update_or_create_ansible_task
    task_name = _("Update assets hardware info period")
    # task_name = _("定期更新资产硬件信息")
    hostname_list = [
        asset.fullname for asset in Asset.objects.all()
        if asset.is_active and asset.is_unixlike()
    ]
    tasks = const.UPDATE_ASSETS_HARDWARE_TASKS

    # Only create, schedule by celery beat
    update_or_create_ansible_task(
        task_name, hosts=hostname_list, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
        interval=60*60*24, is_periodic=True, callback=set_assets_hardware_info.name,
    )


##  Admin user connectivity  ##

@shared_task
def set_admin_user_connectability_info(result, **kwargs):
    admin_user = kwargs.get("admin_user")
    task_name = kwargs.get("task_name")
    if admin_user is None and task_name is not None:
        admin_user = task_name.split(":")[-1]

    raw, summary = result
    cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user)
    cache.set(cache_key, summary, CACHE_MAX_TIME)

    for i in summary.get('contacted', []):
        asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i)
        cache.set(asset_conn_cache_key, 1, CACHE_MAX_TIME)

    for i, msg in summary.get('dark', {}).items():
        asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i)
        cache.set(asset_conn_cache_key, 0, CACHE_MAX_TIME)
        logger.error(msg)
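

# Illustration only: a minimal sketch of how a run summary maps to per-asset
# connectivity flags, using a plain dict instead of the Django cache. The
# helper name `_connectivity_flags` and the default key template are
# hypothetical; the real key format lives in const.ASSET_ADMIN_CONN_CACHE_KEY.
def _connectivity_flags(summary, key_template='asset_conn_{}'):
    """Map each contacted host to 1 and each dark (unreachable) host to 0."""
    flags = {}
    for host in summary.get('contacted', []):
        flags[key_template.format(host)] = 1
    for host in summary.get('dark', {}):
        flags[key_template.format(host)] = 0
    return flags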


@shared_task
def test_admin_user_connectability_util(admin_user, task_name):
    """
    Test whether the asset admin user can connect, using the Ansible API
    :param admin_user:
    :param task_name:
    :return:
    """
    from ops.utils import update_or_create_ansible_task

    assets = admin_user.get_related_assets()
    hosts = [asset.fullname for asset in assets
             if asset.is_active and asset.is_unixlike()]
    if not hosts:
        return
    tasks = const.TEST_ADMIN_USER_CONN_TASKS
    task, created = update_or_create_ansible_task(
        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
    )
    result = task.run()
    set_admin_user_connectability_info(result, admin_user=admin_user.name)
    return result


@celery_app.task
@register_as_period_task(interval=3600)
@after_app_ready_start
@after_app_shutdown_clean
def test_admin_user_connectability_period():
    """
    A periodic task that tests the connectability of every admin user
    """
    if PERIOD_TASK != "on":
        logger.debug("Period task disabled, test admin user connectability pass")
        return

    admin_users = AdminUser.objects.all()
    for admin_user in admin_users:
        task_name = _("Test admin user connectability period: {}").format(admin_user.name)
        # task_name = _("定期测试管理账号可连接性: {}".format(admin_user.name))
        test_admin_user_connectability_util(admin_user, task_name)


@shared_task
def test_admin_user_connectability_manual(admin_user):
    task_name = _("Test admin user connectability: {}").format(admin_user.name)
    # task_name = _("测试管理行号可连接性: {}").format(admin_user.name)
    return test_admin_user_connectability_util(admin_user, task_name)


@shared_task
def test_asset_connectability_util(assets, task_name=None):
    from ops.utils import update_or_create_ansible_task

    if task_name is None:
        task_name = _("Test assets connectability")
        # task_name = _("测试资产可连接性")
    hosts = [asset.fullname for asset in assets if asset.is_active and asset.is_unixlike()]
    if not hosts:
        logger.info("No hosts, passed")
        return {}
    tasks = const.TEST_ADMIN_USER_CONN_TASKS
    task, created = update_or_create_ansible_task(
        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
    )
    result = task.run()
    summary = result[1]
    for k in summary.get('dark', {}):
        cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(k), 0, CACHE_MAX_TIME)

    for k in summary.get('contacted', []):
        cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(k), 1, CACHE_MAX_TIME)
    return summary


@shared_task
def test_asset_connectability_manual(asset):
    summary = test_asset_connectability_util([asset])

    if summary.get('dark'):
        return False, summary['dark']
    else:
        return True, ""


##  System user connectivity  ##

@shared_task
def set_system_user_connectablity_info(result, **kwargs):
    summary = result[1]
    task_name = kwargs.get("task_name")
    system_user = kwargs.get("system_user")
    if system_user is None and task_name is not None:
        system_user = task_name.split(":")[-1]
    cache_key = const.SYSTEM_USER_CONN_CACHE_KEY.format(system_user)
    cache.set(cache_key, summary, CACHE_MAX_TIME)


@shared_task
def test_system_user_connectability_util(system_user, assets, task_name):
    """
    Test whether the system user can connect to its assets.
    :param system_user:
    :param assets:
    :param task_name:
    :return:
    """
    from ops.utils import update_or_create_ansible_task
    # assets = system_user.get_assets()
    hosts = [asset.fullname for asset in assets if asset.is_active and asset.is_unixlike()]
    tasks = const.TEST_SYSTEM_USER_CONN_TASKS
    if not hosts:
        logger.info("No hosts, passed")
        return {}
    task, created = update_or_create_ansible_task(
        task_name, hosts=hosts, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS,
        run_as=system_user.name, created_by="System",
    )
    result = task.run()
    set_system_user_connectablity_info(result, system_user=system_user.name)
    return result


@shared_task
def test_system_user_connectability_manual(system_user):
    task_name = _("Test system user connectability: {}").format(system_user)
    assets = system_user.get_assets()
    return test_system_user_connectability_util(system_user, assets, task_name)


@shared_task
def test_system_user_connectability_a_asset(system_user, asset):
    task_name = _("Test system user connectability: {} => {}").format(
        system_user, asset
    )
    return test_system_user_connectability_util(system_user, [asset], task_name)


@shared_task
@register_as_period_task(interval=3600)
@after_app_ready_start
@after_app_shutdown_clean
def test_system_user_connectability_period():
    if PERIOD_TASK != "on":
        logger.debug("Period task disabled, test system user connectability pass")
        return

    system_users = SystemUser.objects.all()
    for system_user in system_users:
        task_name = _("Test system user connectability period: {}").format(system_user)
        # task_name = _("定期测试系统用户可连接性: {}").format(system_user)
        assets = system_user.get_assets()
        test_system_user_connectability_util(system_user, assets, task_name)


####  Push system user tasks ####

def get_push_system_user_tasks(system_user):
    # Pushing root as a system user is dangerous
    if system_user.username == "root":
        return []

    tasks = []
    if system_user.password:
        tasks.append({
            'name': 'Add user {}'.format(system_user.username),
            'action': {
                'module': 'user',
                'args': 'name={} shell={} state=present password={}'.format(
                    system_user.username, system_user.shell,
                    encrypt_password(system_user.password, salt="K3mIlKK"),
                ),
            }
        })
    if system_user.public_key:
        tasks.append({
            'name': 'Set {} authorized key'.format(system_user.username),
            'action': {
                'module': 'authorized_key',
                'args': "user={} state=present key='{}'".format(
                    system_user.username, system_user.public_key
                )
            }
        })
    if system_user.sudo:
        tasks.append({
            'name': 'Set {} sudo setting'.format(system_user.username),
            'action': {
                'module': 'lineinfile',
                'args': "dest=/etc/sudoers state=present regexp='^{0} ALL=' "
                        "line='{0} ALL=(ALL) NOPASSWD: {1}' "
                        "validate='visudo -cf %s'".format(
                    system_user.username,
                    system_user.sudo,
                )
            }
        })
    return tasks
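

# Illustration only: a minimal sketch of the sudoers argument string built in
# get_push_system_user_tasks(). Adjacent string literals are joined before
# .format() runs, so {0} and {1} are filled across all three pieces while the
# literal %s is left in place for the `validate` option. The helper name
# `_sudo_task_args` is hypothetical.
def _sudo_task_args(username, sudo):
    return (
        "dest=/etc/sudoers state=present regexp='^{0} ALL=' "
        "line='{0} ALL=(ALL) NOPASSWD: {1}' "
        "validate='visudo -cf %s'".format(username, sudo)
    )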


@shared_task
def push_system_user_util(system_users, assets, task_name):
    from ops.utils import update_or_create_ansible_task
    tasks = []
    for system_user in system_users:
        if not system_user.is_need_push():
            msg = "Skip pushing system user `{}`: auto push may be disabled or " \
                  "the protocol is not ssh".format(system_user.name)
            logger.info(msg)
            continue
        tasks.extend(get_push_system_user_tasks(system_user))

    if not tasks:
        logger.info("No tasks, skipped")
        return {}

    hosts = [asset.fullname for asset in assets if asset.is_active and asset.is_unixlike()]
    if not hosts:
        logger.info("No hosts, skipped")
        return {}
    task, created = update_or_create_ansible_task(
        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
    )
    return task.run()


@shared_task
def push_system_user_to_assets_manual(system_user):
    assets = system_user.get_assets()
    # task_name = "推送系统用户到入资产: {}".format(system_user.name)
    task_name = _("Push system users to assets: {}").format(system_user.name)
    return push_system_user_util([system_user], assets, task_name=task_name)


@shared_task
def push_system_user_a_asset_manual(system_user, asset):
    task_name = _("Push system users to asset: {} => {}").format(
        system_user.name, asset.fullname
    )
    return push_system_user_util([system_user], [asset], task_name=task_name)


@shared_task
def push_system_user_to_assets(system_user, assets):
    # task_name = _("推送系统用户到入资产: {}").format(system_user.name)
    task_name = _("Push system users to assets: {}").format(system_user.name)
    return push_system_user_util.delay([system_user], assets, task_name)


# @shared_task
# @register_as_period_task(interval=3600)
# @after_app_ready_start
# # @after_app_shutdown_clean
# def push_system_user_period():
#     for system_user in SystemUser.objects.all():
#         push_system_user_related_nodes(system_user)