Module dynamicio.config.pydantic.io_resources

This module contains pydantic models for physical data sources (places the bytes are being read from).

Expand source code
# pylint: disable=no-member, no-self-argument, unused-argument

"""This module contains pydantic models for physical data sources (places the bytes are being read from)."""

import enum
import posixpath
from typing import Mapping, Optional, Union

import pydantic
from pydantic import BaseModel, model_validator

import dynamicio.config.pydantic.table_schema as table_spec

class DataBackendType(str, enum.Enum):
    """Input file types.

    Members are the backend identifiers accepted by `IOEnvironment.data_backend_type`
    (the `type` key of an environment config).
    """

    # pylint: disable=invalid-name
    local = "local"
    local_batch = "local_batch"
    # NOTE: 's3' and 's3_file' are both mapped to S3DataEnvironment by IOBinding.pick_correct_env_cls.
    s3 = "s3"  # is there a difference between 's3' and 's3_file' ?
    s3_file = "s3_file"
    s3_path_prefix = "s3_path_prefix"
    postgres = "postgres"
    # NOTE: athena has no entry in IOBinding.pick_correct_env_cls, so it is parsed as a plain IOEnvironment.
    athena = "athena"
    kafka = "kafka"

class FileType(str, enum.Enum):
    """List of supported file formats.

    Used as the type of the `file_type` field in the local and s3 sub-section models.
    """

    # pylint: disable=invalid-name
    parquet = "parquet"
    csv = "csv"
    json = "json"
    hdf = "hdf"

class IOBinding(BaseModel):
    """A binding for a single i/o object.

    In the raw config, `__binding_name__` and `schema` describe the binding itself;
    every other top-level key is treated as an environment name.
    """

    name: str = pydantic.Field(alias="__binding_name__")
    environments: Mapping[
        str,
        Union["IOEnvironment", "LocalDataEnvironment", "LocalBatchDataEnvironment", "S3DataEnvironment", "S3PathPrefixEnvironment", "KafkaDataEnvironment", "PostgresDataEnvironment"],
    ]
    dynamicio_schema: Union[table_spec.DataframeSchema, None] = pydantic.Field(default=None, alias="schema")

    def get_binding_for_environment(self, environment: str) -> "IOEnvironment":
        """Fetch the IOEnvironment spec for the name provided.

        Raises:
            KeyError: If no environment is configured under `environment`.
        """
        return self.environments[environment]

    @pydantic.validator("environments", pre=True, always=True)
    def pick_correct_env_cls(cls, info):
        """This pre-validator picks an appropriate IOEnvironment subclass for the `data_backend_type`.

        Args:
            info: Raw `environments` value, mapping environment name to its raw config.

        Returns:
            A dict mapping each environment name to its instantiated environment model.

        Raises:
            ValueError: If `info` is not a mapping.
        """
        if not isinstance(info, Mapping):
            raise ValueError(f"Environments input should be a dict. Got {info!r} instead.")
        # Built at call time: the environment classes are defined further down the module.
        config_cls_overrides = {
            DataBackendType.local: LocalDataEnvironment,
            DataBackendType.local_batch: LocalBatchDataEnvironment,
            DataBackendType.s3: S3DataEnvironment,
            DataBackendType.s3_file: S3DataEnvironment,
            DataBackendType.s3_path_prefix: S3PathPrefixEnvironment,
            DataBackendType.kafka: KafkaDataEnvironment,
            DataBackendType.postgres: PostgresDataEnvironment,
        }
        out_dict = {}
        for env_name, env_data in info.items():
            # Parse with the base model first to learn which backend type was requested.
            base_obj: IOEnvironment = IOEnvironment(**env_data)
            override_cls = config_cls_overrides.get(base_obj.data_backend_type)
            if override_cls:
                use_obj = override_cls(**env_data)
            else:
                # Backend types without a dedicated class keep the generic model.
                use_obj = base_obj
            out_dict[env_name] = use_obj
        return out_dict

    @model_validator(mode="before")
    def _preprocess_raw_config(cls, values):
        """Split the flat raw config into binding-level params and an `environments` mapping.

        Raises:
            ValueError: If `values` is not a mapping.
        """
        if not isinstance(values, Mapping):
            raise ValueError(f"IOBinding must be a dict at the top level. (got {values!r} instead)")
        remapped_value = {"environments": {}}
        for key, value in values.items():
            if key in ("__binding_name__", "schema"):
                # Passthrough params
                remapped_value[key] = value
            else:
                # Assuming an environment config
                remapped_value["environments"][key] = value
        return remapped_value

class IOEnvironment(BaseModel):
    """A section specifying a data source backed by a particular data backend."""

    # Back-reference to the owning IOBinding; stays None until `set_parent` is called.
    _parent: Optional[IOBinding] = None  # noqa: F821
    # Free-form backend options; defaults to an empty dict.
    options: Mapping = pydantic.Field(default_factory=dict)
    # Backend selector, read from the `type` key of the config.
    data_backend_type: DataBackendType = pydantic.Field(alias="type", const=None)

    class Config:
        """Additional pydantic configuration for the model."""

        underscore_attrs_are_private = True

    @property
    def dynamicio_schema(self) -> Union[table_spec.DataframeSchema, None]:
        """Returns tabular data structure definition for the data source (if available).

        The schema is stored on the parent binding, so the parent must have been set first.

        Raises:
            Exception: If no parent has been set.
        """
        if not self._parent:
            raise Exception("Parent field is not set.")
        return self._parent.dynamicio_schema

    def set_parent(self, parent: IOBinding):  # noqa: F821
        """Helper method to set parent config object.

        Raises:
            AssertionError: If a parent has already been set.
        """
        # Raised explicitly (rather than via `assert`) so the check survives `python -O`.
        if self._parent is not None:
            raise AssertionError("Parent is already set.")
        self._parent = parent

class LocalDataSubSection(BaseModel):
    """Config section for local data provider."""

    # Location of the file — presumably a filesystem path; TODO confirm whether placeholders are allowed.
    file_path: str
    # Format of the file; one of the FileType members.
    file_type: FileType

class LocalDataEnvironment(IOEnvironment):
    """The data is provided by local storage.

    Chosen by `IOBinding.pick_correct_env_cls` for the `local` backend type.
    """

    # Backend-specific settings, read from the `local` key.
    local: LocalDataSubSection

class LocalBatchDataSubSection(BaseModel):
    """Config section for local batch data (multiple input files).

    Exactly one of `path_prefix` or `dynamic_file_path` must be provided.
    """

    path_prefix: Optional[str] = None
    dynamic_file_path: Optional[str] = None
    file_type: FileType

    @model_validator(mode="before")
    def check_path_fields(cls, values):
        """Check that only one of path_prefix or dynamic_file_path is provided.

        Args:
            values: Raw input mapping for this section.

        Returns:
            `values`, unchanged.

        Raises:
            ValueError: If neither or both of the two path fields are set.
        """
        if not values.get("path_prefix") and not values.get("dynamic_file_path"):
            raise ValueError("Either path_prefix or dynamic_file_path must be provided")
        if values.get("path_prefix") and values.get("dynamic_file_path"):
            raise ValueError("Only one of path_prefix or dynamic_file_path should be provided")
        return values

class LocalBatchDataEnvironment(IOEnvironment):
    """Parent section for local batch (multiple files) config.

    Chosen by `IOBinding.pick_correct_env_cls` for the `local_batch` backend type.
    """

    # Backend-specific settings, read from the `local` key (same key as the single-file variant).
    local: LocalBatchDataSubSection

class S3DataSubSection(BaseModel):
    """Config section for S3 data source."""

    # Object location inside the bucket — presumably the S3 key; TODO confirm.
    file_path: str
    # Format of the object; one of the FileType members.
    file_type: FileType
    # Name of the S3 bucket.
    bucket: str

class S3DataEnvironment(IOEnvironment):
    """Parent section for s3 data source config.

    Chosen by `IOBinding.pick_correct_env_cls` for the `s3` and `s3_file` backend types.
    """

    # Backend-specific settings, read from the `s3` key.
    s3: S3DataSubSection

class S3PathPrefixSubSection(BaseModel):
    """Config section for s3 prefix data source (multiple s3 objects)."""

    path_prefix: str
    file_type: FileType
    bucket: str

    @model_validator(mode="before")
    def support_legacy_config_path_prefix(cls, values):
        """This validator implements support for legacy config format where the bucket & path_prefix path could've been passed as a single param in 'bucket' field.

        E.g.
            bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"

        Args:
            values: Raw input mapping for this section.

        Returns:
            The input mapping, with `bucket` split into `bucket` and `path_prefix` when
            the legacy single-field form is detected; otherwise `values` unchanged.
        """
        bucket = values.get("bucket")
        path_prefix = values.get("path_prefix")
        if (bucket and isinstance(bucket, str) and posixpath.sep in bucket) and (not path_prefix):
            # Everything before the first separator is the bucket; the remainder is the prefix.
            (new_bucket, new_path_prefix) = bucket.split(posixpath.sep, 1)
            # Return a merged copy so the caller's raw config is not mutated.
            values = {
                **values,
                "bucket": new_bucket,
                "path_prefix": new_path_prefix,
            }
        return values

class S3PathPrefixEnvironment(IOEnvironment):
    """Parent section for the multi-object s3 data source.

    Chosen by `IOBinding.pick_correct_env_cls` for the `s3_path_prefix` backend type.
    """

    # Backend-specific settings, read from the `s3` key.
    s3: S3PathPrefixSubSection

class KafkaDataSubSection(BaseModel):
    """Kafka configuration section."""

    # Address of the Kafka server — presumably "host:port"; TODO confirm expected format.
    kafka_server: str
    # Name of the Kafka topic.
    kafka_topic: str

class KafkaDataEnvironment(IOEnvironment):
    """Parent section for kafka data source config.

    Chosen by `IOBinding.pick_correct_env_cls` for the `kafka` backend type.
    """

    # Backend-specific settings, read from the `kafka` key.
    kafka: KafkaDataSubSection

class PostgresDataSubSection(BaseModel):
    """Postgres data source configuration."""

    # Connection settings; every value, including the port, is declared as a string.
    db_host: str
    db_port: str
    db_name: str
    db_user: str
    db_password: str

class PostgresDataEnvironment(IOEnvironment):
    """Parent section for postgres data source.

    Chosen by `IOBinding.pick_correct_env_cls` for the `postgres` backend type.
    """

    # Backend-specific settings, read from the `postgres` key.
    postgres: PostgresDataSubSection



class DataBackendType (value, names=None, *, module=None, qualname=None, type=None, start=1)

Input file types.

Expand source code
class DataBackendType(str, enum.Enum):
    """Input file types.

    Members are the backend identifiers accepted by `IOEnvironment.data_backend_type`
    (the `type` key of an environment config).
    """

    # pylint: disable=invalid-name
    local = "local"
    local_batch = "local_batch"
    # NOTE: 's3' and 's3_file' are both mapped to S3DataEnvironment by IOBinding.pick_correct_env_cls.
    s3 = "s3"  # is there a difference between 's3' and 's3_file' ?
    s3_file = "s3_file"
    s3_path_prefix = "s3_path_prefix"
    postgres = "postgres"
    # NOTE: athena has no entry in IOBinding.pick_correct_env_cls, so it is parsed as a plain IOEnvironment.
    athena = "athena"
    kafka = "kafka"


  • builtins.str
  • enum.Enum

Class variables

var athena
var kafka
var local
var local_batch
var postgres
var s3
var s3_file
var s3_path_prefix
class FileType (value, names=None, *, module=None, qualname=None, type=None, start=1)

List of supported file formats.

Expand source code
class FileType(str, enum.Enum):
    """List of supported file formats.

    Used as the type of the `file_type` field in the local and s3 sub-section models.
    """

    # pylint: disable=invalid-name
    parquet = "parquet"
    csv = "csv"
    json = "json"
    hdf = "hdf"


  • builtins.str
  • enum.Enum

Class variables

var csv
var hdf
var json
var parquet
class IOBinding (**data: Any)

A binding for a single i/o object.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class IOBinding(BaseModel):
    """A binding for a single i/o object.

    In the raw config, `__binding_name__` and `schema` describe the binding itself;
    every other top-level key is treated as an environment name.
    """

    name: str = pydantic.Field(alias="__binding_name__")
    environments: Mapping[
        str,
        Union["IOEnvironment", "LocalDataEnvironment", "LocalBatchDataEnvironment", "S3DataEnvironment", "S3PathPrefixEnvironment", "KafkaDataEnvironment", "PostgresDataEnvironment"],
    ]
    dynamicio_schema: Union[table_spec.DataframeSchema, None] = pydantic.Field(default=None, alias="schema")

    def get_binding_for_environment(self, environment: str) -> "IOEnvironment":
        """Fetch the IOEnvironment spec for the name provided.

        Raises:
            KeyError: If no environment is configured under `environment`.
        """
        return self.environments[environment]

    @pydantic.validator("environments", pre=True, always=True)
    def pick_correct_env_cls(cls, info):
        """This pre-validator picks an appropriate IOEnvironment subclass for the `data_backend_type`.

        Args:
            info: Raw `environments` value, mapping environment name to its raw config.

        Returns:
            A dict mapping each environment name to its instantiated environment model.

        Raises:
            ValueError: If `info` is not a mapping.
        """
        if not isinstance(info, Mapping):
            raise ValueError(f"Environments input should be a dict. Got {info!r} instead.")
        # Built at call time: the environment classes are defined further down the module.
        config_cls_overrides = {
            DataBackendType.local: LocalDataEnvironment,
            DataBackendType.local_batch: LocalBatchDataEnvironment,
            DataBackendType.s3: S3DataEnvironment,
            DataBackendType.s3_file: S3DataEnvironment,
            DataBackendType.s3_path_prefix: S3PathPrefixEnvironment,
            DataBackendType.kafka: KafkaDataEnvironment,
            DataBackendType.postgres: PostgresDataEnvironment,
        }
        out_dict = {}
        for env_name, env_data in info.items():
            # Parse with the base model first to learn which backend type was requested.
            base_obj: IOEnvironment = IOEnvironment(**env_data)
            override_cls = config_cls_overrides.get(base_obj.data_backend_type)
            if override_cls:
                use_obj = override_cls(**env_data)
            else:
                # Backend types without a dedicated class keep the generic model.
                use_obj = base_obj
            out_dict[env_name] = use_obj
        return out_dict

    @model_validator(mode="before")
    def _preprocess_raw_config(cls, values):
        """Split the flat raw config into binding-level params and an `environments` mapping.

        Raises:
            ValueError: If `values` is not a mapping.
        """
        if not isinstance(values, Mapping):
            raise ValueError(f"IOBinding must be a dict at the top level. (got {values!r} instead)")
        remapped_value = {"environments": {}}
        for key, value in values.items():
            if key in ("__binding_name__", "schema"):
                # Passthrough params
                remapped_value[key] = value
            else:
                # Assuming an environment config
                remapped_value["environments"][key] = value
        return remapped_value


  • pydantic.main.BaseModel

Class variables

var dynamicio_schema : Optional[DataframeSchema]
var environments : Mapping[str, Union[IOEnvironment, LocalDataEnvironment, LocalBatchDataEnvironment, S3DataEnvironment, S3PathPrefixEnvironment, KafkaDataEnvironment, PostgresDataEnvironment]]
var model_computed_fields
var model_config
var model_fields
var name : str

Static methods

def pick_correct_env_cls(info)

This pre-validator picks an appropriate IOEnvironment subclass for the data_backend_type.

Expand source code
@pydantic.validator("environments", pre=True, always=True)
def pick_correct_env_cls(cls, info):
    """This pre-validator picks an appropriate IOEnvironment subclass for the `data_backend_type`.

    Args:
        info: Raw `environments` value, mapping environment name to its raw config.

    Returns:
        A dict mapping each environment name to its instantiated environment model.

    Raises:
        ValueError: If `info` is not a mapping.
    """
    if not isinstance(info, Mapping):
        raise ValueError(f"Environments input should be a dict. Got {info!r} instead.")
    config_cls_overrides = {
        DataBackendType.local: LocalDataEnvironment,
        DataBackendType.local_batch: LocalBatchDataEnvironment,
        DataBackendType.s3: S3DataEnvironment,
        DataBackendType.s3_file: S3DataEnvironment,
        DataBackendType.s3_path_prefix: S3PathPrefixEnvironment,
        DataBackendType.kafka: KafkaDataEnvironment,
        DataBackendType.postgres: PostgresDataEnvironment,
    }
    out_dict = {}
    for env_name, env_data in info.items():
        # Parse with the base model first to learn which backend type was requested.
        base_obj: IOEnvironment = IOEnvironment(**env_data)
        override_cls = config_cls_overrides.get(base_obj.data_backend_type)
        if override_cls:
            use_obj = override_cls(**env_data)
        else:
            # Backend types without a dedicated class keep the generic model.
            use_obj = base_obj
        out_dict[env_name] = use_obj
    return out_dict


def get_binding_for_environment(self, environment: str) ‑> IOEnvironment

Fetch the IOEnvironment spec for the name provided.

Expand source code
def get_binding_for_environment(self, environment: str) -> "IOEnvironment":
    """Look up and return the environment spec stored under `environment`."""
    env_specs = self.environments
    return env_specs[environment]
class IOEnvironment (**data: Any)

A section specifying a data source backed by a particular data backend.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class IOEnvironment(BaseModel):
    """A section specifying a data source backed by a particular data backend."""

    # Back-reference to the owning IOBinding; stays None until `set_parent` is called.
    _parent: Optional[IOBinding] = None  # noqa: F821
    # Free-form backend options; defaults to an empty dict.
    options: Mapping = pydantic.Field(default_factory=dict)
    # Backend selector, read from the `type` key of the config.
    data_backend_type: DataBackendType = pydantic.Field(alias="type", const=None)

    class Config:
        """Additional pydantic configuration for the model."""

        underscore_attrs_are_private = True

    @property
    def dynamicio_schema(self) -> Union[table_spec.DataframeSchema, None]:
        """Returns tabular data structure definition for the data source (if available).

        The schema is stored on the parent binding, so the parent must have been set first.

        Raises:
            Exception: If no parent has been set.
        """
        if not self._parent:
            raise Exception("Parent field is not set.")
        return self._parent.dynamicio_schema

    def set_parent(self, parent: IOBinding):  # noqa: F821
        """Helper method to set parent config object.

        Raises:
            AssertionError: If a parent has already been set.
        """
        # Raised explicitly (rather than via `assert`) so the check survives `python -O`.
        if self._parent is not None:
            raise AssertionError("Parent is already set.")
        self._parent = parent


  • pydantic.main.BaseModel


Class variables

var Config

Additional pydantic configuration for the model.

var data_backend_type : DataBackendType
var model_computed_fields
var model_config
var model_fields
var options : Mapping[~KT, +VT_co]

Instance variables

var dynamicio_schema : Optional[DataframeSchema]

Returns tabular data structure definition for the data source (if available).

Expand source code
def dynamicio_schema(self) -> Union[table_spec.DataframeSchema, None]:
    """Return the table schema held by the parent binding; fails if no parent is attached."""
    owner = self._parent
    if owner:
        return owner.dynamicio_schema
    raise Exception("Parent field is not set.")


def model_post_init(self: BaseModel, __context: Any) ‑> None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.


Args:
self: The BaseModel instance.
__context: The context.
Expand source code
def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    # Only initialise once: skip if the private-attribute store already exists.
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            # Attributes without a default are left unset rather than stored.
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
def set_parent(self, parent: IOBinding)

Helper method to set parent config object.

Expand source code
def set_parent(self, parent: "IOBinding"):  # noqa: F821
    """Helper method to set parent config object.

    Args:
        parent: The binding that owns this environment.

    Raises:
        AssertionError: If a parent has already been set.
    """
    # Raised explicitly (rather than via `assert`) so the check survives `python -O`.
    if self._parent is not None:
        raise AssertionError("Parent is already set.")
    self._parent = parent
class KafkaDataEnvironment (**data: Any)

Parent section for kafka data source config.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class KafkaDataEnvironment(IOEnvironment):
    """Parent section for kafka data source config.

    Chosen by `IOBinding.pick_correct_env_cls` for the `kafka` backend type.
    """

    # Backend-specific settings, read from the `kafka` key.
    kafka: KafkaDataSubSection


Class variables

var kafka : KafkaDataSubSection
var model_computed_fields
var model_config
var model_fields

Inherited members

class KafkaDataSubSection (**data: Any)

Kafka configuration section.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class KafkaDataSubSection(BaseModel):
    """Kafka configuration section."""

    # Address of the Kafka server — presumably "host:port"; TODO confirm expected format.
    kafka_server: str
    # Name of the Kafka topic.
    kafka_topic: str


  • pydantic.main.BaseModel

Class variables

var kafka_server : str
var kafka_topic : str
var model_computed_fields
var model_config
var model_fields
class LocalBatchDataEnvironment (**data: Any)

Parent section for local batch (multiple files) config.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class LocalBatchDataEnvironment(IOEnvironment):
    """Parent section for local batch (multiple files) config.

    Chosen by `IOBinding.pick_correct_env_cls` for the `local_batch` backend type.
    """

    # Backend-specific settings, read from the `local` key (same key as the single-file variant).
    local: LocalBatchDataSubSection


Class variables

var local : LocalBatchDataSubSection
var model_computed_fields
var model_config
var model_fields

Inherited members

class LocalBatchDataSubSection (**data: Any)

Config section for local batch data (multiple input files).

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class LocalBatchDataSubSection(BaseModel):
    """Config section for local batch data (multiple input files).

    Exactly one of `path_prefix` or `dynamic_file_path` must be provided.
    """

    path_prefix: Optional[str] = None
    dynamic_file_path: Optional[str] = None
    file_type: FileType

    @model_validator(mode="before")
    def check_path_fields(cls, values):
        """Check that only one of path_prefix or dynamic_file_path is provided.

        Args:
            values: Raw input mapping for this section.

        Returns:
            `values`, unchanged.

        Raises:
            ValueError: If neither or both of the two path fields are set.
        """
        if not values.get("path_prefix") and not values.get("dynamic_file_path"):
            raise ValueError("Either path_prefix or dynamic_file_path must be provided")
        if values.get("path_prefix") and values.get("dynamic_file_path"):
            raise ValueError("Only one of path_prefix or dynamic_file_path should be provided")
        return values


  • pydantic.main.BaseModel

Class variables

var dynamic_file_path : Optional[str]
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
var path_prefix : Optional[str]

Static methods

def check_path_fields(values)

Check that only one of path_prefix or dynamic_file_path is provided.

Expand source code
def check_path_fields(cls, values):
    """Ensure exactly one of `path_prefix` / `dynamic_file_path` is supplied.

    Returns `values` untouched when the check passes; raises ValueError when
    neither field or both fields carry a truthy value.
    """
    has_prefix = bool(values.get("path_prefix"))
    has_dynamic_path = bool(values.get("dynamic_file_path"))
    if not (has_prefix or has_dynamic_path):
        raise ValueError("Either path_prefix or dynamic_file_path must be provided")
    if has_prefix and has_dynamic_path:
        raise ValueError("Only one of path_prefix or dynamic_file_path should be provided")
    return values
class LocalDataEnvironment (**data: Any)

The data is provided by local storage.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class LocalDataEnvironment(IOEnvironment):
    """The data is provided by local storage.

    Chosen by `IOBinding.pick_correct_env_cls` for the `local` backend type.
    """

    # Backend-specific settings, read from the `local` key.
    local: LocalDataSubSection


Class variables

var local : LocalDataSubSection
var model_computed_fields
var model_config
var model_fields

Inherited members

class LocalDataSubSection (**data: Any)

Config section for local data provider.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class LocalDataSubSection(BaseModel):
    """Config section for local data provider."""

    # Location of the file — presumably a filesystem path; TODO confirm whether placeholders are allowed.
    file_path: str
    # Format of the file; one of the FileType members.
    file_type: FileType


  • pydantic.main.BaseModel

Class variables

var file_path : str
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
class PostgresDataEnvironment (**data: Any)

Parent section for postgres data source.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class PostgresDataEnvironment(IOEnvironment):
    """Parent section for postgres data source.

    Chosen by `IOBinding.pick_correct_env_cls` for the `postgres` backend type.
    """

    # Backend-specific settings, read from the `postgres` key.
    postgres: PostgresDataSubSection


Class variables

var model_computed_fields
var model_config
var model_fields
var postgres : PostgresDataSubSection

Inherited members

class PostgresDataSubSection (**data: Any)

Postgres data source configuration.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class PostgresDataSubSection(BaseModel):
    """Postgres data source configuration."""

    # Connection settings; every value, including the port, is declared as a string.
    db_host: str
    db_port: str
    db_name: str
    db_user: str
    db_password: str


  • pydantic.main.BaseModel

Class variables

var db_host : str
var db_name : str
var db_password : str
var db_port : str
var db_user : str
var model_computed_fields
var model_config
var model_fields
class S3DataEnvironment (**data: Any)

Parent section for s3 data source config.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class S3DataEnvironment(IOEnvironment):
    """Parent section for s3 data source config.

    Chosen by `IOBinding.pick_correct_env_cls` for the `s3` and `s3_file` backend types.
    """

    # Backend-specific settings, read from the `s3` key.
    s3: S3DataSubSection


Class variables

var model_computed_fields
var model_config
var model_fields
var s3 : S3DataSubSection

Inherited members

class S3DataSubSection (**data: Any)

Config section for S3 data source.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class S3DataSubSection(BaseModel):
    """Config section for S3 data source."""

    # Object location inside the bucket — presumably the S3 key; TODO confirm.
    file_path: str
    # Format of the object; one of the FileType members.
    file_type: FileType
    # Name of the S3 bucket.
    bucket: str


  • pydantic.main.BaseModel

Class variables

var bucket : str
var file_path : str
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
class S3PathPrefixEnvironment (**data: Any)

Parent section for the multi-object s3 data source.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class S3PathPrefixEnvironment(IOEnvironment):
    """Parent section for the multi-object s3 data source.

    Chosen by `IOBinding.pick_correct_env_cls` for the `s3_path_prefix` backend type.
    """

    # Backend-specific settings, read from the `s3` key.
    s3: S3PathPrefixSubSection


Class variables

var model_computed_fields
var model_config
var model_fields
var s3 : S3PathPrefixSubSection

Inherited members

class S3PathPrefixSubSection (**data: Any)

Config section for s3 prefix data source (multiple s3 objects).

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Expand source code
class S3PathPrefixSubSection(BaseModel):
    """Config section for s3 prefix data source (multiple s3 objects)."""

    path_prefix: str
    file_type: FileType
    bucket: str

    @model_validator(mode="before")
    def support_legacy_config_path_prefix(cls, values):
        """This validator implements support for legacy config format where the bucket & path_prefix path could've been passed as a single param in 'bucket' field.

        E.g.
            bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"

        Args:
            values: Raw input mapping for this section.

        Returns:
            The input mapping, with `bucket` split into `bucket` and `path_prefix` when
            the legacy single-field form is detected; otherwise `values` unchanged.
        """
        bucket = values.get("bucket")
        path_prefix = values.get("path_prefix")
        if (bucket and isinstance(bucket, str) and posixpath.sep in bucket) and (not path_prefix):
            # Everything before the first separator is the bucket; the remainder is the prefix.
            (new_bucket, new_path_prefix) = bucket.split(posixpath.sep, 1)
            # Return a merged copy so the caller's raw config is not mutated.
            values = {
                **values,
                "bucket": new_bucket,
                "path_prefix": new_path_prefix,
            }
        return values


  • pydantic.main.BaseModel

Class variables

var bucket : str
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
var path_prefix : str

Static methods

def support_legacy_config_path_prefix(values)

This validator implements support for legacy config format where the bucket & path_prefix path could've been passed as a single param in 'bucket' field.

E.g. bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"

Expand source code
def support_legacy_config_path_prefix(cls, values):
    """This validator implements support for legacy config format where the bucket & path_prefix path could've been passed as a single param in 'bucket' field.

    E.g.
        bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"

    Args:
        values: Raw input mapping for this section.

    Returns:
        The input mapping, with `bucket` split into `bucket` and `path_prefix` when
        the legacy single-field form is detected; otherwise `values` unchanged.
    """
    bucket = values.get("bucket")
    path_prefix = values.get("path_prefix")
    if (bucket and isinstance(bucket, str) and posixpath.sep in bucket) and (not path_prefix):
        # Everything before the first separator is the bucket; the remainder is the prefix.
        (new_bucket, new_path_prefix) = bucket.split(posixpath.sep, 1)
        # Return a merged copy so the caller's raw config is not mutated.
        values = {
            **values,
            "bucket": new_bucket,
            "path_prefix": new_path_prefix,
        }
    return values