Commit e2cf384a authored by 吴升宇's avatar 吴升宇

update pgsql

parent 14e7e634
...@@ -19,44 +19,67 @@ dag = DAG( ...@@ -19,44 +19,67 @@ dag = DAG(
default_args=default_args, default_args=default_args,
catchup=False, catchup=False,
description='', description='',
schedule_interval='0 0 * * * 0', # 每天零点执行一次 schedule_interval='0 0 1 * * 0', # 每月一号零点执行一次
) )
time_arg = datetime.now() import datetime, calendar
time_arg = time_arg.strftime('%Y%m%d')
# t1 =SSHOperator( # 获取当前日期
# task_id='diagnosis_check_start_consultation_per_2m_task_01', now_time = datetime.datetime.now()
# ssh_conn_id='bj-gm-prod-ubu-celery005-gmuser', # 获取当前时间的星期数和月数
# command='source /srv/envs/diagnosis-celery/bin/activate && ' week, days_num = calendar.monthrange(now_time.year, now_time.month)
# 'cd /srv/apps/diagnosis-celery/ && ' # 获取本月的最后一天
# 'python manage.py schedule_check_start_dispatch_ok', end_day_in_mouth = now_time.replace(day=days_num)
# dag=dag, # 获取下月的第一天
# do_xcom_push=True start_day = end_day_in_mouth + datetime.timedelta(days=1)
# ) month = start_day.strftime('y%Ym%m')
# 获取下月的星期数和月数
week, days_num = calendar.monthrange(start_day.year, start_day.month)
# 获取下个月的最后一天
end_day = start_day + datetime.timedelta(days=(days_num-1))
day = datetime.now() end_str = str(end_day.date())
day_str = day.strftime('%Y%m%d') start_str = str(start_day.date())
day_date_str = day.strftime('%Y-%m-%d') sql1 = """
next_day = day + timedelta(days=1) CREATE TABLE events_{month} partition of events
for values FROM ('{start_str}') TO ('{end_str}');
next_day_date_str = next_day.strftime('%Y-%m-%d') """.format(month=month, start_str=start_str, end_str=end_str)
sql = """ print(sql1)
CREATE TABLE events_{day_str} partition of events sql2 = """
for values FROM ('{day_date_str}') TO ('{next_day_date_str}'); ALTER TABLE events_{month} ADD CONSTRAINT pk_events_{month} PRIMARY KEY(id);
""".format(month=month)
print(sql2)
""".format(day_str=day_str, day_date_str=day_date_str, next_day_date_str=next_day_date_str) sql3 = """
create index idx_events_{month}_event_time on events_{month}(event_time);
""".format(month=month)
print(sql3)
t1 = PostgresOperator( t1 = PostgresOperator(
task_id='psql_auto_create_table_per_day_task_01', task_id='psql_auto_create_table_per_day_task_01',
postgres_conn_id='postgres-test', postgres_conn_id='postgres-test',
sql=sql, sql=sql1,
dag=dag, dag=dag,
database='ruler', database='ruler',
params={'period': '201905'},
pool='pricing_pool') pool='pricing_pool')
t2 = PostgresOperator(
task_id='psql_auto_create_table_per_day_task_02',
postgres_conn_id='postgres-test',
sql=sql2,
dag=dag,
database='ruler',
pool='pricing_pool')
t3 = PostgresOperator(
task_id='psql_auto_create_table_per_day_task_03',
postgres_conn_id='postgres-test',
sql=sql3,
dag=dag,
database='ruler',
pool='pricing_pool')
t1 >> t2 >> t3
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment