Commit 94db3ce2 authored by 唐香港's avatar 唐香港

Update example_bash_operator.py

parent 754f79ea
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
"""Example DAG demonstrating the usage of the BashOperator.""" """Example DAG demonstrating the usage of the PythonOperator."""
from datetime import timedelta import time
from pprint import pprint
from airflow.models import DAG from airflow.models import DAG
from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
args = { args = {
...@@ -31,44 +31,72 @@ args = { ...@@ -31,44 +31,72 @@ args = {
} }
dag = DAG( dag = DAG(
dag_id='example_bash_operator', dag_id='example_python_operator',
default_args=args, default_args=args,
schedule_interval='0 0 * * *', schedule_interval=None,
dagrun_timeout=timedelta(minutes=60),
tags=['example'] tags=['example']
) )
run_this_last = DummyOperator(
task_id='run_this_last',
dag=dag,
)
# [START howto_operator_bash] # [START howto_operator_python]
run_this = BashOperator( def print_context(ds, **kwargs):
task_id='run_after_loop', """Print the Airflow context and ds variable from the context."""
bash_command='echo 1', pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
python_callable=print_context,
dag=dag, dag=dag,
) )
# [END howto_operator_bash] # [END howto_operator_python]
# [START howto_operator_python_kwargs]
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
run_this >> run_this_last
for i in range(3): # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
task = BashOperator( for i in range(5):
task_id='runme_' + str(i), task = PythonOperator(
bash_command='echo "{{ task_instance_key_str }}" && sleep 1', task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag, dag=dag,
) )
task >> run_this
# [START howto_operator_bash_template] run_this >> task
also_run_this = BashOperator( # [END howto_operator_python_kwargs]
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag,
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
if __name__ == "__main__": def callable_virtualenv():
dag.cli() """
\ No newline at end of file Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from colorama import Fore, Back, Style
from time import sleep
print(Fore.RED + 'some red text')
print(Back.GREEN + 'and with a green background')
print(Style.DIM + 'and in dim text')
print(Style.RESET_ALL)
for _ in range(10):
print(Style.DIM + 'Please wait...', flush=True)
sleep(10)
print('Finished')
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=[
"colorama==0.4.0"
],
system_site_packages=False,
dag=dag,
)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment