dagster_airflow.make_airflow_dag(module_name, pipeline_name, run_config=None, mode=None, instance=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None)
Construct an Airflow DAG corresponding to a given Dagster pipeline.
Tasks in the resulting DAG will execute the Dagster logic they encapsulate as a Python callable, run by an underlying PythonOperator. As a consequence, dagster, any Python dependencies required by your solid logic, and the module containing your pipeline definition must all be available in the Python environment within which your Airflow tasks execute. If you cannot install requirements into this environment, or you are looking for a containerized solution to provide better isolation, see make_airflow_dag_containerized() instead.
This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.
module_name (str) – The name of the importable module in which the pipeline definition can be found.
pipeline_name (str) – The name of the pipeline definition.
run_config (Optional[dict]) – The config, if any, with which to compile the pipeline to an execution plan, as a Python dict.
mode (Optional[str]) – The mode in which to execute the pipeline.
instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline.
dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).
dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).
dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.
op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of PythonOperator).
Returns the generated Airflow DAG and a list of its constituent tasks.
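The parameters above can be gathered in a DAG definition file roughly as follows. This is a minimal sketch, not the output of the scaffold tool: the module name, pipeline name, DAG id, schedule, and default_args values are placeholders, and the helper names build_factory_kwargs and build_dag are hypothetical.

```python
from datetime import datetime, timedelta


def build_factory_kwargs():
    """Keyword arguments for make_airflow_dag; every value is a placeholder."""
    return {
        "module_name": "my_package.pipelines",  # hypothetical importable module
        "pipeline_name": "my_pipeline",  # hypothetical pipeline name
        "dag_id": "my_pipeline_dag",
        "dag_description": "Dagster pipeline compiled to an Airflow DAG",
        # Forwarded to the Airflow DAG constructor, including default_args.
        "dag_kwargs": {
            "schedule_interval": "@daily",
            "default_args": {
                "owner": "airflow",
                "start_date": datetime(2020, 1, 1),
                "retries": 1,
                "retry_delay": timedelta(minutes=5),
            },
        },
    }


def build_dag():
    """Compile the pipeline; needs dagster_airflow, Airflow, and the pipeline module."""
    # Imported lazily so the kwargs above can be inspected without Airflow installed.
    from dagster_airflow import make_airflow_dag

    dag, tasks = make_airflow_dag(**build_factory_kwargs())
    return dag, tasks
```

In an actual DAG definition file you would likely call `dag, tasks = build_dag()` at module level, since Airflow discovers DAG objects from the module's global namespace.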
dagster_airflow.make_airflow_dag_for_operator(recon_repo, pipeline_name, operator, run_config=None, mode=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None)
Construct an Airflow DAG corresponding to a given Dagster pipeline and custom operator.
Tasks in the resulting DAG will execute the Dagster logic they encapsulate, run by the given operator (a subclass of BaseOperator). If you are looking for a containerized solution to provide better isolation, see make_airflow_dag_containerized() instead.
This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.
recon_repo (dagster.ReconstructableRepository) – Reference to a Dagster RepositoryDefinition that can be reconstructed in another process.
pipeline_name (str) – The name of the pipeline definition.
operator (type) – The operator to use. Must be a class that inherits from BaseOperator.
run_config (Optional[dict]) – The config, if any, with which to compile the pipeline to an execution plan, as a Python dict.
mode (Optional[str]) – The mode in which to execute the pipeline.
instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline.
dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).
dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).
dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.
op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator.
Returns the generated Airflow DAG and a list of its constituent tasks.
dagster_airflow.make_airflow_dag_containerized(module_name, pipeline_name, image, run_config=None, mode=None, dag_id=None, dag_description=None, dag_kwargs=None, op_kwargs=None)
Construct a containerized Airflow DAG corresponding to a given Dagster pipeline.
Tasks in the resulting DAG will execute the Dagster logic they encapsulate using a subclass of DockerOperator. As a consequence, dagster, any Python dependencies required by your solid logic, and the module containing your pipeline definition must all be available in the container spun up by this operator. Typically you’ll want to install these requirements onto the image you’re using.
This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.
module_name (str) – The name of the importable module in which the pipeline definition can be found.
pipeline_name (str) – The name of the pipeline definition.
image (str) – The name of the Docker image to use for execution (passed through to DockerOperator).
run_config (Optional[dict]) – The config, if any, with which to compile the pipeline to an execution plan, as a Python dict.
mode (Optional[str]) – The mode in which to execute the pipeline.
dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).
dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).
dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.
op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of DockerOperator).
Returns the generated Airflow DAG and a list of its constituent tasks.
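For the containerized variant, the main differences are the image argument and the operator kwargs. The sketch below assumes a hypothetical image tag and module name, and assumes that auto_remove and network_mode (standard Airflow DockerOperator arguments) are passed through unchanged via op_kwargs; whether a given dagster-airflow version accepts them is not confirmed here.

```python
def build_containerized_kwargs(image="my-registry/my-pipeline:latest"):
    """Keyword arguments for make_airflow_dag_containerized; values are placeholders."""
    return {
        "module_name": "my_package.pipelines",  # must be importable inside the image
        "pipeline_name": "my_pipeline",  # hypothetical pipeline name
        "image": image,  # hypothetical image with dagster and your code installed
        "dag_id": "my_pipeline_containerized",
        # Assumed to be forwarded to the DockerOperator subclass.
        "op_kwargs": {
            "auto_remove": True,
            "network_mode": "bridge",
        },
    }


def build_containerized_dag(image="my-registry/my-pipeline:latest"):
    """Compile the pipeline; needs dagster_airflow and Airflow on the scheduler host."""
    # Imported lazily so the kwargs can be inspected without Airflow installed.
    from dagster_airflow import make_airflow_dag_containerized

    dag, tasks = make_airflow_dag_containerized(**build_containerized_kwargs(image))
    return dag, tasks
```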
dagster_airflow.make_dagster_pipeline_from_airflow_dag(dag, tags=None, use_airflow_template_context=False, unique_id=None)
Construct a Dagster pipeline corresponding to a given Airflow DAG.
Tasks in the resulting pipeline will execute the execute() method on the corresponding Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module containing your DAG definition must be available in the Python environment within which your Dagster solids execute.
To set Airflow’s execution_date for use with Airflow Operators’ execute() methods, either:
1. Run the pipeline with the 'default' preset, which sets execution_date to the time (in UTC) of pipeline invocation:
    execute_pipeline(
        pipeline=make_dagster_pipeline_from_airflow_dag(dag=dag),
        preset='default')
2. Add {'airflow_execution_date': utc_date_string} to the PipelineDefinition tags. This will override behavior from (1).
    execute_pipeline(
        make_dagster_pipeline_from_airflow_dag(
            dag=dag,
            tags={'airflow_execution_date': utc_execution_date_str}
        )
    )
3. Add {'airflow_execution_date': utc_date_string} to the PipelineRun tags, such as in the Dagit UI. This will override behavior from (1) and (2).
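The precedence among these three options can be illustrated with a small standalone sketch. It is not dagster-airflow's implementation: the helper names are hypothetical, and the ISO 8601 string format is an assumption about what the library accepts for utc_date_string.

```python
from datetime import datetime, timezone

TAG_KEY = "airflow_execution_date"


def utc_date_string(when):
    """Format a timezone-aware datetime as an ISO 8601 string in UTC."""
    if when.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return when.astimezone(timezone.utc).isoformat()


def resolve_execution_date(invocation_time, pipeline_tags=None, run_tags=None):
    """Pick the execution date: run tags, then pipeline tags, then invocation time."""
    # Run tags (option 3) override pipeline tags (option 2).
    for tags in (run_tags, pipeline_tags):
        if tags and TAG_KEY in tags:
            return tags[TAG_KEY]
    # Fall back to the time of invocation in UTC (option 1).
    return utc_date_string(invocation_time)
```

A dict such as `{TAG_KEY: utc_date_string(some_datetime)}` is the shape of value that options 2 and 3 describe.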
We apply normalized_name() to the dag id and task ids when generating pipeline name and solid names to ensure that names conform to Dagster’s naming conventions.
dag (DAG) – The Airflow DAG to compile into a Dagster pipeline
tags (Dict[str, Field]) – Pipeline tags. Optionally include tags={'airflow_execution_date': utc_date_string} to specify the execution_date used during execution of Airflow Operators.
use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)
unique_id (int) – If not None, this id will be appended to generated solid names. Used by framework authors to enforce unique solid names within a repo.
Returns the generated Dagster pipeline, pipeline_def (PipelineDefinition).
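To make the name normalization and the unique_id suffix concrete, here is a hypothetical sketch. It is not the library's normalized_name(): it assumes Dagster names may contain only letters, digits, and underscores, and it replaces every other character with an underscore. The real function may apply different rules.

```python
import re


def sketch_normalized_name(name, unique_id=None):
    """Hypothetical stand-in for normalized_name(); the real rules may differ."""
    # Replace anything outside letters, digits, and underscores.
    base = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if unique_id is None:
        return base
    # Append the id so that names stay unique within a repo.
    return "{}_{}".format(base, unique_id)
```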
dagster_airflow.make_dagster_repo_from_airflow_dags_path(dag_path, repo_name, safe_mode=True, store_serialized_dags=False, use_airflow_template_context=False)
Construct a Dagster repository corresponding to Airflow DAGs in dag_path.
This function depends on DagBag.get_dag(), which requires the Airflow DB to be initialized.
Usage:
- Create make_dagster_repo.py:
    from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dags_path

    def make_repo_from_dir():
        return make_dagster_repo_from_airflow_dags_path(
            '/path/to/dags/', 'my_repo_name'
        )
- Use RepositoryDefinition as usual, for example:
    dagit -f path/to/make_dagster_repo.py -n make_repo_from_dir
dag_path (str) – Path to directory or file that contains Airflow DAGs
repo_name (str) – Name for generated RepositoryDefinition
include_examples (bool) – True to include Airflow’s example DAGs. (default: False)
safe_mode (bool) – True to use Airflow’s default heuristic to find files that contain DAGs (i.e., files that contain both b'DAG' and b'airflow') (default: True)
store_serialized_dags (bool) – True to read Airflow DAGs from Airflow DB. False to read DAGs from Python files. (default: False)
use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)
Returns a RepositoryDefinition.
dagster_airflow.make_dagster_repo_from_airflow_dag_bag(dag_bag, repo_name, refresh_from_airflow_db=False, use_airflow_template_context=False)
Construct a Dagster repository corresponding to Airflow DAGs in DagBag.
Usage:
- Create make_dagster_repo.py:
    from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dag_bag
    from airflow_home import my_dag_bag

    def make_repo_from_dag_bag():
        return make_dagster_repo_from_airflow_dag_bag(my_dag_bag, 'my_repo_name')
- Use RepositoryDefinition as usual, for example:
    dagit -f path/to/make_dagster_repo.py -n make_repo_from_dag_bag
dag_bag (DagBag) – Airflow DagBag that contains the DAGs to compile
repo_name (str) – Name for generated RepositoryDefinition
refresh_from_airflow_db (bool) – If True, will refresh DAG if expired via DagBag.get_dag(), which requires access to initialized Airflow DB. If False (recommended), gets dag from DagBag’s dags dict without depending on Airflow DB. (default: False)
use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)
Returns a RepositoryDefinition.
dagster_airflow.make_dagster_repo_from_airflow_example_dags(repo_name='airflow_example_dags_repo')
Construct a Dagster repository for Airflow’s example DAGs.
'example_external_task_marker_child', 'example_pig_operator', 'example_skip_dag', 'example_trigger_target_dag', 'example_xcom', 'test_utils'
Usage:
- Create make_dagster_repo.py:
    from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_example_dags

    def make_airflow_example_dags():
        return make_dagster_repo_from_airflow_example_dags()
- Use RepositoryDefinition as usual, for example:
    dagit -f path/to/make_dagster_repo.py -n make_airflow_example_dags
repo_name (str) – Name for generated RepositoryDefinition
Returns a RepositoryDefinition.