Module dynamicio.config.pydantic.io_resources
This module contains pydantic models for physical data sources (places the bytes are being read from).
Source code
# pylint: disable=no-member, no-self-argument, unused-argument
"""This module contains pylint models for physical data sources (places the bytes are being read from)."""
import enum
import posixpath
from typing import Mapping, Optional, Union
import pydantic
from pydantic import BaseModel, model_validator
import dynamicio.config.pydantic.table_schema as table_spec
@enum.unique
class DataBackendType(str, enum.Enum):
    """Input file types."""
    # pylint: disable=invalid-name
    local = "local"
    local_batch = "local_batch"
    s3 = "s3"  # is there a difference between 's3' and 's3_file' ?
    s3_file = "s3_file"
    s3_path_prefix = "s3_path_prefix"
    postgres = "postgres"
    athena = "athena"
    kafka = "kafka"
@enum.unique
class FileType(str, enum.Enum):
    """List of supported file formats."""
    # pylint: disable=invalid-name
    parquet = "parquet"
    csv = "csv"
    json = "json"
    hdf = "hdf"
class IOBinding(BaseModel):
    """A binding for a single i/o object."""
    name: str = pydantic.Field(alias="__binding_name__")
    environments: Mapping[
        str,
        Union["IOEnvironment", "LocalDataEnvironment", "LocalBatchDataEnvironment", "S3DataEnvironment", "S3PathPrefixEnvironment", "KafkaDataEnvironment", "PostgresDataEnvironment"],
    ]
    dynamicio_schema: Union[table_spec.DataframeSchema, None] = pydantic.Field(default=None, alias="schema")
    def get_binding_for_environment(self, environment: str) -> "IOEnvironment":
        """Fetch the IOEnvironment spec for the name provided."""
        return self.environments[environment]
    @pydantic.validator("environments", pre=True, always=True)
    def pick_correct_env_cls(cls, info):
        """This pre-validator picks an appropriate IOEnvironment subclass for the `data_backend_type`."""
        if not isinstance(info, Mapping):
            raise ValueError(f"Environments input should be a dict. Got {info!r} instead.")
        config_cls_overrides = {
            DataBackendType.local: LocalDataEnvironment,
            DataBackendType.local_batch: LocalBatchDataEnvironment,
            DataBackendType.s3: S3DataEnvironment,
            DataBackendType.s3_file: S3DataEnvironment,
            DataBackendType.s3_path_prefix: S3PathPrefixEnvironment,
            DataBackendType.kafka: KafkaDataEnvironment,
            DataBackendType.postgres: PostgresDataEnvironment,
        }
        out_dict = {}
        for (env_name, env_data) in info.items():
            base_obj: IOEnvironment = IOEnvironment(**env_data)
            override_cls = config_cls_overrides.get(base_obj.data_backend_type)
            if override_cls:
                use_obj = override_cls(**env_data)
            else:
                use_obj = base_obj
            out_dict[env_name] = use_obj
        return out_dict
    @pydantic.root_validator(pre=True)
    def _preprocess_raw_config(cls, values):
        if not isinstance(values, Mapping):
            raise ValueError(f"IOBinding must be a dict at the top level. (got {values!r} instead)")
        remapped_value = {"environments": {}}
        for (key, value) in values.items():
            if key in ("__binding_name__", "schema"):
                # Passthrough params
                remapped_value[key] = value
            else:
                # Assuming an environment config
                remapped_value["environments"][key] = value
        return remapped_value
class IOEnvironment(BaseModel):
    """A section specifiing an data source backed by a particular data backend."""
    _parent: Optional[IOBinding] = None  # noqa: F821
    options: Mapping = pydantic.Field(default_factory=dict)
    data_backend_type: DataBackendType = pydantic.Field(alias="type", const=None)
    class Config:
        """Additional pydantic configuration for the model."""
        underscore_attrs_are_private = True
    @property
    def dynamicio_schema(self) -> Union[table_spec.DataframeSchema, None]:
        """Returns tabular data structure definition for the data source (if available)."""
        if not self._parent:
            raise Exception("Parent field is not set.")
        return self._parent.dynamicio_schema
    def set_parent(self, parent: IOBinding):  # noqa: F821
        """Helper method to set parent config object."""
        assert self._parent is None
        self._parent = parent
class LocalDataSubSection(BaseModel):
    """Config section for local data provider."""
    file_path: str
    file_type: FileType
class LocalDataEnvironment(IOEnvironment):
    """The data is provided by local storage."""
    local: LocalDataSubSection
class LocalBatchDataSubSection(BaseModel):
    """Config section for local batch data (multiple input files)."""
    path_prefix: Optional[str] = None
    dynamic_file_path: Optional[str] = None
    file_type: FileType
    @model_validator(mode="before")
    def check_path_fields(cls, values):
        """Check that only one of path_prefix or dynamic_file_path is provided."""
        if not values.get("path_prefix") and not values.get("dynamic_file_path"):
            raise ValueError("Either path_prefix or dynamic_file_path must be provided")
        if values.get("path_prefix") and values.get("dynamic_file_path"):
            raise ValueError("Only one of path_prefix or dynamic_file_path should be provided")
        return values
class LocalBatchDataEnvironment(IOEnvironment):
    """Parent section for local batch (multiple files) config."""
    local: LocalBatchDataSubSection
class S3DataSubSection(BaseModel):
    """Config section for S3 data source."""
    file_path: str
    file_type: FileType
    bucket: str
class S3DataEnvironment(IOEnvironment):
    """Parent section for s3 data source config."""
    s3: S3DataSubSection
class S3PathPrefixSubSection(BaseModel):
    """Config section for s3 prefix data source (multiple s3 objects)."""
    path_prefix: str
    file_type: FileType
    bucket: str
    @pydantic.root_validator(pre=True)
    def support_legacy_config_path_prefix(cls, values):
        """This validator implements support for legacy config format where the bucket & path_prefix path could've been passed as a single param in 'bucket' field.
        E.g.
            bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"
        """
        bucket = values.get("bucket")
        path_prefix = values.get("path_prefix")
        if (bucket and isinstance(bucket, str) and posixpath.sep in bucket) and (not path_prefix):
            (new_bucket, new_path_prefix) = bucket.split(posixpath.sep, 1)
            values.update(
                {
                    "bucket": new_bucket,
                    "path_prefix": new_path_prefix,
                }
            )
        return values
class S3PathPrefixEnvironment(IOEnvironment):
    """Parent section for the multi-object s3 data source."""
    s3: S3PathPrefixSubSection
class KafkaDataSubSection(BaseModel):
    """Kafka configuration section."""
    kafka_server: str
    kafka_topic: str
class KafkaDataEnvironment(IOEnvironment):
    """Parent section for kafka data source config."""
    kafka: KafkaDataSubSection
class PostgresDataSubSection(BaseModel):
    """Postgres data source configuration."""
    db_host: str
    db_port: str
    db_name: str
    db_user: str
    db_password: str
class PostgresDataEnvironment(IOEnvironment):
    """Parent section for postgres data source."""
    postgres: PostgresDataSubSection
IOBinding.model_rebuild()
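A minimal usage sketch (not from the library's own docs), assuming the module imports cleanly under the project's pinned pydantic version: the raw mapping below is the dict form of the config these models describe; _preprocess_raw_config moves every non-reserved key under environments, and pick_correct_env_cls upgrades each section to its backend-specific subclass. The binding name, environment names and paths are illustrative only.

raw_binding = {
    "__binding_name__": "MY_DATASET",   # becomes IOBinding.name via the alias
    "local": {                          # any key other than __binding_name__/schema is an environment
        "type": "local",
        "local": {"file_path": "/tmp/my_dataset.parquet", "file_type": "parquet"},
    },
    "cloud": {
        "type": "s3",
        "s3": {"bucket": "my-bucket", "file_path": "data/my_dataset.parquet", "file_type": "parquet"},
    },
}
binding = IOBinding(**raw_binding)
assert isinstance(binding.get_binding_for_environment("local"), LocalDataEnvironment)
assert isinstance(binding.get_binding_for_environment("cloud"), S3DataEnvironment)
assert binding.dynamicio_schema is None  # no "schema" section was supplied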
Classes
- class DataBackendType (value, names=None, *, module=None, qualname=None, type=None, start=1)
- 
Input file types.
Ancestors
- builtins.str
- enum.Enum
Class variables
- var athena
- var kafka
- var local
- var local_batch
- var postgres
- var s3
- var s3_file
- var s3_path_prefix
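Because the enum derives from str, the backend type strings found in raw configs coerce directly to members; per the override table in IOBinding.pick_correct_env_cls below, "s3" and "s3_file" both resolve to S3DataEnvironment, so the two spellings behave the same. A tiny illustrative check:

assert DataBackendType("s3") is DataBackendType.s3
assert DataBackendType("s3_file") is DataBackendType.s3_file
# both members map to S3DataEnvironment in IOBinding.pick_correct_env_cls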
 
- class FileType (value, names=None, *, module=None, qualname=None, type=None, start=1)
- 
List of supported file formats.
Ancestors
- builtins.str
- enum.Enum
Class variables
- var csv
- var hdf
- var json
- var parquet
 
- class IOBinding (**data: Any)
- 
A binding for a single i/o object.
Ancestors
- pydantic.main.BaseModel
Class variables
- var dynamicio_schema : Optional[DataframeSchema]
- var environments : Mapping[str, Union[IOEnvironment, LocalDataEnvironment, LocalBatchDataEnvironment, S3DataEnvironment, S3PathPrefixEnvironment, KafkaDataEnvironment, PostgresDataEnvironment]]
- var model_computed_fields
- var model_config
- var model_fields
- var name : str
Static methods
- def pick_correct_env_cls(info)
- 
This pre-validator picks an appropriate IOEnvironment subclass for the data_backend_type.
Methods
- def get_binding_for_environment(self, environment: str) ‑> IOEnvironment
- 
Fetch the IOEnvironment spec for the name provided.
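A hedged sketch of the fallback path in pick_correct_env_cls: backend types without an override entry (athena, for example) are kept as plain IOEnvironment instances. The binding name and the options mapping are invented for illustration.

raw = {
    "__binding_name__": "EVENTS",
    "warehouse": {"type": "athena", "options": {"workgroup": "primary"}},  # hypothetical options
}
binding = IOBinding(**raw)
env = binding.get_binding_for_environment("warehouse")
assert type(env) is IOEnvironment              # no override registered for "athena"
assert env.options == {"workgroup": "primary"}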
 
- class IOEnvironment (**data: Any)
- 
A section specifying a data source backed by a particular data backend.
Ancestors
- pydantic.main.BaseModel
Subclasses
- KafkaDataEnvironment
- LocalBatchDataEnvironment
- LocalDataEnvironment
- PostgresDataEnvironment
- S3DataEnvironment
- S3PathPrefixEnvironment
Class variables
- var Config
- 
Additional pydantic configuration for the model. 
- var data_backend_type : DataBackendType
- var model_computed_fields
- var model_config
- var model_fields
- var options : Mapping[~KT, +VT_co]
Instance variables
- var dynamicio_schema : Optional[DataframeSchema]
- 
Returns tabular data structure definition for the data source (if available).
Methods
- def model_post_init(self: BaseModel, __context: Any) ‑> None
- 
This function is meant to behave like a BaseModel method to initialise private attributes. It takes context as an argument since that's what pydantic-core passes when calling it.
Args
- self: The BaseModel instance.
- __context: The context.
- def set_parent(self, parent: IOBinding)
- 
Helper method to set parent config object.
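Nothing in this module wires the parent link automatically, so dynamicio_schema raises until set_parent has been called; a small sketch with illustrative names and paths:

binding = IOBinding(**{
    "__binding_name__": "MY_DATASET",
    "local": {"type": "local", "local": {"file_path": "/tmp/data.parquet", "file_type": "parquet"}},
})
env = binding.get_binding_for_environment("local")
env.set_parent(binding)               # must happen before dynamicio_schema is read
assert env.dynamicio_schema is None   # delegates to the parent; no "schema" section was given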
 
- class KafkaDataEnvironment (**data: Any)
- 
Parent section for kafka data source config.
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
- var kafka : KafkaDataSubSection
- var model_computed_fields
- var model_config
- var model_fields
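An illustrative construction (the broker address and topic name are made up):

env = KafkaDataEnvironment(**{
    "type": "kafka",
    "kafka": {"kafka_server": "broker-1:9092", "kafka_topic": "events"},
})
assert env.kafka.kafka_topic == "events"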
- class KafkaDataSubSection (**data: Any)
- 
Kafka configuration section.
Ancestors
- pydantic.main.BaseModel
Class variables
- var kafka_server : str
- var kafka_topic : str
- var model_computed_fields
- var model_config
- var model_fields
 
- class LocalBatchDataEnvironment (**data: Any)
- 
Parent section for local batch (multiple files) config.
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
- var local : LocalBatchDataSubSection
- var model_computed_fields
- var model_config
- var model_fields
- class LocalBatchDataSubSection (**data: Any)
- 
Config section for local batch data (multiple input files).
Ancestors
- pydantic.main.BaseModel
Class variables
- var dynamic_file_path : Optional[str]
- var file_type : FileType
- var model_computed_fields
- var model_config
- var model_fields
- var path_prefix : Optional[str]
Static methods
- def check_path_fields(values)
- 
Check that exactly one of path_prefix or dynamic_file_path is provided.
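The before-validator enforces that exactly one of the two path fields is set; a sketch of the three cases (the paths and the {run_id} template are hypothetical):

from pydantic import ValidationError

LocalBatchDataSubSection(path_prefix="/data/batch/", file_type="parquet")      # ok: prefix only

try:
    LocalBatchDataSubSection(file_type="parquet")                               # neither field: rejected
except ValidationError:
    pass

try:
    LocalBatchDataSubSection(
        path_prefix="/data/batch/",
        dynamic_file_path="/data/batch/{run_id}.parquet",
        file_type="parquet",
    )                                                                           # both fields: rejected
except ValidationError:
    pass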
 
- class LocalDataEnvironment (**data: Any)
- 
The data is provided by local storage.
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
- var local : LocalDataSubSection
- var model_computed_fields
- var model_config
- var model_fields
- class LocalDataSubSection (**data: Any)
- 
Config section for local data provider.
Ancestors
- pydantic.main.BaseModel
Class variables
- var file_path : str
- var file_type : FileType
- var model_computed_fields
- var model_config
- var model_fields
 
- class PostgresDataEnvironment (**data: Any)
- 
Parent section for postgres data source.
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
- var model_computed_fields
- var model_config
- var model_fields
- var postgres : PostgresDataSubSection
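A sketch of a postgres environment section; the connection details are placeholders, and note that db_port is declared as a string:

env = PostgresDataEnvironment(**{
    "type": "postgres",
    "postgres": {
        "db_host": "localhost",
        "db_port": "5432",
        "db_name": "analytics",
        "db_user": "reader",
        "db_password": "example-password",
    },
})
assert env.data_backend_type is DataBackendType.postgres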
- class PostgresDataSubSection (**data: Any)
- 
Postgres data source configuration.
Ancestors
- pydantic.main.BaseModel
Class variables
- var db_host : str
- var db_name : str
- var db_password : str
- var db_port : str
- var db_user : str
- var model_computed_fields
- var model_config
- var model_fields
 
- class S3DataEnvironment (**data: Any)
- 
Parent section for s3 data source config.
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
- var model_computed_fields
- var model_config
- var model_fields
- var s3 : S3DataSubSection
- class S3DataSubSection (**data: Any)
- 
Config section for S3 data source.
Ancestors
- pydantic.main.BaseModel
Class variables
- var bucket : str
- var file_path : str
- var file_type : FileType
- var model_computed_fields
- var model_config
- var model_fields
 
- class S3PathPrefixEnvironment (**data: Any)
- 
Parent section for the multi-object s3 data source.
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
- var model_computed_fields
- var model_config
- var model_fields
- var s3 : S3PathPrefixSubSection
- class S3PathPrefixSubSection (**data: Any)
- 
Config section for s3 prefix data source (multiple s3 objects).
Ancestors
- pydantic.main.BaseModel
Class variables
- var bucket : str
- var file_type : FileType
- var model_computed_fields
- var model_config
- var model_fields
- var path_prefix : str
Static methods
- def support_legacy_config_path_prefix(values)
- 
This validator implements support for the legacy config format where the bucket and path_prefix could have been passed as a single value in the 'bucket' field, e.g. bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf".
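A sketch of the legacy handling, with made-up bucket and prefix values: when only bucket is given and it contains a path separator, the pre-validator splits it into bucket and path_prefix; when both fields are supplied, the values are left untouched.

legacy = S3PathPrefixSubSection(bucket="my-bucket/data/input/", file_type="hdf")
assert legacy.bucket == "my-bucket"
assert legacy.path_prefix == "data/input/"

explicit = S3PathPrefixSubSection(bucket="my-bucket", path_prefix="data/input/", file_type="hdf")
assert explicit.path_prefix == "data/input/"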