Module dynamicio.config.pydantic.io_resources
This module contains pydantic models for physical data sources (places the bytes are being read from).
Expand source code
# pylint: disable=no-member, no-self-argument, unused-argument
"""This module contains pydantic models for physical data sources (places the bytes are being read from)."""
import enum
import posixpath
from typing import Mapping, Optional, Union

import pydantic
from pydantic import BaseModel, model_validator

import dynamicio.config.pydantic.table_schema as table_spec


@enum.unique
class DataBackendType(str, enum.Enum):
    """Input file types."""

    # pylint: disable=invalid-name
    local = "local"
    local_batch = "local_batch"
    s3 = "s3"  # is there a difference between 's3' and 's3_file' ?
    s3_file = "s3_file"
    s3_path_prefix = "s3_path_prefix"
    postgres = "postgres"
    athena = "athena"
    kafka = "kafka"


@enum.unique
class FileType(str, enum.Enum):
    """List of supported file formats."""

    # pylint: disable=invalid-name
    parquet = "parquet"
    csv = "csv"
    json = "json"
    hdf = "hdf"


class IOBinding(BaseModel):
    """A binding for a single i/o object."""

    name: str = pydantic.Field(alias="__binding_name__")
    environments: Mapping[
        str,
        Union[
            "IOEnvironment",
            "LocalDataEnvironment",
            "LocalBatchDataEnvironment",
            "S3DataEnvironment",
            "S3PathPrefixEnvironment",
            "KafkaDataEnvironment",
            "PostgresDataEnvironment",
        ],
    ]
    dynamicio_schema: Union[table_spec.DataframeSchema, None] = pydantic.Field(default=None, alias="schema")

    def get_binding_for_environment(self, environment: str) -> "IOEnvironment":
        """Fetch the IOEnvironment spec for the name provided."""
        return self.environments[environment]

    @pydantic.validator("environments", pre=True, always=True)
    def pick_correct_env_cls(cls, info):
        """This pre-validator picks an appropriate IOEnvironment subclass for the `data_backend_type`."""
        if not isinstance(info, Mapping):
            raise ValueError(f"Environments input should be a dict. Got {info!r} instead.")
        config_cls_overrides = {
            DataBackendType.local: LocalDataEnvironment,
            DataBackendType.local_batch: LocalBatchDataEnvironment,
            DataBackendType.s3: S3DataEnvironment,
            DataBackendType.s3_file: S3DataEnvironment,
            DataBackendType.s3_path_prefix: S3PathPrefixEnvironment,
            DataBackendType.kafka: KafkaDataEnvironment,
            DataBackendType.postgres: PostgresDataEnvironment,
        }
        out_dict = {}
        for (env_name, env_data) in info.items():
            base_obj: IOEnvironment = IOEnvironment(**env_data)
            override_cls = config_cls_overrides.get(base_obj.data_backend_type)
            if override_cls:
                use_obj = override_cls(**env_data)
            else:
                use_obj = base_obj
            out_dict[env_name] = use_obj
        return out_dict

    @pydantic.root_validator(pre=True)
    def _preprocess_raw_config(cls, values):
        if not isinstance(values, Mapping):
            raise ValueError(f"IOBinding must be a dict at the top level. (got {values!r} instead)")
        remapped_value = {"environments": {}}
        for (key, value) in values.items():
            if key in ("__binding_name__", "schema"):
                # Passthrough params
                remapped_value[key] = value
            else:
                # Assuming an environment config
                remapped_value["environments"][key] = value
        return remapped_value


class IOEnvironment(BaseModel):
    """A section specifying a data source backed by a particular data backend."""

    _parent: Optional[IOBinding] = None  # noqa: F821
    options: Mapping = pydantic.Field(default_factory=dict)
    data_backend_type: DataBackendType = pydantic.Field(alias="type", const=None)

    class Config:
        """Additional pydantic configuration for the model."""

        underscore_attrs_are_private = True

    @property
    def dynamicio_schema(self) -> Union[table_spec.DataframeSchema, None]:
        """Returns tabular data structure definition for the data source (if available)."""
        if not self._parent:
            raise Exception("Parent field is not set.")
        return self._parent.dynamicio_schema

    def set_parent(self, parent: IOBinding):  # noqa: F821
        """Helper method to set parent config object."""
        assert self._parent is None
        self._parent = parent


class LocalDataSubSection(BaseModel):
    """Config section for local data provider."""

    file_path: str
    file_type: FileType


class LocalDataEnvironment(IOEnvironment):
    """The data is provided by local storage."""

    local: LocalDataSubSection


class LocalBatchDataSubSection(BaseModel):
    """Config section for local batch data (multiple input files)."""

    path_prefix: Optional[str] = None
    dynamic_file_path: Optional[str] = None
    file_type: FileType

    @model_validator(mode="before")
    def check_path_fields(cls, values):
        """Check that exactly one of path_prefix or dynamic_file_path is provided."""
        if not values.get("path_prefix") and not values.get("dynamic_file_path"):
            raise ValueError("Either path_prefix or dynamic_file_path must be provided")
        if values.get("path_prefix") and values.get("dynamic_file_path"):
            raise ValueError("Only one of path_prefix or dynamic_file_path should be provided")
        return values


class LocalBatchDataEnvironment(IOEnvironment):
    """Parent section for local batch (multiple files) config."""

    local: LocalBatchDataSubSection


class S3DataSubSection(BaseModel):
    """Config section for S3 data source."""

    file_path: str
    file_type: FileType
    bucket: str


class S3DataEnvironment(IOEnvironment):
    """Parent section for s3 data source config."""

    s3: S3DataSubSection


class S3PathPrefixSubSection(BaseModel):
    """Config section for s3 prefix data source (multiple s3 objects)."""

    path_prefix: str
    file_type: FileType
    bucket: str

    @pydantic.root_validator(pre=True)
    def support_legacy_config_path_prefix(cls, values):
        """This validator implements support for the legacy config format where the bucket and path_prefix could be passed as a single value in the 'bucket' field.

        E.g.
            bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"
        """
        bucket = values.get("bucket")
        path_prefix = values.get("path_prefix")
        if (bucket and isinstance(bucket, str) and posixpath.sep in bucket) and (not path_prefix):
            (new_bucket, new_path_prefix) = bucket.split(posixpath.sep, 1)
            values.update(
                {
                    "bucket": new_bucket,
                    "path_prefix": new_path_prefix,
                }
            )
        return values


class S3PathPrefixEnvironment(IOEnvironment):
    """Parent section for the multi-object s3 data source."""

    s3: S3PathPrefixSubSection


class KafkaDataSubSection(BaseModel):
    """Kafka configuration section."""

    kafka_server: str
    kafka_topic: str


class KafkaDataEnvironment(IOEnvironment):
    """Parent section for kafka data source config."""

    kafka: KafkaDataSubSection


class PostgresDataSubSection(BaseModel):
    """Postgres data source configuration."""

    db_host: str
    db_port: str
    db_name: str
    db_user: str
    db_password: str


class PostgresDataEnvironment(IOEnvironment):
    """Parent section for postgres data source."""

    postgres: PostgresDataSubSection


IOBinding.model_rebuild()
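The snippet below is a usage sketch, not part of the module: it shows how a raw config mapping might be parsed into an IOBinding. All keys and values (the binding name, environment names, bucket, and paths) are invented for illustration.

# Hypothetical config; every top-level key except __binding_name__ and schema
# is treated as an environment name by _preprocess_raw_config.
raw_config = {
    "__binding_name__": "MY_DATASET",
    "cloud": {
        "type": "s3",
        "s3": {"bucket": "mock-bucket", "file_path": "data/my_dataset.parquet", "file_type": "parquet"},
    },
    "local": {
        "type": "local",
        "local": {"file_path": "/tmp/my_dataset.parquet", "file_type": "parquet"},
    },
}

binding = IOBinding(**raw_config)
env = binding.get_binding_for_environment("cloud")
assert isinstance(env, S3DataEnvironment)  # dispatched by pick_correct_env_cls
assert env.s3.bucket == "mock-bucket"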
Classes
class DataBackendType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Input file types.
Expand source code
class DataBackendType(str, enum.Enum):
    """Input file types."""

    # pylint: disable=invalid-name
    local = "local"
    local_batch = "local_batch"
    s3 = "s3"  # is there a difference between 's3' and 's3_file' ?
    s3_file = "s3_file"
    s3_path_prefix = "s3_path_prefix"
    postgres = "postgres"
    athena = "athena"
    kafka = "kafka"
Ancestors
- builtins.str
- enum.Enum
Class variables
var athena
var kafka
var local
var local_batch
var postgres
var s3
var s3_file
var s3_path_prefix
class FileType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
List of supported file formats.
Expand source code
class FileType(str, enum.Enum):
    """List of supported file formats."""

    # pylint: disable=invalid-name
    parquet = "parquet"
    csv = "csv"
    json = "json"
    hdf = "hdf"
Ancestors
- builtins.str
- enum.Enum
Class variables
var csv
var hdf
var json
var parquet
class IOBinding (**data: Any)
-
A binding for a single i/o object.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class IOBinding(BaseModel):
    """A binding for a single i/o object."""

    name: str = pydantic.Field(alias="__binding_name__")
    environments: Mapping[
        str,
        Union[
            "IOEnvironment",
            "LocalDataEnvironment",
            "LocalBatchDataEnvironment",
            "S3DataEnvironment",
            "S3PathPrefixEnvironment",
            "KafkaDataEnvironment",
            "PostgresDataEnvironment",
        ],
    ]
    dynamicio_schema: Union[table_spec.DataframeSchema, None] = pydantic.Field(default=None, alias="schema")

    def get_binding_for_environment(self, environment: str) -> "IOEnvironment":
        """Fetch the IOEnvironment spec for the name provided."""
        return self.environments[environment]

    @pydantic.validator("environments", pre=True, always=True)
    def pick_correct_env_cls(cls, info):
        """This pre-validator picks an appropriate IOEnvironment subclass for the `data_backend_type`."""
        if not isinstance(info, Mapping):
            raise ValueError(f"Environments input should be a dict. Got {info!r} instead.")
        config_cls_overrides = {
            DataBackendType.local: LocalDataEnvironment,
            DataBackendType.local_batch: LocalBatchDataEnvironment,
            DataBackendType.s3: S3DataEnvironment,
            DataBackendType.s3_file: S3DataEnvironment,
            DataBackendType.s3_path_prefix: S3PathPrefixEnvironment,
            DataBackendType.kafka: KafkaDataEnvironment,
            DataBackendType.postgres: PostgresDataEnvironment,
        }
        out_dict = {}
        for (env_name, env_data) in info.items():
            base_obj: IOEnvironment = IOEnvironment(**env_data)
            override_cls = config_cls_overrides.get(base_obj.data_backend_type)
            if override_cls:
                use_obj = override_cls(**env_data)
            else:
                use_obj = base_obj
            out_dict[env_name] = use_obj
        return out_dict

    @pydantic.root_validator(pre=True)
    def _preprocess_raw_config(cls, values):
        if not isinstance(values, Mapping):
            raise ValueError(f"IOBinding must be a dict at the top level. (got {values!r} instead)")
        remapped_value = {"environments": {}}
        for (key, value) in values.items():
            if key in ("__binding_name__", "schema"):
                # Passthrough params
                remapped_value[key] = value
            else:
                # Assuming an environment config
                remapped_value["environments"][key] = value
        return remapped_value
Ancestors
- pydantic.main.BaseModel
Class variables
var dynamicio_schema : Optional[DataframeSchema]
var environments : Mapping[str, Union[IOEnvironment, LocalDataEnvironment, LocalBatchDataEnvironment, S3DataEnvironment, S3PathPrefixEnvironment, KafkaDataEnvironment, PostgresDataEnvironment]]
var model_computed_fields
var model_config
var model_fields
var name : str
Static methods
def pick_correct_env_cls(info)
-
This pre-validator picks an appropriate IOEnvironment subclass for the data_backend_type.
Expand source code
@pydantic.validator("environments", pre=True, always=True)
def pick_correct_env_cls(cls, info):
    """This pre-validator picks an appropriate IOEnvironment subclass for the `data_backend_type`."""
    if not isinstance(info, Mapping):
        raise ValueError(f"Environments input should be a dict. Got {info!r} instead.")
    config_cls_overrides = {
        DataBackendType.local: LocalDataEnvironment,
        DataBackendType.local_batch: LocalBatchDataEnvironment,
        DataBackendType.s3: S3DataEnvironment,
        DataBackendType.s3_file: S3DataEnvironment,
        DataBackendType.s3_path_prefix: S3PathPrefixEnvironment,
        DataBackendType.kafka: KafkaDataEnvironment,
        DataBackendType.postgres: PostgresDataEnvironment,
    }
    out_dict = {}
    for (env_name, env_data) in info.items():
        base_obj: IOEnvironment = IOEnvironment(**env_data)
        override_cls = config_cls_overrides.get(base_obj.data_backend_type)
        if override_cls:
            use_obj = override_cls(**env_data)
        else:
            use_obj = base_obj
        out_dict[env_name] = use_obj
    return out_dict
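Note that backends with no entry in config_cls_overrides (athena is the only one) fall through to a plain IOEnvironment. A small sketch, assuming the validator runs through normal model construction and using invented environment data:

# "prod" has an override (postgres); "analytics" (athena) does not.
binding = IOBinding(**{
    "__binding_name__": "EXAMPLE",
    "prod": {
        "type": "postgres",
        "postgres": {"db_host": "localhost", "db_port": "5432", "db_name": "db", "db_user": "u", "db_password": "p"},
    },
    "analytics": {"type": "athena"},
})
assert isinstance(binding.environments["prod"], PostgresDataEnvironment)
assert type(binding.environments["analytics"]) is IOEnvironment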
Methods
def get_binding_for_environment(self, environment: str) ‑> IOEnvironment
-
Fetch the IOEnvironment spec for the name provided.
Expand source code
def get_binding_for_environment(self, environment: str) -> "IOEnvironment":
    """Fetch the IOEnvironment spec for the name provided."""
    return self.environments[environment]
class IOEnvironment (**data: Any)
-
A section specifying a data source backed by a particular data backend.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class IOEnvironment(BaseModel):
    """A section specifying a data source backed by a particular data backend."""

    _parent: Optional[IOBinding] = None  # noqa: F821
    options: Mapping = pydantic.Field(default_factory=dict)
    data_backend_type: DataBackendType = pydantic.Field(alias="type", const=None)

    class Config:
        """Additional pydantic configuration for the model."""

        underscore_attrs_are_private = True

    @property
    def dynamicio_schema(self) -> Union[table_spec.DataframeSchema, None]:
        """Returns tabular data structure definition for the data source (if available)."""
        if not self._parent:
            raise Exception("Parent field is not set.")
        return self._parent.dynamicio_schema

    def set_parent(self, parent: IOBinding):  # noqa: F821
        """Helper method to set parent config object."""
        assert self._parent is None
        self._parent = parent
Ancestors
- pydantic.main.BaseModel
Subclasses
- KafkaDataEnvironment
- LocalBatchDataEnvironment
- LocalDataEnvironment
- PostgresDataEnvironment
- S3DataEnvironment
- S3PathPrefixEnvironment
Class variables
var Config
-
Additional pydantic configuration for the model.
var data_backend_type : DataBackendType
var model_computed_fields
var model_config
var model_fields
var options : Mapping[~KT, +VT_co]
Instance variables
var dynamicio_schema : Optional[DataframeSchema]
-
Returns tabular data structure definition for the data source (if available).
Expand source code
@property
def dynamicio_schema(self) -> Union[table_spec.DataframeSchema, None]:
    """Returns tabular data structure definition for the data source (if available)."""
    if not self._parent:
        raise Exception("Parent field is not set.")
    return self._parent.dynamicio_schema
Methods
def model_post_init(self: BaseModel, __context: Any) ‑> None
-
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args
self
- The BaseModel instance.
__context
- The context.
Expand source code
def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
def set_parent(self, parent: IOBinding)
-
Helper method to set parent config object.
Expand source code
def set_parent(self, parent: IOBinding):  # noqa: F821
    """Helper method to set parent config object."""
    assert self._parent is None
    self._parent = parent
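Taken together, set_parent and the dynamicio_schema property let an environment resolve the schema declared on its parent binding. A sketch, assuming binding is an already-parsed IOBinding (in practice this wiring is presumably done by the surrounding config loader):

env = binding.get_binding_for_environment("cloud")
env.set_parent(binding)      # one-time wiring; the assert guards against re-binding
print(env.dynamicio_schema)  # proxies to binding.dynamicio_schema; None if no schema was given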
class KafkaDataEnvironment (**data: Any)
-
Parent section for kafka data source config.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class KafkaDataEnvironment(IOEnvironment):
    """Parent section for kafka data source config."""

    kafka: KafkaDataSubSection
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
var kafka : KafkaDataSubSection
var model_computed_fields
var model_config
var model_fields
Inherited members
class KafkaDataSubSection (**data: Any)
-
Kafka configuration section.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class KafkaDataSubSection(BaseModel):
    """Kafka configuration section."""

    kafka_server: str
    kafka_topic: str
Ancestors
- pydantic.main.BaseModel
Class variables
var kafka_server : str
var kafka_topic : str
var model_computed_fields
var model_config
var model_fields
class LocalBatchDataEnvironment (**data: Any)
-
Parent section for local batch (multiple files) config.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class LocalBatchDataEnvironment(IOEnvironment):
    """Parent section for local batch (multiple files) config."""

    local: LocalBatchDataSubSection
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
var local : LocalBatchDataSubSection
var model_computed_fields
var model_config
var model_fields
Inherited members
class LocalBatchDataSubSection (**data: Any)
-
Config section for local batch data (multiple input files).
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class LocalBatchDataSubSection(BaseModel):
    """Config section for local batch data (multiple input files)."""

    path_prefix: Optional[str] = None
    dynamic_file_path: Optional[str] = None
    file_type: FileType

    @model_validator(mode="before")
    def check_path_fields(cls, values):
        """Check that exactly one of path_prefix or dynamic_file_path is provided."""
        if not values.get("path_prefix") and not values.get("dynamic_file_path"):
            raise ValueError("Either path_prefix or dynamic_file_path must be provided")
        if values.get("path_prefix") and values.get("dynamic_file_path"):
            raise ValueError("Only one of path_prefix or dynamic_file_path should be provided")
        return values
Ancestors
- pydantic.main.BaseModel
Class variables
var dynamic_file_path : Optional[str]
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
var path_prefix : Optional[str]
Static methods
def check_path_fields(values)
-
Check that exactly one of path_prefix or dynamic_file_path is provided.
Expand source code
@model_validator(mode="before")
def check_path_fields(cls, values):
    """Check that exactly one of path_prefix or dynamic_file_path is provided."""
    if not values.get("path_prefix") and not values.get("dynamic_file_path"):
        raise ValueError("Either path_prefix or dynamic_file_path must be provided")
    if values.get("path_prefix") and values.get("dynamic_file_path"):
        raise ValueError("Only one of path_prefix or dynamic_file_path should be provided")
    return values
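A sketch of the mutual-exclusion rule with invented paths; exactly one of the two fields must be set:

LocalBatchDataSubSection(path_prefix="/data/batch/", file_type="parquet")               # valid
LocalBatchDataSubSection(dynamic_file_path="/data/{run_id}/file.csv", file_type="csv")  # valid
# Both of the following raise a validation error:
# LocalBatchDataSubSection(file_type="csv")                                             # neither field
# LocalBatchDataSubSection(path_prefix="/a/", dynamic_file_path="/b.csv", file_type="csv")  # both fields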
class LocalDataEnvironment (**data: Any)
-
The data is provided by local storage.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class LocalDataEnvironment(IOEnvironment):
    """The data is provided by local storage."""

    local: LocalDataSubSection
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
var local : LocalDataSubSection
var model_computed_fields
var model_config
var model_fields
Inherited members
class LocalDataSubSection (**data: Any)
-
Config section for local data provider.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class LocalDataSubSection(BaseModel):
    """Config section for local data provider."""

    file_path: str
    file_type: FileType
Ancestors
- pydantic.main.BaseModel
Class variables
var file_path : str
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
class PostgresDataEnvironment (**data: Any)
-
Parent section for postgres data source.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class PostgresDataEnvironment(IOEnvironment):
    """Parent section for postgres data source."""

    postgres: PostgresDataSubSection
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
var model_computed_fields
var model_config
var model_fields
var postgres : PostgresDataSubSection
Inherited members
class PostgresDataSubSection (**data: Any)
-
Postgres data source configuration.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class PostgresDataSubSection(BaseModel):
    """Postgres data source configuration."""

    db_host: str
    db_port: str
    db_name: str
    db_user: str
    db_password: str
Ancestors
- pydantic.main.BaseModel
Class variables
var db_host : str
var db_name : str
var db_password : str
var db_port : str
var db_user : str
var model_computed_fields
var model_config
var model_fields
class S3DataEnvironment (**data: Any)
-
Parent section for s3 data source config.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class S3DataEnvironment(IOEnvironment):
    """Parent section for s3 data source config."""

    s3: S3DataSubSection
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
var model_computed_fields
var model_config
var model_fields
var s3 : S3DataSubSection
Inherited members
class S3DataSubSection (**data: Any)
-
Config section for S3 data source.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class S3DataSubSection(BaseModel):
    """Config section for S3 data source."""

    file_path: str
    file_type: FileType
    bucket: str
Ancestors
- pydantic.main.BaseModel
Class variables
var bucket : str
var file_path : str
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
class S3PathPrefixEnvironment (**data: Any)
-
Parent section for the multi-object s3 data source.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class S3PathPrefixEnvironment(IOEnvironment):
    """Parent section for the multi-object s3 data source."""

    s3: S3PathPrefixSubSection
Ancestors
- IOEnvironment
- pydantic.main.BaseModel
Class variables
var model_computed_fields
var model_config
var model_fields
var s3 : S3PathPrefixSubSection
Inherited members
class S3PathPrefixSubSection (**data: Any)
-
Config section for s3 prefix data source (multiple s3 objects).
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Expand source code
class S3PathPrefixSubSection(BaseModel):
    """Config section for s3 prefix data source (multiple s3 objects)."""

    path_prefix: str
    file_type: FileType
    bucket: str

    @pydantic.root_validator(pre=True)
    def support_legacy_config_path_prefix(cls, values):
        """This validator implements support for the legacy config format where the bucket and path_prefix could be passed as a single value in the 'bucket' field.

        E.g.
            bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"
        """
        bucket = values.get("bucket")
        path_prefix = values.get("path_prefix")
        if (bucket and isinstance(bucket, str) and posixpath.sep in bucket) and (not path_prefix):
            (new_bucket, new_path_prefix) = bucket.split(posixpath.sep, 1)
            values.update(
                {
                    "bucket": new_bucket,
                    "path_prefix": new_path_prefix,
                }
            )
        return values
Ancestors
- pydantic.main.BaseModel
Class variables
var bucket : str
var file_type : FileType
var model_computed_fields
var model_config
var model_fields
var path_prefix : str
Static methods
def support_legacy_config_path_prefix(values)
-
This validator implements support for the legacy config format where the bucket and path_prefix could be passed as a single value in the 'bucket' field.
E.g. bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"
Expand source code
@pydantic.root_validator(pre=True)
def support_legacy_config_path_prefix(cls, values):
    """This validator implements support for the legacy config format where the bucket and path_prefix could be passed as a single value in the 'bucket' field.

    E.g.
        bucket: "[[ MOCK_BUCKET ]]/data/input/{file_name_to_replace}.hdf"
    """
    bucket = values.get("bucket")
    path_prefix = values.get("path_prefix")
    if (bucket and isinstance(bucket, str) and posixpath.sep in bucket) and (not path_prefix):
        (new_bucket, new_path_prefix) = bucket.split(posixpath.sep, 1)
        values.update(
            {
                "bucket": new_bucket,
                "path_prefix": new_path_prefix,
            }
        )
    return values
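A sketch of the legacy splitting with an invented bucket value; the first "/" separates the bucket from the prefix:

section = S3PathPrefixSubSection(bucket="mock-bucket/data/input/batch/", file_type="hdf")
assert section.bucket == "mock-bucket"
assert section.path_prefix == "data/input/batch/"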