ops / jumpserver · Commits

Commit ba288396, authored Aug 01, 2019 by ibuler
Parent: 448b7bf1

[Update] Modify the jms script

Showing 6 changed files with 247 additions and 119 deletions.
apps/assets/tasks.py             +18  -18
apps/jumpserver/settings.py       +4   -4
apps/ops/celery/__init__.py       +9   -0
apps/ops/tasks.py                 +2   -2
jms                             +213  -95
requirements/requirements.txt     +1   -0
apps/assets/tasks.py

@@ -62,7 +62,7 @@ def clean_hosts_by_protocol(system_user, assets):
     return hosts
 
 
-@shared_task
+@shared_task(queue="ansible")
 def set_assets_hardware_info(assets, result, **kwargs):
     """
     Using ops task run result, to update asset info
@@ -148,7 +148,7 @@ def update_assets_hardware_info_util(assets, task_name=None):
     return result
 
 
-@shared_task
+@shared_task(queue="ansible")
 def update_asset_hardware_info_manual(asset):
     task_name = _("Update asset hardware info: {}").format(asset.hostname)
     update_assets_hardware_info_util(
@@ -156,7 +156,7 @@ def update_asset_hardware_info_manual(asset):
     )
 
 
-@shared_task
+@shared_task(queue="ansible")
 def update_assets_hardware_info_period():
     """
     Update asset hardware period task
@@ -170,7 +170,7 @@ def update_assets_hardware_info_period():
 ## ADMIN USER CONNECTIVE ##
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_asset_connectivity_util(assets, task_name=None):
     from ops.utils import update_or_create_ansible_task
@@ -227,7 +227,7 @@ def test_asset_connectivity_util(assets, task_name=None):
     return results_summary
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_asset_connectivity_manual(asset):
     task_name = _("Test assets connectivity: {}").format(asset)
     summary = test_asset_connectivity_util([asset], task_name=task_name)
@@ -238,7 +238,7 @@ def test_asset_connectivity_manual(asset):
     return True, ""
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_admin_user_connectivity_util(admin_user, task_name):
     """
     Test asset admin user can connect or not. Using ansible api do that
@@ -254,7 +254,7 @@ def test_admin_user_connectivity_util(admin_user, task_name):
     return summary
 
 
-@shared_task
+@shared_task(queue="ansible")
 @register_as_period_task(interval=3600)
 def test_admin_user_connectivity_period():
     """
@@ -276,7 +276,7 @@ def test_admin_user_connectivity_period():
     cache.set(key, 1, 60*40)
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_admin_user_connectivity_manual(admin_user):
     task_name = _("Test admin user connectivity: {}").format(admin_user.name)
     test_admin_user_connectivity_util(admin_user, task_name)
@@ -286,7 +286,7 @@ def test_admin_user_connectivity_manual(admin_user):
 ## System user connective ##
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_system_user_connectivity_util(system_user, assets, task_name):
     """
     Test system cant connect his assets or not.
@@ -344,14 +344,14 @@ def test_system_user_connectivity_util(system_user, assets, task_name):
     return results_summary
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_system_user_connectivity_manual(system_user):
     task_name = _("Test system user connectivity: {}").format(system_user)
     assets = system_user.get_all_assets()
     return test_system_user_connectivity_util(system_user, assets, task_name)
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_system_user_connectivity_a_asset(system_user, asset):
     task_name = _("Test system user connectivity: {} => {}").format(
         system_user, asset
@@ -359,7 +359,7 @@ def test_system_user_connectivity_a_asset(system_user, asset):
     return test_system_user_connectivity_util(system_user, [asset], task_name)
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_system_user_connectivity_period():
     if PERIOD_TASK != "on":
         logger.debug("Period task disabled, test system user connectivity pass")
@@ -483,7 +483,7 @@ def get_push_system_user_tasks(host, system_user):
     return tasks
 
 
-@shared_task
+@shared_task(queue="ansible")
 def push_system_user_util(system_user, assets, task_name):
     from ops.utils import update_or_create_ansible_task
     if not system_user.is_need_push():
@@ -519,14 +519,14 @@ def push_system_user_util(system_user, assets, task_name):
     task.run()
 
 
-@shared_task
+@shared_task(queue="ansible")
 def push_system_user_to_assets_manual(system_user):
     assets = system_user.get_all_assets()
     task_name = _("Push system users to assets: {}").format(system_user.name)
     return push_system_user_util(system_user, assets, task_name=task_name)
 
 
-@shared_task
+@shared_task(queue="ansible")
 def push_system_user_a_asset_manual(system_user, asset):
     task_name = _("Push system users to asset: {} => {}").format(
         system_user.name, asset
@@ -534,7 +534,7 @@ def push_system_user_a_asset_manual(system_user, asset):
     return push_system_user_util(system_user, [asset], task_name=task_name)
 
 
-@shared_task
+@shared_task(queue="ansible")
 def push_system_user_to_assets(system_user, assets):
     task_name = _("Push system users to assets: {}").format(system_user.name)
     return push_system_user_util(system_user, assets, task_name)
@@ -569,7 +569,7 @@ def get_test_asset_user_connectivity_tasks(asset):
     return tasks
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_asset_user_connectivity_util(asset_user, task_name, run_as_admin=False):
     """
     :param asset_user: <AuthBook> object
@@ -602,7 +602,7 @@ def test_asset_user_connectivity_util(asset_user, task_name, run_as_admin=False)
     asset_user.set_connectivity(summary)
 
 
-@shared_task
+@shared_task(queue="ansible")
 def test_asset_users_connectivity_manual(asset_users, run_as_admin=False):
     """
     :param asset_users: <AuthBook> objects
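Note: every ansible-backed task in this file now declares queue="ansible", so a
plain .delay()/.apply_async() call routes to a dedicated queue instead of the
default "celery" queue. A minimal standalone sketch of the pattern (the app
name and broker URL below are illustrative, not from this commit):

    from celery import Celery, shared_task

    # Illustrative app; jumpserver does this wiring in apps/ops/celery/__init__.py.
    app = Celery('demo', broker='redis://localhost:6379/0')

    @shared_task(queue="ansible")
    def run_playbook(playbook):
        # queue="ansible" is baked in as this task's routing default, so a
        # plain run_playbook.delay(...) lands on the "ansible" queue.
        print("would run", playbook)

A worker started with `celery -A demo worker -Q ansible` consumes only that
queue, keeping slow ansible jobs away from the default workers.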
apps/jumpserver/settings.py

@@ -297,10 +297,10 @@ LOGGING = {
         'handlers': ['console', 'file'],
         'level': "INFO",
     },
-    'gunicorn': {
-        'handlers': ['gunicorn_console', 'gunicorn_file'],
-        'level': 'INFO',
-    },
+    # 'gunicorn': {
+    #     'handlers': ['gunicorn_console', 'gunicorn_file'],
+    #     'level': 'INFO',
+    # },
     # 'django.db': {
     #     'handlers': ['console', 'file'],
     #     'level': 'DEBUG'
apps/ops/celery/__init__.py

@@ -2,6 +2,7 @@
 
 import os
 
+from kombu import Exchange, Queue
 from celery import Celery
 
 # set the default Django settings module for the 'celery' program.
@@ -15,6 +16,14 @@ configs = {k: v for k, v in settings.__dict__.items() if k.startswith('CELERY')}
 # Using a string here means the worker will not have to
 # pickle the object when using Windows.
 # app.config_from_object('django.conf:settings', namespace='CELERY')
+configs["CELERY_QUEUES"] = [
+    Queue("celery", Exchange("celery"), routing_key="celery"),
+    Queue("ansible", Exchange("ansible"), routing_key="ansible"),
+]
+configs["CELERY_ROUTES"] = {
+    "ops.tasks.run_ansible_task": {'exchange': 'ansible', 'routing_key': 'ansible'},
+}
+
 app.namespace = 'CELERY'
 app.conf.update(configs)
 app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
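Note: this is where the new "ansible" queue itself is declared. A self-contained
sketch of the same declaration using Celery 4's lowercase setting names
(task_queues/task_routes correspond to the CELERY_QUEUES/CELERY_ROUTES keys
above; the broker URL is illustrative):

    from celery import Celery
    from kombu import Exchange, Queue

    app = Celery('ops', broker='redis://localhost:6379/0')
    app.conf.task_queues = [
        # one queue per workload class, each bound to its own direct exchange
        Queue("celery", Exchange("celery"), routing_key="celery"),
        Queue("ansible", Exchange("ansible"), routing_key="ansible"),
    ]
    app.conf.task_routes = {
        # static route: this task name always publishes to the ansible exchange
        "ops.tasks.run_ansible_task": {
            'exchange': 'ansible',
            'routing_key': 'ansible',
        },
    }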
apps/ops/tasks.py

@@ -23,7 +23,7 @@ def rerun_task():
     pass
 
 
-@shared_task
+@shared_task(queue="ansible")
 def run_ansible_task(tid, callback=None, **kwargs):
     """
     :param tid: is the tasks serialized data
@@ -98,7 +98,7 @@ def create_or_update_registered_periodic_tasks():
         create_or_update_celery_periodic_tasks(task)
 
 
-@shared_task
+@shared_task(queue="ansible")
 def hello(name, callback=None):
     import time
     time.sleep(10)
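Note: run_ansible_task is now pinned twice, once by the decorator here and once
by the CELERY_ROUTES entry above. Dispatch behavior, assuming this project's
Django shell (the task id is a placeholder):

    from ops.tasks import run_ansible_task

    run_ansible_task.delay('some-task-id')     # decorator default -> "ansible"
    run_ansible_task.apply_async(
        args=('some-task-id',),
        queue="celery",                        # a per-call option still overrides it
    )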
jms

@@ -4,10 +4,15 @@
 import os
 import subprocess
 import threading
+import logging
+import logging.handlers
 import time
 import argparse
 import sys
 import signal
+from collections import defaultdict
+
+import daemon
+from daemon import pidfile
 
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.insert(0, BASE_DIR)
@@ -50,7 +55,9 @@ WORKERS = 4
 DAEMON = False
 
 EXIT_EVENT = threading.Event()
-all_services = ['gunicorn', 'celery', 'beat']
+LOCK = threading.Lock()
+daemon_pid_file = ''
+
 
 try:
     os.makedirs(os.path.join(BASE_DIR, "data", "static"))
@@ -58,6 +65,58 @@ try:
 except:
     pass
 
 
+class LogPipe(threading.Thread):
+    def __init__(self, name, file_path, to_stdout=False):
+        """Setup the object with a logger and a loglevel
+        and start the thread
+        """
+        threading.Thread.__init__(self)
+        self.daemon = False
+        self.name = name
+        self.file_path = file_path
+        self.to_stdout = to_stdout
+        self.fdRead, self.fdWrite = os.pipe()
+        self.pipeReader = os.fdopen(self.fdRead)
+        self.logger = self.init_logger()
+        self.start()
+
+    def init_logger(self):
+        _logger = logging.getLogger(self.name)
+        _logger.setLevel(logging.INFO)
+        _formatter = logging.Formatter('%(message)s')
+        _handler = logging.handlers.RotatingFileHandler(
+            self.file_path, mode='a', maxBytes=5*1024*1024, backupCount=5
+        )
+        _handler.setFormatter(_formatter)
+        _handler.setLevel(logging.INFO)
+        _logger.addHandler(_handler)
+        if self.to_stdout:
+            _console = logging.StreamHandler()
+            _console.setLevel(logging.INFO)
+            _console.setFormatter(_formatter)
+            _logger.addHandler(_console)
+        return _logger
+
+    def fileno(self):
+        """Return the write file descriptor of the pipe
+        """
+        return self.fdWrite
+
+    def run(self):
+        """Run the thread, logging everything.
+        """
+        for line in iter(self.pipeReader.readline, ''):
+            self.logger.info(line.strip('\n'))
+        self.pipeReader.close()
+
+    def close(self):
+        """Close the write end of the pipe.
+        """
+        os.close(self.fdWrite)
+
+
 def check_database_connection():
     os.chdir(os.path.join(BASE_DIR, 'apps'))
     for i in range(60):
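Note: LogPipe exists so each child process can keep writing to what looks like
an ordinary file descriptor while the parent rotates its logs. A usage sketch
with the class above, substituting a trivial command for gunicorn/celery:

    import subprocess

    pipe = LogPipe('demo', '/tmp/demo.log', to_stdout=True)
    # Popen only needs an object with fileno(); the child inherits the write
    # end of the pipe, and the LogPipe thread logs every line it reads back.
    p = subprocess.Popen(['echo', 'hello'], stdout=pipe, stderr=pipe)
    p.wait()
    pipe.close()  # close the write end so the reader thread's loop can finish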
@@ -81,7 +140,9 @@ def make_migrations():
 def collect_static():
     print("Collect static files")
     os.chdir(os.path.join(BASE_DIR, 'apps'))
-    subprocess.call('python3 manage.py collectstatic --no-input -c &> /dev/null && echo "Collect static file done"', shell=True)
+    command = 'python3 manage.py collectstatic --no-input -c &> /dev/null ' \
+              '&& echo "Collect static file done"'
+    subprocess.call(command, shell=True)
 
 
 def prepare():
@@ -100,18 +161,17 @@ def check_pid(pid):
     return True
 
 
-def get_pid_file_path(service):
-    return os.path.join(TMP_DIR, '{}.pid'.format(service))
+def get_pid_file_path(s):
+    return os.path.join('/tmp', '{}.pid'.format(s))
 
 
-def get_log_file_path(service):
-    return os.path.join(LOG_DIR, '{}.log'.format(service))
+def get_log_file_path(s):
+    return os.path.join(LOG_DIR, '{}.log'.format(s))
 
 
-def get_pid(service):
-    pid_file = get_pid_file_path(service)
-    if os.path.isfile(pid_file):
-        with open(pid_file) as f:
+def get_pid_from_file(path):
+    if os.path.isfile(path):
+        with open(path) as f:
             try:
                 return int(f.read().strip())
             except ValueError:
@@ -119,12 +179,19 @@ def get_pid(service):
     return 0
 
 
+def get_pid(s):
+    pid_file = get_pid_file_path(s)
+    return get_pid_from_file(pid_file)
+
+
 def is_running(s, unlink=True):
     pid_file = get_pid_file_path(s)
     if os.path.isfile(pid_file):
         pid = get_pid(s)
-        if check_pid(pid):
+        if pid == 0:
+            return False
+        elif check_pid(pid):
             return True
         if unlink:
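Note: the new pid == 0 guard matters because get_pid_from_file() returns 0 when
the pidfile is missing or unparsable, and probing pid 0 with os.kill() would
signal the caller's own process group. check_pid() itself is not shown in this
diff; it most likely resembles the usual signal-0 probe (a sketch, not
necessarily the repo's exact code):

    import os

    def check_pid(pid):
        """Return True if a process with this pid exists."""
        try:
            os.kill(pid, 0)  # signal 0 only performs the existence check
        except OSError:
            return False
        return True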
@@ -133,156 +200,191 @@ def is_running(s, unlink=True):
 def parse_service(s):
+    all_services = ['gunicorn', 'celery_ansible', 'celery_default', 'beat']
     if s == 'all':
         return all_services
+    elif s == "celery":
+        return ["celery_ansible", "celery_default"]
     elif "," in s:
-        return [i.strip() for i in s.split(',')]
+        services = set()
+        for i in s.split(','):
+            services.update(parse_service(i))
+        return services
     else:
         return [s]
 
 
-def start_gunicorn():
+def get_start_gunicorn_kwargs():
     print("\n- Start Gunicorn WSGI HTTP Server")
     prepare()
     service = 'gunicorn'
     bind = '{}:{}'.format(HTTP_HOST, HTTP_PORT)
     log_format = '%(h)s %(t)s "%(r)s" %(s)s %(b)s '
     pid_file = get_pid_file_path(service)
-    log_file = get_log_file_path(service)
 
     cmd = [
         'gunicorn', 'jumpserver.wsgi',
         '-b', bind,
-        # '-k', 'eventlet',
         '-k', 'gthread',
         '--threads', '10',
         '-w', str(WORKERS),
         '--max-requests', '4096',
         '--access-logformat', log_format,
         '-p', pid_file,
+        '--access-logfile', '-'
     ]
-    if DAEMON:
-        cmd.extend(['--daemon', ])
-    else:
-        cmd.extend(['--access-logfile', '-'])
+
     if DEBUG:
         cmd.append('--reload')
-    p = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, cwd=APPS_DIR)
-    return p
+    return {'cmd': cmd, 'cwd': APPS_DIR}
 
 
-def start_celery():
+def get_start_celery_ansible_kwargs():
     print("\n- Start Celery as Distributed Task Queue")
+    return get_start_worker_kwargs('ansible', 4)
+
+
+def get_start_celery_default_kwargs():
+    return get_start_worker_kwargs('celery', 2)
+
+
+def get_start_worker_kwargs(queue, num):
     # Todo: Must set this environment, otherwise not no ansible result return
     os.environ.setdefault('PYTHONOPTIMIZE', '1')
 
     if os.getuid() == 0:
         os.environ.setdefault('C_FORCE_ROOT', '1')
-    service = 'celery'
-    pid_file = get_pid_file_path(service)
 
     cmd = [
         'celery', 'worker',
         '-A', 'ops',
         '-l', 'INFO',
-        '--pidfile', pid_file,
-        '--autoscale', '20,4',
+        '-c', str(num),
+        '-Q', queue,
     ]
-    if DAEMON:
-        cmd.extend([
-            '--logfile', os.path.join(LOG_DIR, 'celery.log'),
-            '--detach',
-        ])
-    p = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, cwd=APPS_DIR)
-    return p
+    return {"cmd": cmd, "cwd": APPS_DIR}
 
 
-def start_beat():
+def get_start_beat_kwargs():
     print("\n- Start Beat as Periodic Task Scheduler")
-    pid_file = get_pid_file_path('beat')
-    log_file = get_log_file_path('beat')
-
     os.environ.setdefault('PYTHONOPTIMIZE', '1')
     if os.getuid() == 0:
         os.environ.setdefault('C_FORCE_ROOT', '1')
 
     scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
     cmd = [
         'celery', 'beat',
         '-A', 'ops',
-        '--pidfile', pid_file,
-        '-l', 'INFO',
+        '-l', LOG_LEVEL,
         '--scheduler', scheduler,
         '--max-interval', '60'
     ]
-    if DAEMON:
-        cmd.extend([
-            '--logfile', log_file,
-            '--detach',
-        ])
-    p = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, cwd=APPS_DIR)
-    return p
+    return {"cmd": cmd, 'cwd': APPS_DIR}
+
+
+processes = {}
+
+
+def watch_services():
+    max_retry = 3
+    signal.signal(signal.SIGTERM, lambda x, y: clean_up())
+    services_retry = defaultdict(int)
+    stopped_services = {}
+
+    def check_services():
+        for s, p in processes.items():
+            try:
+                p.wait(timeout=1)
+                stopped_services[s] = ''
+            except subprocess.TimeoutExpired:
+                stopped_services.pop(s, None)
+                services_retry.pop(s, None)
+                continue
+
+    def retry_start_stopped_services():
+        for s in stopped_services:
+            if services_retry[s] > max_retry:
+                print("\nService start failed, exit: ", s)
+                EXIT_EVENT.set()
+                break
+            print("\n> Find {} stopped, retry {}".format(
+                s, services_retry[s] + 1
+            ))
+            p = start_service(s)
+            processes[s] = p
+            services_retry[s] += 1
+
+    while not EXIT_EVENT.is_set():
+        try:
+            with LOCK:
+                check_services()
+                retry_start_stopped_services()
+            time.sleep(10)
+        except KeyboardInterrupt:
+            time.sleep(1)
+            break
+    clean_up()
 
 
 def start_service(s):
+    services_kwargs = {
+        "gunicorn": get_start_gunicorn_kwargs,
+        "celery_ansible": get_start_celery_ansible_kwargs,
+        "celery_default": get_start_celery_default_kwargs,
+        "beat": get_start_beat_kwargs,
+    }
+    kwargs = services_kwargs.get(s)()
+    pid_file = get_pid_file_path(s)
+    if os.path.isfile(pid_file):
+        os.unlink(pid_file)
+    cmd = kwargs.pop('cmd')
+    to_stdout = False
+    if not DAEMON:
+        to_stdout = True
+    log_file = get_log_file_path(s)
+    _logger = LogPipe(s, log_file, to_stdout=to_stdout)
+    stderr = stdout = _logger
+    kwargs.update({"stderr": stderr, "stdout": stdout})
+    p = subprocess.Popen(cmd, **kwargs)
+    with open(pid_file, 'w') as f:
+        f.write(str(p.pid))
+    return p
+
+
+def start_services_and_watch(s):
     print(time.ctime())
     print('Jumpserver version {}, more see https://www.jumpserver.org'.format(
-        __version__))
-    services_handler = {
-        "gunicorn": start_gunicorn,
-        "celery": start_celery,
-        "beat": start_beat
-    }
+        __version__
+    ))
     services_set = parse_service(s)
-    processes = []
     for i in services_set:
         if is_running(i):
             show_service_status(i)
             continue
-        func = services_handler.get(i)
-        p = func()
+        p = start_service(i)
         time.sleep(2)
-        processes.append(p)
+        processes[i] = p
 
-    now = int(time.time())
-    for i in services_set:
-        while not is_running(i):
-            if int(time.time()) - now < START_TIMEOUT:
-                time.sleep(1)
-                continue
-            else:
-                print("Error: {} start error".format(i))
-                stop_multi_services(services_set)
-                return
-
-    stop_event = threading.Event()
     if not DAEMON:
-        signal.signal(signal.SIGTERM, lambda x, y: stop_event.set())
-        while not stop_event.is_set():
-            try:
-                time.sleep(10)
-            except KeyboardInterrupt:
-                stop_event.set()
-                break
-        print("Stop services")
-        for p in processes:
-            p.terminate()
-        for i in services_set:
-            stop_service(i)
+        watch_services()
     else:
-        print()
         show_service_status(s)
+        global daemon_pid_file
+        daemon_pid_file = get_pid_file_path('jms')
+        context = daemon.DaemonContext(
+            pidfile=pidfile.TimeoutPIDLockFile(daemon_pid_file),
+            signal_map={
+                signal.SIGTERM: clean_up,
+                signal.SIGHUP: 'terminate',
+            },
+        )
+        with context:
+            watch_services()
 
 
 def stop_service(s, sig=15):
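Note: the structural change above splits "build the command" (the
get_start_*_kwargs helpers) from "spawn and record" (start_service), and adds
watch_services() as a supervisor that restarts dead children up to max_retry
times. A condensed, standalone sketch of that supervise/retry loop, using a
dummy sleep child and illustrative names:

    import subprocess
    import time
    from collections import defaultdict

    processes = {'demo': subprocess.Popen(['sleep', '2'])}
    retries, max_retry = defaultdict(int), 3

    while processes:
        for name, proc in list(processes.items()):
            try:
                proc.wait(timeout=1)      # returning means the child exited
            except subprocess.TimeoutExpired:
                continue                  # still alive, check the next one
            if retries[name] >= max_retry:
                print("service kept dying, giving up:", name)
                del processes[name]
                continue
            retries[name] += 1
            print("restarting", name, "attempt", retries[name])
            processes[name] = subprocess.Popen(['sleep', '2'])
        time.sleep(1)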
@@ -294,6 +396,12 @@ def stop_service(s, sig=15):
     print("Stop service: {}".format(s))
     pid = get_pid(s)
     os.kill(pid, sig)
+    with LOCK:
+        processes.pop(s, None)
+
+    if s == "all":
+        pid = get_pid('jms')
+        os.kill(pid, sig)
 
 
 def stop_multi_services(services):
@@ -305,6 +413,15 @@ def stop_service_force(s):
     stop_service(s, sig=9)
 
 
+def clean_up():
+    if not EXIT_EVENT.is_set():
+        EXIT_EVENT.set()
+    processes_dump = {k: v for k, v in processes.items()}
+    for s1, p1 in processes_dump.items():
+        stop_service(s1)
+        p1.wait()
+
+
 def show_service_status(s):
     services_set = parse_service(s)
     for ns in services_set:
@@ -348,13 +465,14 @@ if __name__ == '__main__':
     srv = args.service
 
     if action == "start":
-        start_service(srv)
+        start_services_and_watch(srv)
+        os._exit(0)
     elif action == "stop":
         stop_service(srv)
     elif action == "restart":
         DAEMON = True
         stop_service(srv)
         time.sleep(5)
-        start_service(srv)
+        start_services_and_watch(srv)
     else:
         show_service_status(srv)
requirements/requirements.txt

@@ -81,3 +81,4 @@ django-radius==1.3.3
 ipip-ipdb==1.2.1
 django-redis-sessions==0.6.1
 unicodecsv==0.14.1
+python-daemon==2.2.3
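Note: python-daemon is the new dependency behind the daemonized start path in
jms above. The API surface it uses is small; a minimal sketch (the pidfile path
is illustrative):

    import signal

    import daemon
    from daemon import pidfile

    context = daemon.DaemonContext(
        pidfile=pidfile.TimeoutPIDLockFile('/tmp/demo-jms.pid'),
        signal_map={
            signal.SIGTERM: lambda signum, frame: None,  # hook for custom cleanup
            signal.SIGHUP: 'terminate',                  # built-in clean exit
        },
    )
    with context:
        pass  # forked and detached here, holding the pidfile lock while inside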