Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
A
airflow-dags-hub
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
唐香港
airflow-dags-hub
Commits
3db7e0f0
Commit
3db7e0f0
authored
Sep 02, 2019
by
edz
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
modify subprocess popen stdout and stderr to leave
parent
669ba9f4
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
46 additions
and
23 deletions
+46
-23
alarm_flink_job.py
dags/flink/alarm_flink_job.py
+13
-6
alarm_kafka_connect.py
dags/kafka/alarm_kafka_connect.py
+20
-10
alarm_kafka_connect_v2.py
dags/kafka/alarm_kafka_connect_v2.py
+13
-7
No files found.
dags/flink/alarm_flink_job.py
View file @
3db7e0f0
...
@@ -29,7 +29,7 @@ class TimeoutError(Exception):
...
@@ -29,7 +29,7 @@ class TimeoutError(Exception):
def
run_command
(
cmd
,
timeout
=
60
):
def
run_command
(
cmd
,
timeout
=
60
):
is_linux
=
platform
.
system
()
==
'Linux'
is_linux
=
platform
.
system
()
==
'Linux'
p
=
subprocess
.
Popen
(
cmd
,
stderr
=
subprocess
.
STDOUT
,
stdout
=
subprocess
.
PIPE
,
shell
=
True
,
preexec_fn
=
os
.
setsid
if
is_linux
else
None
)
p
=
subprocess
.
Popen
(
cmd
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
shell
=
True
,
preexec_fn
=
os
.
setsid
if
is_linux
else
None
)
t_beginning
=
time
.
time
()
t_beginning
=
time
.
time
()
seconds_passed
=
0
seconds_passed
=
0
while
True
:
while
True
:
...
@@ -44,7 +44,7 @@ def run_command(cmd, timeout=60):
...
@@ -44,7 +44,7 @@ def run_command(cmd, timeout=60):
p
.
terminate
()
p
.
terminate
()
raise
TimeoutError
(
cmd
,
timeout
)
raise
TimeoutError
(
cmd
,
timeout
)
time
.
sleep
(
0.1
)
time
.
sleep
(
0.1
)
return
p
.
stdout
.
read
(
)
return
(
p
.
stdout
.
read
(),
p
.
stderr
.
read
()
)
default_args
=
{
default_args
=
{
'owner'
:
'tangxianggang'
,
'owner'
:
'tangxianggang'
,
...
@@ -80,12 +80,19 @@ def get_jobs(cmd):
...
@@ -80,12 +80,19 @@ def get_jobs(cmd):
timeout
=
10
timeout
=
10
try
:
try
:
result
=
run_command
(
cmd
,
timeout
)
(
stdout
,
stderr
)
=
run_command
(
cmd
,
timeout
)
result
=
result
.
decode
()
stdout
=
stdout
.
decode
()
if
'Could not resolve host'
in
result
:
stderr
=
stderr
.
decode
()
logging
.
debug
(
stderr
)
logging
.
debug
(
stdout
)
if
'Could not resolve host'
in
stderr
and
not
stdout
:
msg
=
'
\n
- **error_msg**: Could not resolve host
%
s'
%
(
cmd
)
msg
=
'
\n
- **error_msg**: Could not resolve host
%
s'
%
(
cmd
)
raise
Exception
(
msg
)
raise
Exception
(
msg
)
jobs_dict
=
json
.
loads
(
result
.
split
(
'
\n
'
)[
-
1
])
elif
not
stdout
:
msg
=
'
\n
- **error_msg**: connectors is empty'
raise
Exception
(
msg
)
jobs_dict
=
json
.
loads
(
stdout
)
if
'errors'
in
jobs_dict
:
if
'errors'
in
jobs_dict
:
msg
=
'
\n
- **errors**: '
+
jobs_dict
[
'errors'
][
0
]
msg
=
'
\n
- **errors**: '
+
jobs_dict
[
'errors'
][
0
]
logging
.
error
(
msg
)
logging
.
error
(
msg
)
...
...
dags/kafka/alarm_kafka_connect.py
View file @
3db7e0f0
...
@@ -30,7 +30,7 @@ class TimeoutError(Exception):
...
@@ -30,7 +30,7 @@ class TimeoutError(Exception):
def
run_command
(
cmd
,
timeout
=
60
):
def
run_command
(
cmd
,
timeout
=
60
):
is_linux
=
platform
.
system
()
==
'Linux'
is_linux
=
platform
.
system
()
==
'Linux'
p
=
subprocess
.
Popen
(
cmd
,
stderr
=
subprocess
.
STDOUT
,
stdout
=
subprocess
.
PIPE
,
shell
=
True
,
preexec_fn
=
os
.
setsid
if
is_linux
else
None
)
p
=
subprocess
.
Popen
(
cmd
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
shell
=
True
,
preexec_fn
=
os
.
setsid
if
is_linux
else
None
)
t_beginning
=
time
.
time
()
t_beginning
=
time
.
time
()
seconds_passed
=
0
seconds_passed
=
0
while
True
:
while
True
:
...
@@ -45,7 +45,7 @@ def run_command(cmd, timeout=60):
...
@@ -45,7 +45,7 @@ def run_command(cmd, timeout=60):
p
.
terminate
()
p
.
terminate
()
raise
TimeoutError
(
cmd
,
timeout
)
raise
TimeoutError
(
cmd
,
timeout
)
time
.
sleep
(
0.1
)
time
.
sleep
(
0.1
)
return
p
.
stdout
.
read
(
)
return
(
p
.
stdout
.
read
(),
p
.
stderr
.
read
()
)
default_args
=
{
default_args
=
{
'owner'
:
'tangxianggang'
,
'owner'
:
'tangxianggang'
,
...
@@ -81,12 +81,19 @@ def get_kafka_connectors(cmd):
...
@@ -81,12 +81,19 @@ def get_kafka_connectors(cmd):
timeout
=
10
timeout
=
10
try
:
try
:
result
=
run_command
(
cmd
,
timeout
)
(
stdout
,
stderr
)
=
run_command
(
cmd
,
timeout
)
result
=
result
.
decode
()
stdout
=
stdout
.
decode
()
if
'Could not resolve host'
in
result
:
stderr
=
stderr
.
decode
()
logging
.
debug
(
stderr
)
logging
.
debug
(
stdout
)
if
'Could not resolve host'
in
stderr
and
not
stdout
:
msg
=
'
\n
- **error_msg**: Could not resolve host
%
s'
%
(
cmd
)
msg
=
'
\n
- **error_msg**: Could not resolve host
%
s'
%
(
cmd
)
raise
Exception
(
msg
)
raise
Exception
(
msg
)
connectors_list
=
json
.
loads
(
result
.
split
(
'
\n
'
)[
-
1
])
elif
not
stdout
:
msg
=
'
\n
- **error_msg**: connectors is empty'
raise
Exception
(
msg
)
connectors_list
=
json
.
loads
(
stdout
)
if
'error_code'
in
connectors_list
:
if
'error_code'
in
connectors_list
:
msg
=
'
\n
- **error_code**: '
+
connectors_list
[
'error_code'
]
+
'**error_msg**: '
+
connectors_list
[
'message'
]
msg
=
'
\n
- **error_code**: '
+
connectors_list
[
'error_code'
]
+
'**error_msg**: '
+
connectors_list
[
'message'
]
logging
.
error
(
msg
)
logging
.
error
(
msg
)
...
@@ -96,7 +103,6 @@ def get_kafka_connectors(cmd):
...
@@ -96,7 +103,6 @@ def get_kafka_connectors(cmd):
logging
.
error
(
msg
)
logging
.
error
(
msg
)
raise
Exception
(
msg
)
raise
Exception
(
msg
)
logging
.
info
(
str
(
connectors_list
))
return
connectors_list
return
connectors_list
def
get_connector_status
(
cmd
):
def
get_connector_status
(
cmd
):
...
@@ -105,9 +111,13 @@ def get_connector_status(cmd):
...
@@ -105,9 +111,13 @@ def get_connector_status(cmd):
outdict
=
{}
outdict
=
{}
try
:
try
:
result
=
run_command
(
cmd
,
timeout
)
(
stdout
,
stderr
)
=
run_command
(
cmd
,
timeout
)
result
=
result
.
decode
()
stdout
=
stdout
.
decode
()
outdict
=
json
.
loads
(
result
.
split
(
'
\n
'
)[
-
1
])
stderr
=
stderr
.
decode
()
logging
.
debug
(
stderr
)
logging
.
debug
(
stdout
)
outdict
=
json
.
loads
(
stdout
)
except
TimeoutError
:
except
TimeoutError
:
msg
=
'
\n
- **error_msg**: excute command=(
%
s) timeout after
%
i s'
%
(
cmd
,
timeout
)
msg
=
'
\n
- **error_msg**: excute command=(
%
s) timeout after
%
i s'
%
(
cmd
,
timeout
)
logging
.
error
(
msg
)
logging
.
error
(
msg
)
...
...
dags/kafka/alarm_kafka_connect_v2.py
View file @
3db7e0f0
...
@@ -28,7 +28,7 @@ class TimeoutError(Exception):
...
@@ -28,7 +28,7 @@ class TimeoutError(Exception):
def
run_command
(
cmd
,
timeout
=
60
):
def
run_command
(
cmd
,
timeout
=
60
):
is_linux
=
platform
.
system
()
==
'Linux'
is_linux
=
platform
.
system
()
==
'Linux'
p
=
subprocess
.
Popen
(
cmd
,
stderr
=
subprocess
.
STDOUT
,
stdout
=
subprocess
.
PIPE
,
shell
=
True
,
preexec_fn
=
os
.
setsid
if
is_linux
else
None
)
p
=
subprocess
.
Popen
(
cmd
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
shell
=
True
,
preexec_fn
=
os
.
setsid
if
is_linux
else
None
)
t_beginning
=
time
.
time
()
t_beginning
=
time
.
time
()
seconds_passed
=
0
seconds_passed
=
0
while
True
:
while
True
:
...
@@ -43,7 +43,7 @@ def run_command(cmd, timeout=60):
...
@@ -43,7 +43,7 @@ def run_command(cmd, timeout=60):
p
.
terminate
()
p
.
terminate
()
raise
TimeoutError
(
cmd
,
timeout
)
raise
TimeoutError
(
cmd
,
timeout
)
time
.
sleep
(
0.1
)
time
.
sleep
(
0.1
)
return
p
.
stdout
.
read
(
)
return
(
p
.
stdout
.
read
(),
p
.
stderr
.
read
()
)
default_args
=
{
default_args
=
{
'owner'
:
'tangxianggang'
,
'owner'
:
'tangxianggang'
,
...
@@ -79,13 +79,19 @@ def get_kafka_connectors_status(cmd):
...
@@ -79,13 +79,19 @@ def get_kafka_connectors_status(cmd):
timeout
=
10
timeout
=
10
try
:
try
:
result
=
run_command
(
cmd
,
timeout
)
(
stdout
,
stderr
)
=
run_command
(
cmd
,
timeout
)
result
=
result
.
decode
()
stdout
=
stdout
.
decode
()
logging
.
info
(
result
)
stderr
=
stderr
.
decode
()
if
'Could not resolve host'
in
result
:
logging
.
debug
(
stderr
)
logging
.
debug
(
stdout
)
if
'Could not resolve host'
in
stderr
and
not
stdout
:
msg
=
'
\n
- **error_msg**: Could not resolve host
%
s'
%
(
cmd
)
msg
=
'
\n
- **error_msg**: Could not resolve host
%
s'
%
(
cmd
)
raise
Exception
(
msg
)
raise
Exception
(
msg
)
connectors_dict
=
json
.
loads
(
result
.
split
(
'
\n
'
)[
-
1
])
elif
not
stdout
:
msg
=
'
\n
- **error_msg**: connectors is empty'
raise
Exception
(
msg
)
connectors_dict
=
json
.
loads
(
stdout
)
if
'error_code'
in
connectors_dict
:
if
'error_code'
in
connectors_dict
:
msg
=
'
\n
- **error_code**: '
+
connectors_dict
[
'error_code'
]
+
'**error_msg**: '
+
connectors_dict
[
'message'
]
msg
=
'
\n
- **error_code**: '
+
connectors_dict
[
'error_code'
]
+
'**error_msg**: '
+
connectors_dict
[
'message'
]
logging
.
error
(
msg
)
logging
.
error
(
msg
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment