Commit 55ffad5d authored by edz's avatar edz

modify subprocess popen stdout and stderr

parent 8665d5b9
...@@ -29,7 +29,7 @@ class TimeoutError(Exception): ...@@ -29,7 +29,7 @@ class TimeoutError(Exception):
def run_command(cmd, timeout=60): def run_command(cmd, timeout=60):
is_linux = platform.system() == 'Linux' is_linux = platform.system() == 'Linux'
p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setsid if is_linux else None) p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setsid if is_linux else None)
t_beginning = time.time() t_beginning = time.time()
seconds_passed = 0 seconds_passed = 0
while True: while True:
...@@ -44,7 +44,7 @@ def run_command(cmd, timeout=60): ...@@ -44,7 +44,7 @@ def run_command(cmd, timeout=60):
p.terminate() p.terminate()
raise TimeoutError(cmd, timeout) raise TimeoutError(cmd, timeout)
time.sleep(0.1) time.sleep(0.1)
return p.stdout.read() return (p.stdout.read(), p.stderr.read())
default_args = { default_args = {
'owner': 'tangxianggang', 'owner': 'tangxianggang',
...@@ -80,12 +80,19 @@ def get_jobs(cmd): ...@@ -80,12 +80,19 @@ def get_jobs(cmd):
timeout = 10 timeout = 10
try: try:
result = run_command(cmd, timeout) (stdout, stderr) = run_command(cmd, timeout)
result = result.decode() stdout = stdout.decode()
if 'Could not resolve host' in result: stderr = stderr.decode()
logging.debug(stderr)
logging.debug(stdout)
if 'Could not resolve host' in stderr and not stdout:
msg = '\n- **error_msg**: Could not resolve host %s' % (cmd) msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
raise Exception(msg) raise Exception(msg)
jobs_dict = json.loads(result.split('\n')[-1]) elif not stdout:
msg = '\n- **error_msg**: connectors is empty'
raise Exception(msg)
jobs_dict = json.loads(stdout)
if 'errors' in jobs_dict: if 'errors' in jobs_dict:
msg = '\n- **errors**: ' + jobs_dict['errors'][0] msg = '\n- **errors**: ' + jobs_dict['errors'][0]
logging.error(msg) logging.error(msg)
......
...@@ -30,7 +30,7 @@ class TimeoutError(Exception): ...@@ -30,7 +30,7 @@ class TimeoutError(Exception):
def run_command(cmd, timeout=60): def run_command(cmd, timeout=60):
is_linux = platform.system() == 'Linux' is_linux = platform.system() == 'Linux'
p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setsid if is_linux else None) p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setsid if is_linux else None)
t_beginning = time.time() t_beginning = time.time()
seconds_passed = 0 seconds_passed = 0
while True: while True:
...@@ -45,7 +45,7 @@ def run_command(cmd, timeout=60): ...@@ -45,7 +45,7 @@ def run_command(cmd, timeout=60):
p.terminate() p.terminate()
raise TimeoutError(cmd, timeout) raise TimeoutError(cmd, timeout)
time.sleep(0.1) time.sleep(0.1)
return p.stdout.read() return (p.stdout.read(), p.stderr.read())
default_args = { default_args = {
'owner': 'tangxianggang', 'owner': 'tangxianggang',
...@@ -81,12 +81,19 @@ def get_kafka_connectors(cmd): ...@@ -81,12 +81,19 @@ def get_kafka_connectors(cmd):
timeout = 10 timeout = 10
try: try:
result = run_command(cmd, timeout) (stdout, stderr) = run_command(cmd, timeout)
result = result.decode() stdout = stdout.decode()
if 'Could not resolve host' in result: stderr = stderr.decode()
logging.debug(stderr)
logging.debug(stdout)
if 'Could not resolve host' in stderr and not stdout:
msg = '\n- **error_msg**: Could not resolve host %s' % (cmd) msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
raise Exception(msg) raise Exception(msg)
connectors_list = json.loads(result.split('\n')[-1]) elif not stdout:
msg = '\n- **error_msg**: connectors is empty'
raise Exception(msg)
connectors_list = json.loads(stdout)
if 'error_code' in connectors_list: if 'error_code' in connectors_list:
msg = '\n- **error_code**: ' + connectors_list['error_code'] + '**error_msg**: ' + connectors_list['message'] msg = '\n- **error_code**: ' + connectors_list['error_code'] + '**error_msg**: ' + connectors_list['message']
logging.error(msg) logging.error(msg)
...@@ -105,16 +112,19 @@ def get_connector_status(cmd): ...@@ -105,16 +112,19 @@ def get_connector_status(cmd):
outdict = {} outdict = {}
try: try:
result = run_command(cmd, timeout) (stdout, stderr) = run_command(cmd, timeout)
result = result.decode() stdout = stdout.decode()
outdict = json.loads(result.split('\n')[-1]) stderr = stderr.decode()
logging.debug(stderr)
logging.debug(stdout)
outdict = json.loads(stdout)
except TimeoutError: except TimeoutError:
msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout) msg = '\n- **error_msg**: excute command=(%s) timeout after %i s' % (cmd, timeout)
logging.error(msg) logging.error(msg)
errdict = {'error_code':'600','message':'excute command=(%s) timeout after %i s' % (cmd, timeout)} errdict = {'error_code':'600','message':'excute command=(%s) timeout after %i s' % (cmd, timeout)}
return errdict return errdict
logging.info('get connector status : \n' + str(outdict))
return outdict return outdict
def judge_connector_status(connector_status_dict, connector): def judge_connector_status(connector_status_dict, connector):
......
...@@ -14,7 +14,7 @@ import airflow ...@@ -14,7 +14,7 @@ import airflow
#variable parameter #variable parameter
DAG_ID = 'alarm_kafka_connect_v2' DAG_ID = 'alarm_kafka_connect_v2'
START_DATE = datetime.now() - timedelta(minutes=30) START_DATE = datetime.now() - timedelta(minutes=30)
SCHEDULE_INTERVAL = timedelta(minutes=5) SCHEDULE_INTERVAL = timedelta(minutes=15)
SERVICES = [ SERVICES = [
'es-gmei-prod-cp-kafka-connect', 'es-gmei-prod-cp-kafka-connect',
...@@ -28,7 +28,7 @@ class TimeoutError(Exception): ...@@ -28,7 +28,7 @@ class TimeoutError(Exception):
def run_command(cmd, timeout=60): def run_command(cmd, timeout=60):
is_linux = platform.system() == 'Linux' is_linux = platform.system() == 'Linux'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setsid if is_linux else None) p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setsid if is_linux else None)
t_beginning = time.time() t_beginning = time.time()
seconds_passed = 0 seconds_passed = 0
while True: while True:
...@@ -43,7 +43,7 @@ def run_command(cmd, timeout=60): ...@@ -43,7 +43,7 @@ def run_command(cmd, timeout=60):
p.terminate() p.terminate()
raise TimeoutError(cmd, timeout) raise TimeoutError(cmd, timeout)
time.sleep(0.1) time.sleep(0.1)
return p.stdout.read() return (p.stdout.read(), p.stderr.read())
default_args = { default_args = {
'owner': 'tangxianggang', 'owner': 'tangxianggang',
...@@ -79,13 +79,19 @@ def get_kafka_connectors_status(cmd): ...@@ -79,13 +79,19 @@ def get_kafka_connectors_status(cmd):
timeout = 10 timeout = 10
try: try:
result = run_command(cmd, timeout) (stdout, stderr) = run_command(cmd, timeout)
#result = result.decode() stdout = stdout.decode()
logging.info(result) stderr = stderr.decode()
if not result: logging.debug(stderr)
logging.debug(stdout)
if 'Could not resolve host' in stderr and not stdout:
msg = '\n- **error_msg**: Could not resolve host %s' % (cmd) msg = '\n- **error_msg**: Could not resolve host %s' % (cmd)
raise Exception(msg) raise Exception(msg)
connectors_dict = json.loads(result) elif not stdout:
msg = '\n- **error_msg**: connectors is empty'
raise Exception(msg)
connectors_dict = json.loads(stdout)
if 'error_code' in connectors_dict: if 'error_code' in connectors_dict:
msg = '\n- **error_code**: ' + connectors_dict['error_code'] + '**error_msg**: ' + connectors_dict['message'] msg = '\n- **error_code**: ' + connectors_dict['error_code'] + '**error_msg**: ' + connectors_dict['message']
logging.error(msg) logging.error(msg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment