Commit 6015f242 authored by ibuler's avatar ibuler

[Fixture] 添加PlaybookRunner

parent 01e50d59
...@@ -14,7 +14,7 @@ from ansible.executor.task_queue_manager import TaskQueueManager ...@@ -14,7 +14,7 @@ from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.inventory import Inventory, Host, Group from ansible.inventory import Inventory, Host, Group
from ansible.vars import VariableManager from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader from ansible.parsing.dataloader import DataLoader
from ansible.executor import playbook_executor from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.utils.display import Display from ansible.utils.display import Display
from ansible.playbook.play import Play from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase from ansible.plugins.callback import CallbackBase
...@@ -23,6 +23,8 @@ from ansible.utils.vars import load_extra_vars ...@@ -23,6 +23,8 @@ from ansible.utils.vars import load_extra_vars
from ansible.utils.vars import load_options_vars from ansible.utils.vars import load_options_vars
from ..models import TaskRecord, AnsiblePlay, AnsibleTask, AnsibleHostResult from ..models import TaskRecord, AnsiblePlay, AnsibleTask, AnsibleHostResult
from .inventory import JMSInventory
__all__ = ["ADHocRunner", "Options"] __all__ = ["ADHocRunner", "Options"]
...@@ -35,159 +37,6 @@ class AnsibleError(StandardError): ...@@ -35,159 +37,6 @@ class AnsibleError(StandardError):
pass pass
# class Options(object):
# """Ansible运行时配置类, 用于初始化Ansible的一些默认配置.
# """
# def __init__(self, verbosity=None, inventory=None, listhosts=None, subset=None, module_paths=None, extra_vars=None,
# forks=10, ask_vault_pass=False, vault_password_files=None, new_vault_password_file=None,
# output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=False, ask_su_pass=False,
# sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=False,
# ask_pass=False, private_key_file=None, remote_user=None, connection="smart", timeout=10, ssh_common_args=None,
# sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=False,
# syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None):
# self.verbosity = verbosity
# self.inventory = inventory
# self.listhosts = listhosts
# self.subset = subset
# self.module_paths = module_paths
# self.extra_vars = extra_vars
# self.forks = forks
# self.ask_vault_pass = ask_vault_pass
# self.vault_password_files = vault_password_files
# self.new_vault_password_file = new_vault_password_file
# self.output_file = output_file
# self.tags = tags
# self.skip_tags = skip_tags
# self.one_line = one_line
# self.tree = tree
# self.ask_sudo_pass = ask_sudo_pass
# self.ask_su_pass = ask_su_pass
# self.sudo = sudo
# self.sudo_user = sudo_user
# self.become = become
# self.become_method = become_method
# self.become_user = become_user
# self.become_ask_pass = become_ask_pass
# self.ask_pass = ask_pass
# self.private_key_file = private_key_file
# self.remote_user = remote_user
# self.connection = connection
# self.timeout = timeout
# self.ssh_common_args = ssh_common_args
# self.sftp_extra_args = sftp_extra_args
# self.scp_extra_args = scp_extra_args
# self.ssh_extra_args = ssh_extra_args
# self.poll_interval = poll_interval
# self.seconds = seconds
# self.check = check
# self.syntax = syntax
# self.diff = diff
# self.force_handlers = force_handlers
# self.flush_cache = flush_cache
# self.listtasks = listtasks
# self.listtags = listtags
# self.module_path = module_path
# self.__overwrite_default()
#
# def __overwrite_default(self):
# """上面并不能包含Ansible所有的配置, 如果有其他的配置,
# 可以通过替换default_config模块里面的变量进行重载, 
# 比如 default_config.DEFAULT_ASK_PASS = False.
# """
# default_config.HOST_KEY_CHECKING = False
Options = namedtuple("Options", [
'connection', 'module_path', 'private_key_file', "remote_user", "timeout",
'forks', 'become', 'become_method', 'become_user', 'check', "extra_vars",
]
)
class JMSHost(Host):
def __init__(self, asset):
self.asset = asset
self.name = name = asset.get('hostname') or asset.get('ip')
self.port = port = asset.get('port') or 22
super(JMSHost, self).__init__(name, port)
self.set_all_variable()
def set_all_variable(self):
asset = self.asset
self.set_variable('ansible_host', asset['ip'])
self.set_variable('ansible_port', asset['port'])
self.set_variable('ansible_user', asset['username'])
# 添加密码和秘钥
if asset.get('password'):
self.set_variable('ansible_ssh_pass', asset['password'])
if asset.get('key'):
self.set_variable('ansible_ssh_private_key_file', asset['private_key'])
# 添加become支持
become = asset.get("become", None)
if become is not None:
self.set_variable("ansible_become", True)
self.set_variable("ansible_become_method", become.get('method'))
self.set_variable("ansible_become_user", become.get('user'))
self.set_variable("ansible_become_pass", become.get('pass'))
else:
self.set_variable("ansible_become", False)
class JMSInventory(Inventory):
"""
提供生成Ansible inventory对象的方法
"""
def __init__(self, host_list=None):
if host_list is None:
host_list = []
assert isinstance(host_list, list)
self.host_list = host_list
self.loader = DataLoader()
self.variable_manager = VariableManager()
super(JMSInventory, self).__init__(self.loader, self.variable_manager,
host_list=host_list)
def parse_inventory(self, host_list):
"""用于生成动态构建Ansible Inventory.
self.host_list: [
{"name": "asset_name",
"ip": <ip>,
"port": <port>,
"user": <user>,
"pass": <pass>,
"key": <sshKey>,
"groups": ['group1', 'group2'],
"other_host_var": <other>},
{...},
]
:return: 返回一个Ansible的inventory对象
"""
# TODO: 验证输入
# 创建Ansible Group,如果没有则创建default组
ungrouped = Group('ungrouped')
all = Group('all')
all.add_child_group(ungrouped)
self.groups = dict(all=all, ungrouped=ungrouped)
for asset in host_list:
host = JMSHost(asset=asset)
asset_groups = asset.get('groups')
if asset_groups:
for group_name in asset_groups:
if group_name not in self.groups:
group = Group(group_name)
self.groups[group_name] = group
else:
group = self.groups[group_name]
group.add_host(host)
else:
ungrouped.add_host(host)
all.add_host(host)
class BasicResultCallback(CallbackBase): class BasicResultCallback(CallbackBase):
""" """
Custom Callback Custom Callback
...@@ -218,6 +67,102 @@ class BasicResultCallback(CallbackBase): ...@@ -218,6 +67,102 @@ class BasicResultCallback(CallbackBase):
pass pass
class PlaybookCallBack(CallbackBase):
"""
Custom callback model for handlering the output data of
execute playbook file,
Base on the build-in callback plugins of ansible which named `json`.
"""
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'stdout'
CALLBACK_NAME = 'Dict'
def __init__(self, display=None):
super(PlaybookCallBack, self).__init__(display)
self.results = []
self.output = ""
self.item_results = {} # {"host": []}
def _new_play(self, play):
return {
'play': {
'name': play.name,
'id': str(play._uuid)
},
'tasks': []
}
def _new_task(self, task):
return {
'task': {
'name': task.get_name(),
},
'hosts': {}
}
def v2_playbook_on_no_hosts_matched(self):
self.output = "skipping: No match hosts."
def v2_playbook_on_no_hosts_remaining(self):
pass
def v2_playbook_on_task_start(self, task, is_conditional):
self.results[-1]['tasks'].append(self._new_task(task))
def v2_playbook_on_play_start(self, play):
self.results.append(self._new_play(play))
def v2_playbook_on_stats(self, stats):
hosts = sorted(stats.processed.keys())
summary = {}
for h in hosts:
s = stats.summarize(h)
summary[h] = s
if self.output:
pass
else:
self.output = {
'plays': self.results,
'stats': summary
}
def gather_result(self, res):
if res._task.loop and "results" in res._result and res._host.name in self.item_results:
res._result.update({"results": self.item_results[res._host.name]})
del self.item_results[res._host.name]
self.results[-1]['tasks'][-1]['hosts'][res._host.name] = res._result
def v2_runner_on_ok(self, res, **kwargs):
if "ansible_facts" in res._result:
del res._result["ansible_facts"]
self.gather_result(res)
def v2_runner_on_failed(self, res, **kwargs):
self.gather_result(res)
def v2_runner_on_unreachable(self, res, **kwargs):
self.gather_result(res)
def v2_runner_on_skipped(self, res, **kwargs):
self.gather_result(res)
def gather_item_result(self, res):
self.item_results.setdefault(res._host.name, []).append(res._result)
def v2_runner_item_on_ok(self, res):
self.gather_item_result(res)
def v2_runner_item_on_failed(self, res):
self.gather_item_result(res)
def v2_runner_item_on_skipped(self, res):
self.gather_item_result(res)
class CallbackModule(CallbackBase): class CallbackModule(CallbackBase):
"""处理和分析Ansible运行结果,并保存数据. """处理和分析Ansible运行结果,并保存数据.
""" """
...@@ -356,90 +301,110 @@ class CallbackModule(CallbackBase): ...@@ -356,90 +301,110 @@ class CallbackModule(CallbackBase):
class PlayBookRunner(object): class PlayBookRunner(object):
"""用于执行AnsiblePlaybook的接口.简化Playbook对象的使用. """用于执行AnsiblePlaybook的接口.简化Playbook对象的使用.
""" """
Options = namedtuple('Options', [
'listtags', 'listtasks', 'listhosts', 'syntax', 'connection',
'module_path', 'forks', 'remote_user', 'private_key_file', 'timeout',
'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
'scp_extra_args',
'become', 'become_method', 'become_user', 'verbosity', 'check',
'extra_vars'])
def __init__(self, config, palybook_path, playbook_var, def __init__(self,
become_pass, *hosts, **group_vars): hosts=None,
""" playbook_path=None,
forks=C.DEFAULT_FORKS,
:param config: Config实例 listtags=False,
:param palybook_path: playbook的路径 listtasks=False,
:param playbook_var: 执行Playbook时的变量 listhosts=False,
:param become_pass: sudo passsword syntax=False,
:param hosts: 可变位置参数, 为一个资产列表, 每一个资产用dict表示, 以下是这个dict必须包含的key module_path=None,
[{ remote_user='root',
"name": "asset_name", timeout=C.DEFAULT_TIMEOUT,
"ip": "asset_ip", ssh_common_args=None,
"port": "asset_port", ssh_extra_args=None,
"username": "asset_user", sftp_extra_args=None,
"password": "asset_pass", scp_extra_args=None,
"key": "asset_private_key", become=True,
"group": "asset_group_name", become_method=None,
... become_user="root",
}] verbosity=None,
:param group_vars: 可变关键字参数, 是资产组变量, 记录对应的资产组变量 extra_vars=None,
"groupName1": {"group_variable1": "value1",...} connection_type="ssh",
"groupName2": {"group_variable1": "value1",...} passwords=None,
""" private_key_file=None,
check=False):
self.options = config
# 设置verbosity级别, 及命令行的--verbose选项 C.RETRY_FILES_ENABLED = False
self.display = Display() self.callbackmodule = PlaybookCallBack()
self.display.verbosity = self.options.verbosity if playbook_path is None or not os.path.exists(playbook_path):
playbook_executor.verbosity = self.options.verbosity raise AnsibleError(
"Not Found the playbook file: %s." % playbook_path)
self.playbook_path = playbook_path
self.loader = DataLoader()
self.variable_manager = VariableManager()
self.passwords = passwords or {}
self.inventory = JMSInventory(hosts)
# sudo成其他用户的配置 self.options = self.Options(
self.options.become = True listtags=listtags,
self.options.become_method = 'sudo' listtasks=listtasks,
self.options.become_user = 'root' listhosts=listhosts,
passwords = {'become_pass': become_pass} syntax=syntax,
timeout=timeout,
connection=connection_type,
module_path=module_path,
forks=forks,
remote_user=remote_user,
private_key_file=private_key_file,
ssh_common_args=ssh_common_args or "",
ssh_extra_args=ssh_extra_args or "",
sftp_extra_args=sftp_extra_args,
scp_extra_args=scp_extra_args,
become=become,
become_method=become_method,
become_user=become_user,
verbosity=verbosity,
extra_vars=extra_vars or [],
check=check
)
# 传入playbook的路径,以及执行需要的变量 self.variable_manager.extra_vars = load_extra_vars(loader=self.loader,
pb_dir = os.path.dirname(__file__) options=self.options)
playbook = "%s/%s" % (pb_dir, palybook_path) self.variable_manager.options_vars = load_options_vars(self.options)
# 生成Ansible inventory, 这些变量Mixin都会用到 self.variable_manager.set_inventory(self.inventory)
self.hosts = hosts
self.group_vars = group_vars
self.loader = DataLoader()
self.variable_manager = VariableManager()
self.groups = []
self.variable_manager.extra_vars = playbook_var
self.inventory = self.gen_inventory()
# 初始化playbook的executor # 初始化playbook的executor
self.pbex = playbook_executor.PlaybookExecutor( self.runner = PlaybookExecutor(
playbooks=[playbook], playbooks=[self.playbook_path],
inventory=self.inventory, inventory=self.inventory,
variable_manager=self.variable_manager, variable_manager=self.variable_manager,
loader=self.loader, loader=self.loader,
options=self.options, options=self.options,
passwords=passwords) passwords=self.passwords)
def run(self): if self.runner._tqm:
"""执行Playbook, 记录执行日志, 处理执行结果. self.runner._tqm._stdout_callback = self.callbackmodule
:return: <AnsibleResult>对象
"""
self.pbex.run()
stats = self.pbex._tqm._stats
# 测试执行是否成功 def run(self):
run_success = True if not self.inventory.list_hosts('all'):
hosts = sorted(stats.processed.keys()) raise AnsibleError('Inventory is empty')
for h in hosts: self.runner.run()
t = stats.summarize(h) self.runner._tqm.cleanup()
if t['unreachable'] > 0 or t['failures'] > 0: return self.callbackmodule.output
run_success = False
# TODO: 记录执行日志, 处理执行结果.
return stats
class ADHocRunner(object): class ADHocRunner(object):
""" """
ADHoc接口 ADHoc接口
""" """
Options = namedtuple("Options", [
'connection', 'module_path', 'private_key_file', "remote_user",
'timeout', 'forks', 'become', 'become_method', 'become_user',
'check', 'extra_vars',
]
)
def __init__(self, def __init__(self,
hosts=C.DEFAULT_HOST_LIST, hosts=C.DEFAULT_HOST_LIST,
module_name=C.DEFAULT_MODULE_NAME, # * command module_name=C.DEFAULT_MODULE_NAME, # * command
...@@ -467,7 +432,7 @@ class ADHocRunner(object): ...@@ -467,7 +432,7 @@ class ADHocRunner(object):
self.check_module_args() self.check_module_args()
self.gather_facts = gather_facts self.gather_facts = gather_facts
self.results_callback = BasicResultCallback() self.results_callback = BasicResultCallback()
self.options = Options( self.options = self.Options(
connection=connection_type, connection=connection_type,
timeout=timeout, timeout=timeout,
module_path=module_path, module_path=module_path,
...@@ -491,10 +456,13 @@ class ADHocRunner(object): ...@@ -491,10 +456,13 @@ class ADHocRunner(object):
name='Ansible Ad-hoc', name='Ansible Ad-hoc',
hosts=self.pattern, hosts=self.pattern,
gather_facts=self.gather_facts, gather_facts=self.gather_facts,
tasks=[dict(action=dict( tasks=[
dict(action=dict(
module=self.module_name, module=self.module_name,
args=self.module_args args=self.module_args,
))] )
)
]
) )
self.play = Play().load( self.play = Play().load(
......
# ~*~ coding: utf-8 ~*~
from ansible.inventory import Inventory, Host, Group
from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader
class JMSHost(Host):
def __init__(self, asset):
self.asset = asset
self.name = name = asset.get('hostname') or asset.get('ip')
self.port = port = asset.get('port') or 22
super(JMSHost, self).__init__(name, port)
self.set_all_variable()
def set_all_variable(self):
asset = self.asset
self.set_variable('ansible_host', asset['ip'])
self.set_variable('ansible_port', asset['port'])
self.set_variable('ansible_user', asset['username'])
# 添加密码和秘钥
if asset.get('password'):
self.set_variable('ansible_ssh_pass', asset['password'])
if asset.get('key'):
self.set_variable('ansible_ssh_private_key_file', asset['private_key'])
# 添加become支持
become = asset.get("become", None)
if become is not None:
self.set_variable("ansible_become", True)
self.set_variable("ansible_become_method", become.get('method'))
self.set_variable("ansible_become_user", become.get('user'))
self.set_variable("ansible_become_pass", become.get('pass'))
else:
self.set_variable("ansible_become", False)
class JMSInventory(Inventory):
"""
提供生成Ansible inventory对象的方法
"""
def __init__(self, host_list=None):
if host_list is None:
host_list = []
assert isinstance(host_list, list)
self.host_list = host_list
self.loader = DataLoader()
self.variable_manager = VariableManager()
super(JMSInventory, self).__init__(self.loader, self.variable_manager,
host_list=host_list)
def parse_inventory(self, host_list):
"""用于生成动态构建Ansible Inventory.
self.host_list: [
{"name": "asset_name",
"ip": <ip>,
"port": <port>,
"user": <user>,
"pass": <pass>,
"key": <sshKey>,
"groups": ['group1', 'group2'],
"other_host_var": <other>},
{...},
]
:return: 返回一个Ansible的inventory对象
"""
# TODO: 验证输入
# 创建Ansible Group,如果没有则创建default组
ungrouped = Group('ungrouped')
all = Group('all')
all.add_child_group(ungrouped)
self.groups = dict(all=all, ungrouped=ungrouped)
for asset in host_list:
host = JMSHost(asset=asset)
asset_groups = asset.get('groups')
if asset_groups:
for group_name in asset_groups:
if group_name not in self.groups:
group = Group(group_name)
self.groups[group_name] = group
else:
group = self.groups[group_name]
group.add_host(host)
else:
ungrouped.add_host(host)
all.add_host(host)
# ~*~ coding: utf-8 ~*~
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment