import os
import pickle
from dagster import check
from dagster.config import Field
from dagster.config.source import StringSource
from dagster.core.definitions.event_metadata import EventMetadataEntry
from dagster.core.definitions.events import AssetKey, AssetMaterialization
from dagster.core.execution.context.input import InputContext
from dagster.core.execution.context.output import OutputContext
from dagster.core.storage.io_manager import IOManager, io_manager
from dagster.utils import PICKLE_PROTOCOL, mkdir_p
from dagster.utils.backcompat import experimental
@io_manager(config_schema={"base_dir": Field(StringSource, is_required=False)})
def fs_io_manager(init_context):
"""Built-in filesystem IO manager that stores and retrieves values using pickling.
Allows users to specify a base directory where all the step outputs will be stored. By
default, step outputs will be stored in the directory specified by local_artifact_storage in
your dagster.yaml file (which will be a temporary directory if not explicitly set).
Serializes and deserializes output values using pickling and automatically constructs
the filepaths for the assets.
Example usage:
1. Specify a pipeline-level IO manager using the reserved resource key ``"io_manager"``,
which will set the given IO manager on all solids across a pipeline.
.. code-block:: python
@solid
def solid_a(context, df):
return df
@solid
def solid_b(context, df):
return df[:5]
@pipeline(
mode_defs=[
ModeDefinition(
resource_defs={
"io_manager": fs_io_manager.configured({"base_path": "/my/base/path"})
}
)
]
)
def pipe():
solid_b(solid_a())
2. Specify IO manager on :py:class:`OutputDefinition`, which allows the user to set
different IO managers on different step outputs.
.. code-block:: python
@solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager")])
def solid_a(context, df):
return df
@solid
def solid_b(context, df):
return df[:5]
@pipeline(
mode_defs=[ModeDefinition(resource_defs={"my_io_manager": fs_io_manager})]
)
def pipe():
solid_b(solid_a())
"""
base_dir = init_context.resource_config.get(
"base_dir", init_context.instance.storage_directory()
)
return PickledObjectFilesystemIOManager(base_dir=base_dir)
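# A minimal configuration sketch (assumes the ``pipe`` pipeline from the docstring
# above; ``solid_a``'s ``df`` input would still need to be supplied via run config,
# which is omitted here, and ``/tmp/dagster`` is an illustrative directory):
#
#     from dagster import execute_pipeline
#
#     execute_pipeline(
#         pipe,
#         run_config={"resources": {"io_manager": {"config": {"base_dir": "/tmp/dagster"}}}},
#     )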
class PickledObjectFilesystemIOManager(IOManager):
"""Built-in filesystem IO manager that stores and retrieves values using pickling.
Args:
base_dir (Optional[str]): Base directory where all the step outputs that use this IO
manager will be stored.
"""
def __init__(self, base_dir=None):
self.base_dir = check.opt_str_param(base_dir, "base_dir")
self.write_mode = "wb"
self.read_mode = "rb"
def _get_path(self, context):
"""Automatically construct filepath."""
keys = context.get_run_scoped_output_identifier()
return os.path.join(self.base_dir, *keys)
def handle_output(self, context, obj):
"""Pickle the data and store the object to a file.
This method omits the AssetMaterialization event so assets generated by it won't be tracked
by the Asset Catalog.
"""
check.inst_param(context, "context", OutputContext)
filepath = self._get_path(context)
context.log.debug(f"Writing file at: {filepath}")
# Ensure path exists
mkdir_p(os.path.dirname(filepath))
with open(filepath, self.write_mode) as write_obj:
pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
def load_input(self, context):
"""Unpickle the file and Load it to a data object."""
check.inst_param(context, "context", InputContext)
filepath = self._get_path(context.upstream_output)
context.log.debug(f"Loading file from: {filepath}")
with open(filepath, self.read_mode) as read_obj:
return pickle.load(read_obj)
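# Sketch of the resulting on-disk format (the path components are illustrative;
# real paths come from the run-scoped identifier above). Each output is a plain
# pickle file, so it can be inspected outside of Dagster:
#
#     import pickle
#
#     with open("/tmp/dagster/abc123/solid_a/result", "rb") as f:
#         value = pickle.load(f)  # the same object the solid returned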
class CustomPathPickledObjectFilesystemIOManager(IOManager):
"""Built-in filesystem IO managerthat stores and retrieves values using pickling and
allow users to specify file path for outputs.
Args:
base_dir (Optional[str]): Base directory where all the step outputs that use this IO
manager will be stored.
"""
def __init__(self, base_dir=None):
self.base_dir = check.opt_str_param(base_dir, "base_dir")
self.write_mode = "wb"
self.read_mode = "rb"
def _get_path(self, path):
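# Unlike the run-scoped layout used by PickledObjectFilesystemIOManager, the
# trailing path segment here is user-supplied via output metadata (see
# handle_output below).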
return os.path.join(self.base_dir, path)
def handle_output(self, context, obj):
"""Pickle the data and store the object to a custom file path.
This method emits an AssetMaterialization event so the assets will be tracked by the
Asset Catalog.
"""
check.inst_param(context, "context", OutputContext)
metadata = context.metadata
path = check.str_param(metadata.get("path"), "metadata.path")
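# "path" comes from the OutputDefinition's metadata dict, e.g.
# OutputDefinition(io_manager_key=..., metadata={"path": "path/to/sample_output"}).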
filepath = self._get_path(path)
# Ensure path exists
mkdir_p(os.path.dirname(filepath))
context.log.debug(f"Writing file at: {filepath}")
with open(filepath, self.write_mode) as write_obj:
pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
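# Returning an AssetMaterialization from handle_output causes Dagster to log
# it, so the output appears in the Asset Catalog under the asset key
# [pipeline_name, step_key, output_name].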
return AssetMaterialization(
asset_key=AssetKey([context.pipeline_name, context.step_key, context.name]),
metadata_entries=[EventMetadataEntry.fspath(os.path.abspath(filepath))],
)
def load_input(self, context):
"""Unpickle the file from a given file path and Load it to a data object."""
check.inst_param(context, "context", InputContext)
metadata = context.upstream_output.metadata
path = check.str_param(metadata.get("path"), "metadata.path")
filepath = self._get_path(path)
context.log.debug(f"Loading file from: {filepath}")
with open(filepath, self.read_mode) as read_obj:
return pickle.load(read_obj)
@io_manager(config_schema={"base_dir": Field(StringSource, is_required=True)})
@experimental
def custom_path_fs_io_manager(init_context):
"""Built-in IO manager that allows users to custom output file path per output definition.
It requires users to specify a base directory where all the step output will be stored in. It
serializes and deserializes output values (assets) using pickling and stores the pickled object
in the user-provided file paths.
Example usage:
.. code-block:: python
@solid(
output_defs=[
OutputDefinition(
io_manager_key="io_manager", metadata={"path": "path/to/sample_output"}
)
]
)
def sample_data(context, df):
return df[:5]
my_custom_path_fs_io_manager = custom_path_fs_io_manager.configured(
{"base_dir": "path/to/basedir"}
)
@pipeline(
mode_defs=[ModeDefinition(resource_defs={"io_manager": my_custom_path_fs_io_manager})],
)
def pipe():
sample_data()
"""
return CustomPathPickledObjectFilesystemIOManager(
base_dir=init_context.resource_config.get("base_dir")
)
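# A minimal execution sketch (assumes the ``pipe`` pipeline from the docstring
# above; ``sample_data``'s ``df`` input would be supplied via run config, which
# is omitted here). The pickled output would land at
# path/to/basedir/path/to/sample_output:
#
#     from dagster import execute_pipeline
#
#     execute_pipeline(pipe)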