Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
We have just started using Airflow for scheduling.

One of my scripts runs daily. It uses the template parameter {{ ds_nodash }} to get the date.

Now I have to rerun the load for one past date. How can I provide an input parameter that overrides {{ ds_nodash }}?

I have:

trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }}"

and would like to run it as:

trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade 20190601"

Code snippet below:

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 19),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('create-data-set-job', default_args=default_args)
project_dir = '/home/airflow/projects/'

trade_acx_ld = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh"
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} "


t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

if os.path.exists(trade_acx_ld):
    t2 = BashOperator(
        task_id='Dataset_create',
        bash_command=trade_acx_ld_cmd,
        dag=dag
    )
else:
    raise Exception("Cannot locate {0}".format(trade_acx_ld))

t2.set_upstream(t1)
Asked by user3858193, translated from Stack Overflow.


1 Answer

You can just manually trigger the DAG using airflow trigger_dag.

{{ ds_nodash }} is derived from execution_date, so if you trigger the DAG with an old execution date, {{ ds_nodash }} will use that older execution_date instead of today's date.

You can pass the execution_date to the trigger_dag command as follows:

airflow trigger_dag create-data-set-job -e "2019-01-01"
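To see why this works, note that {{ ds_nodash }} is simply the execution date formatted as YYYYMMDD. A minimal plain-Python sketch of the rendering (this mirrors what Airflow's templating does; it is an illustration, not Airflow's own code):

```python
from datetime import datetime

# Triggering with -e "2019-01-01" sets execution_date to that day,
# so the templated command receives the old date, not today's.
execution_date = datetime(2019, 1, 1)

# {{ ds_nodash }} = execution date formatted as YYYYMMDD, no dashes.
ds_nodash = execution_date.strftime("%Y%m%d")

cmd = ("/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh "
       "trade_acx.csv l1_gcb_trxn trade {}".format(ds_nodash))
print(cmd)  # the shell script receives 20190101 as its date argument
```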
