Commit c5391e29 authored by 唐香港's avatar 唐香港

Update example_python_operator.py

parent b1acd358
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os,logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
default_args = {
'owner': 'wangzhaojun',
'depends_on_past': False,
'start_date': datetime.now() - timedelta(minutes=2890),
'retries': 2,
'retry_delay': timedelta(seconds=5)
}
dag = DAG(
dag_id='clear_tasks_container_logs',
default_args=default_args,
schedule_interval=timedelta(minutes=1445)
)
def clear_worker_logs():
dt = datetime.now() - timedelta(minutes=1445)
time_str = dt.strftime('%Y-%m-%d')
cmd = 'rm -rf /opt/bitnami/airflow/logs/*/*/%s*' % (time_str)
logger.info('exec cmd : ' + cmd)
os.system(cmd)
task = PythonOperator(
task_id='clear_worker_logs',
python_callable=clear_worker_logs,
dag=dag,
)
if __name__ == '__main__':
dag.cli()
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import time
from builtins import range
from pprint import pprint
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'wangzhaojun',
'start_date': days_ago(2),
}
dag = DAG(
dag_id='example_python_operator',
default_args=args,
schedule_interval=None,
tags=['example']
)
# [START howto_operator_python]
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag,
)
# [END howto_operator_python]
# [START howto_operator_python_kwargs]
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag,
)
run_this >> task
# [END howto_operator_python_kwargs]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment