Module dynamicio.core
Implements the DynamicDataIO class, which provides functionality for data loading, sinking, and schema validation.
"""Implements the DynamicDataIO class which provides functionality for data: loading; sinking, and; schema validation."""
# pylint: disable=no-member
__all__ = ["DynamicDataIO", "SCHEMA_FROM_FILE"]
import asyncio
import inspect
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple
import pandas as pd # type: ignore
import pydantic
from magic_logger import logger
from dynamicio import validations
from dynamicio.config.pydantic import DataframeSchema, IOEnvironment, SchemaColumn
from dynamicio.errors import CASTING_WARNING_MSG, ColumnsDataTypeError, NOTICE_MSG, SchemaNotFoundError, SchemaValidationError
from dynamicio.metrics import get_metric
SCHEMA_FROM_FILE = {"schema": object()}
pool = ThreadPoolExecutor()
class DynamicDataIO:
"""Given a `src.utils.dynamicio.config.IOConfig` object, it generates an object with access to a series of methods for cloud I/O operations and data validations.
Example:
>>> input_sources_config = IOConfig(
>>> "path_to/input.yaml",
>>> os.getenv("ENVIRONMENT",default="LOCAL")
>>> )
>>>
>>> class IO(WithS3File, WithLocal, DynamicDataIO):
>>> schema = S
>>>
>>> my_dataset_local_mapping = input_sources_config.get(source_key="MY_DATASET")
>>> my_dataset_io = IO(my_dataset_local_mapping)
>>> my_dataset_df = my_dataset_io.read()
"""
schema: DataframeSchema
sources_config: IOEnvironment
def __init__(
self,
source_config: IOEnvironment,
apply_schema_validations: bool = False,
log_schema_metrics: bool = False,
show_casting_warnings: bool = False,
**options: MutableMapping[str, Any],
):
"""Class constructor.
Args:
source_config: Configuration to use when reading/writing data from/to a source
apply_schema_validations: Applies schema validations on either read() or write()
log_schema_metrics: Logs schema metrics on either read() or write()
show_casting_warnings: Logs casting warnings on either read() or write() if set to True
options: Any additional kwargs that may be used throughout the lifecycle of the object
"""
if type(self) is DynamicDataIO: # pylint: disable=unidiomatic-typecheck
raise TypeError("Abstract class DynamicDataIO cannot be used to instantiate an object...")
self.sources_config = source_config
self.name = self._transform_class_name_to_dataset_name(self.__class__.__name__)
self.apply_schema_validations = apply_schema_validations
self.log_schema_metrics = log_schema_metrics
self.show_casting_warnings = show_casting_warnings
self.options = self._get_options(options, source_config.options)
source_name = self.sources_config.data_backend_type.value
if self.schema is SCHEMA_FROM_FILE:
active_schema = self.sources_config.dynamicio_schema
else:
active_schema = self._schema_from_obj(self)
if not active_schema:
raise SchemaNotFoundError()
assert isinstance(active_schema, DataframeSchema)
self.schema = active_schema
self.name = self.schema.name.upper()
self.schema_validations = self.schema.validations
self.schema_metrics = self.schema.metrics
assert hasattr(self, f"_read_from_{source_name}") or hasattr(
self, f"_write_to_{source_name}"
), f"No method '_read_from_{source_name}' or '_write_to_{source_name}'. Have you registered a mixin for {source_name}?"
@staticmethod
def _schema_from_obj(target) -> DataframeSchema:
"""Construct `DataframeSchema` from an object.
The object:
- MUST have `schema` attribute that is a dictionary specifying columns and datatypes
- CAN have `schema_validations` and `schema_metrics` attributes
"""
col_info = {}
for (col_name, dtype) in target.schema.items():
col_validations = {}
col_metrics = []
try:
col_validations = target.schema_validations[col_name]
except (KeyError, AttributeError):
pass
try:
col_metrics = target.schema_metrics[col_name]
except (KeyError, AttributeError):
pass
col_info[col_name] = {
"type": dtype,
"validations": col_validations,
"metrics": col_metrics,
}
try:
out = DataframeSchema(name=target.name, columns=col_info)
except pydantic.ValidationError:
logger.exception(f"Error parsing {target.name=!r} {col_info=!r}")
raise
return out
def __init_subclass__(cls):
"""Ensure that all subclasses have a `schema` attribute and a `validate` method.
Raises:
AssertionError: If either of the attributes is not implemented
"""
if not inspect.getmodule(cls).__name__.startswith("dynamicio"):
assert "schema" in cls.__dict__
if cls.schema is None or (cls.schema is not SCHEMA_FROM_FILE and len(cls.schema) == 0):
raise ValueError(f"schema for class {cls} cannot be None or empty...")
async def async_read(self):
"""Allows the use of asyncio to concurrently read files in memory.
Returns:
A pandas dataframe or an iterable.
"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(pool, self.read)
def read(self) -> pd.DataFrame:
"""Reads data source and returns a schema validated dataframe (by means of _apply_schema).
Returns:
A pandas dataframe or an iterable.
"""
source_name = self.sources_config.data_backend_type.value
df = getattr(self, f"_read_from_{source_name}")()
df = self._apply_schema(df)
if self.apply_schema_validations:
self.validate_from_schema(df)
if self.log_schema_metrics:
self.log_metrics_from_schema(df)
return df
async def async_write(self, df: pd.DataFrame):
"""Allows the use of asyncio to concurrently write files out.
Args:
df: The data to be written
"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(pool, self.write, df)
def write(self, df: pd.DataFrame):
"""Sink data to a given source based on the sources_config.
Args:
df: The data to be written
"""
source_name = self.sources_config.data_backend_type.value
if set(df.columns) != self.schema.column_names: # pylint: disable=E1101
columns = [column for column in df.columns.to_list() if column in self.schema.column_names]
df = df[columns]
if self.apply_schema_validations:
self.validate_from_schema(df)
if self.log_schema_metrics:
self.log_metrics_from_schema(df)
getattr(self, f"_write_to_{source_name}")(self._apply_schema(df))
def validate_from_schema(self, df: pd.DataFrame) -> "DynamicDataIO":
"""Validates a dataframe based on the validations present in its schema definition.
All validations are checked and if any of them fails, a `SchemaValidationError` is raised.
Args:
df: A dataframe to be validated.
Returns:
self (to allow for method chaining).
Raises:
SchemaValidationError: if any of the validations failed. The `message` attribute of
the exception object maps the name of each failed validation to its
failure message.
"""
failed_validations = {}
for column in self.schema_validations.keys():
col_validations = self.schema_validations[column]
for validation in col_validations:
if validation.apply:
validator = validations.ALL_VALIDATORS[validation.name]
validation_result = validator(self.name, df, column, **validation.options)
if not validation_result.valid:
failed_validations[validation.name] = validation_result.message
if len(failed_validations) > 0:
raise SchemaValidationError(failed_validations)
return self
def log_metrics_from_schema(self, df: pd.DataFrame) -> "DynamicDataIO":
"""Calculates and logs metrics based on the metrics present in its schema definition.
Args:
df: A dataframe for which metrics are generated and logged
Returns:
self (to allow for method chaining).
"""
for column in self.schema_metrics.keys():
for metric in self.schema_metrics[column]:
get_metric(metric)(self.name, df, column)() # type: ignore
return self
def _apply_schema(self, df: pd.DataFrame) -> pd.DataFrame:
"""Called by the `self.read()` and the `self._write_to_local()` methods.
Contrasts a dataframe's read from a given source against the class's schema dictionary,
checking that columns are the same (by means of _has_columns and _has_valid_dtypes). Then,
check if the columns are fine, it further validates if the types of columns conform to the
expected schema. Finally, if schema types are different, then it attempts to apply schema;
if possible then the schema validation is successful.
Args:
df: A pandas dataframe.
Returns:
A schema validated dataframe.
"""
if not self._has_valid_dtypes(df):
raise ColumnsDataTypeError()
return df
@staticmethod
def _transform_class_name_to_dataset_name(string_to_transform: str) -> str:
"""Called by the init function to fetch dataset names from class name.
Used to create dataset name from class name, turns camel case into upper snake case.
For example: 'ThisNameABC' -> 'THIS_NAME_ABC'.
"""
words = re.findall(r"\d[A-Z]+|[A-Z]?[a-z\d]+|[A-Z]{2,}(?=[A-Z][a-z]|\d|\W|$)|\d+|[A-Z]{2,}|[A-Z]", string_to_transform)
return "_".join(map(str.lower, words)).upper()
def _has_valid_dtypes(self, df: pd.DataFrame) -> bool:
"""Checks if `df` has the expected dtypes defined in `schema`.
Schema is a dictionary object where keys are column names and values are dtypes in string format as returned by e.g.
`df[column].dtype.name`.
This function issues `error` level logs describing the first column that caused the check to fail.
It is assumed that `df` only has the columns defined in `schema`.
Args:
df: The dataframe whose dtypes are checked against the schema.
Returns:
bool - `True` if `df` has the given dtypes, `False` otherwise
"""
dtypes = df.dtypes
cols_allowed, cols_not_allowed = self._split_columns_to_validate_between_allowed_and_not(columns_to_validate=self.options.get("columns", self.schema.column_names))
if cols_not_allowed:
logger.exception(f"The columns passed: {cols_not_allowed} do not belong to the schema")
return False
for col_info in cols_allowed:
column_name = col_info.name
expected_dtype = col_info.data_type
found_dtype = dtypes[column_name].name
if found_dtype != expected_dtype:
if self.show_casting_warnings:
logger.info(f"Expected: '{expected_dtype}' dtype for {self.name}['{column_name}]', found '{found_dtype}'")
try:
if len(set(type(v) for v in df[column_name].values)) > 1: # pylint: disable=consider-using-set-comprehension
logger.warning(CASTING_WARNING_MSG.format(column_name, expected_dtype, found_dtype)) # pylint: disable=logging-format-interpolation
logger.info(NOTICE_MSG.format(column_name)) # pylint: disable=logging-format-interpolation
df[column_name] = df[column_name].astype(self.schema.columns[column_name].data_type)
except (ValueError, TypeError):
logger.exception(f"ValueError: Tried casting column {self.name}['{column_name}'] to '{expected_dtype}' from '{found_dtype}', but failed")
return False
return True
def _split_columns_to_validate_between_allowed_and_not(self, columns_to_validate: List[str]) -> Tuple[List[SchemaColumn], List[str]]:
not_allowed_cols, allowed_cols = [], []
for col in columns_to_validate:
col_from_schema = self.schema.columns.get(col)
if col_from_schema:
allowed_cols.append(col_from_schema)
else:
not_allowed_cols.append(col)
return allowed_cols, not_allowed_cols
@staticmethod
def _get_options(options_from_code: MutableMapping[str, Any], options_from_resource_definition: Optional[Mapping[str, Any]]) -> MutableMapping[str, Any]:
"""Retrieves options either from code or from a resource-definition.
Options provided by both sources are merged; in the case of conflicts, the options from the code
take precedence.
Args:
options_from_code: Options passed as keyword arguments in code.
options_from_resource_definition: Options defined in the resource definition, if any.
Returns:
The merged options that will be used.
"""
if options_from_resource_definition:
return {**options_from_resource_definition, **options_from_code}
return options_from_code
Classes
class DynamicDataIO (source_config: IOEnvironment, apply_schema_validations: bool = False, log_schema_metrics: bool = False, show_casting_warnings: bool = False, **options: MutableMapping[str, Any])
- Given a src.utils.dynamicio.config.IOConfig object, it generates an object with access to a series of methods for cloud I/O operations and data validations.
Example
>>> input_sources_config = IOConfig(
>>>     "path_to/input.yaml",
>>>     os.getenv("ENVIRONMENT", default="LOCAL")
>>> )
>>>
>>> class IO(WithS3File, WithLocal, DynamicDataIO):
>>>     schema = S
>>>
>>> my_dataset_local_mapping = input_sources_config.get(source_key="MY_DATASET")
>>> my_dataset_io = IO(my_dataset_local_mapping)
>>> my_dataset_df = my_dataset_io.read()
Class constructor.
Args
source_config
- Configuration to use when reading/writing data from/to a source
apply_schema_validations
- Applies schema validations on either read() or write()
log_schema_metrics
- Logs schema metrics on either read() or write()
show_casting_warnings
- Logs casting warnings on either read() or write() if set to True
options
- Any additional kwargs that may be used throughout the lifecycle of the object
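For instance, a subclass instance can be constructed with validations and metrics enabled on every read() or write() call. This is a minimal sketch that reuses the IO subclass and input_sources_config from the example above:
>>> my_dataset_io = IO(
>>>     input_sources_config.get(source_key="MY_DATASET"),
>>>     apply_schema_validations=True,
>>>     log_schema_metrics=True,
>>>     show_casting_warnings=True,
>>> )
>>> my_dataset_df = my_dataset_io.read()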
Class variables
var schema : DataframeSchema
var sources_config : IOEnvironment
Methods
async def async_read(self)
- Allows the use of asyncio to read files into memory concurrently.
Returns
A pandas dataframe or an iterable.
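A minimal sketch of reading several datasets concurrently with asyncio.gather, assuming foo_io and bar_io are two already-constructed DynamicDataIO instances (hypothetical names):
>>> import asyncio
>>>
>>> async def load_all():
>>>     return await asyncio.gather(foo_io.async_read(), bar_io.async_read())
>>>
>>> foo_df, bar_df = asyncio.run(load_all())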
async def async_write(self, df: pandas.core.frame.DataFrame)
- Allows the use of asyncio to concurrently write files out.
Args
df
- The data to be written
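Correspondingly, a sketch of sinking two dataframes concurrently, assuming foo_io/bar_io instances and foo_df/bar_df dataframes already exist (hypothetical names):
>>> import asyncio
>>>
>>> async def sink_all():
>>>     await asyncio.gather(foo_io.async_write(foo_df), bar_io.async_write(bar_df))
>>>
>>> asyncio.run(sink_all())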
def log_metrics_from_schema(self, df: pandas.core.frame.DataFrame) ‑> DynamicDataIO
- Calculates and logs metrics based on the metrics present in its schema definition.
Args
df
- A dataframe for which metrics are generated and logged
Returns
self (to allow for method chaining).
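Because the method returns self, it can be chained after validation; a sketch, assuming my_dataset_io and a dataframe df are already defined:
>>> my_dataset_io.validate_from_schema(df).log_metrics_from_schema(df)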
def read(self) ‑> pandas.core.frame.DataFrame
- Reads the data source and returns a schema-validated dataframe (by means of _apply_schema).
Returns
A pandas dataframe or an iterable.
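A sketch of a read followed by explicit validation and metrics logging, roughly equivalent to constructing the instance with apply_schema_validations=True and log_schema_metrics=True (assuming my_dataset_io as above):
>>> df = my_dataset_io.read()
>>> my_dataset_io.validate_from_schema(df)
>>> my_dataset_io.log_metrics_from_schema(df)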
def validate_from_schema(self, df: pandas.core.frame.DataFrame) ‑> DynamicDataIO
- Validates a dataframe based on the validations present in its schema definition.
All validations are checked and if any of them fails, a SchemaValidationError is raised.
Args
df
- A dataframe to be validated.
Returns
self (to allow for method chaining).
Raises
SchemaValidationError
- if any of the validations failed. The message attribute of the exception object maps the name of each failed validation to its failure message.
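A sketch of catching the exception to inspect which validations failed, assuming my_dataset_io and a dataframe df are already defined (SchemaValidationError is importable from dynamicio.errors, per the module imports above):
>>> from dynamicio.errors import SchemaValidationError
>>>
>>> try:
>>>     my_dataset_io.validate_from_schema(df)
>>> except SchemaValidationError as error:
>>>     print(error.message)  # failed validation names mapped to their messages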
def write(self, df: pandas.core.frame.DataFrame)
- Sinks data to a given source based on the sources_config.
Args
df
- The data to be written
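A minimal write sketch, assuming my_dataset_io and a dataframe df are already defined; any columns of df that are not part of the schema are dropped before sinking, as per the implementation above:
>>> my_dataset_io.write(df)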