Module dynamicio.mixins.utils
Mixin utility functions
Expand source code
"""Mixin utility functions"""
# pylint: disable=no-member, protected-access, too-few-public-methods
import inspect
import string
from contextlib import contextmanager
from functools import wraps
from types import FunctionType, MethodType
from typing import Any, Collection, Iterable, Mapping, MutableMapping, Optional, Union
from magic_logger import logger
def allow_options(options: Union[Iterable, FunctionType, MethodType]):
    """Build a decorator that drops keyword arguments which are not valid options.

    Args:
        options: Either an iterable of valid option names, or a callable (e.g. `pandas.read_parquet` or
            `pandas.read_csv`) whose signature parameters are taken as the valid option names.

    Returns:
        read_with_valid_options: A decorator. The decorated function receives its positional arguments
            untouched and only those keyword arguments whose names are valid options.
    """
    # Snapshot a non-callable `options` once. The `Iterable` annotation permits one-shot iterators, and a
    # membership test against an iterator consumes it, so valid options would otherwise start being
    # rejected after the first few lookups.
    static_options = None if callable(options) else frozenset(options)

    def _filter_out_irrelevant_options(kwargs: Mapping, valid_options: Collection):
        """Return the valid subset of `kwargs`, logging a warning that lists the rejected entries."""
        filtered_options = {key: value for key, value in kwargs.items() if key in valid_options}
        invalid_options = {key: value for key, value in kwargs.items() if key not in valid_options}
        if invalid_options:
            logger.warning(
                f"Options {invalid_options} were not used because they were not supported by the read or write method configured for this source. "
                "Check if you expected any of those to have been used by the operation!"
            )
        return filtered_options

    def read_with_valid_options(func):
        @wraps(func)
        def _(*args, **kwargs):
            # The signature of a callable `options` is inspected on every call.
            valid_options = args_of(options) if static_options is None else static_options
            return func(*args, **_filter_out_irrelevant_options(kwargs, valid_options))

        return _

    return read_with_valid_options
def args_of(func):
    """Collect the parameter names declared in the signature of `func`.

    Args:
        func: A callable to inspect, e.g. pd.read_csv

    Returns:
        A set holding the name of every parameter in the signature of `func`
    """
    signature = inspect.signature(func)
    return {parameter_name for parameter_name in signature.parameters}
def get_string_template_field_names(s: str) -> Collection[str]:  # pylint: disable=C0103
    """Parse `s` as a format-string template and list the names of its replacement fields.

    Names are returned in order of appearance and repeated fields are reported once per occurrence.
    When `s` contains no replacement fields, the result is empty.

    Args:
        s: The string to parse.

    Returns:
        Collection[str]: The field names found in `s`.

    Example:
        >>> get_string_template_field_names("abc{def}{efg}")
        ['def', 'efg']
        >>> get_string_template_field_names("{0}-{1}")
        ['0', '1']
        >>> get_string_template_field_names("hello world")
        []
    """
    names = []
    # `string.Formatter.parse` yields 4-tuples of:
    # `literal_text`, `field_name`, `format_spec`, `conversion`
    # More info here https://docs.python.org/3.8/library/string.html#string.Formatter.parse
    for _literal_text, field_name, _format_spec, _conversion in string.Formatter().parse(s):
        # `field_name` is None for a chunk of literal text that is not followed by a replacement field
        if field_name is not None:
            names.append(field_name)
    return names
def resolve_template(path: str, options: MutableMapping[str, Any]) -> str:  # pylint: disable=C0103
    """Replace every template field in `path` with the matching value from `options`.

    If `path` contains no template fields, it is returned unchanged and `options` is left untouched.
    Otherwise, every key used to fill the template is removed from `options` (the mapping is mutated in place).

    Args:
        path: A string which is either a template, e.g. /path/to/file/{replace_me}.h5 or just a path /path/to/file/dont_replace_me.h5
        options: Values for the template fields, keyed by field name, e.g. {"replace_me": "name_of_file"}

    Returns:
        str: The path with all template fields replaced by their values from `options`.

    Raises:
        ValueError: if any template fields in `path` are not named using valid Python identifiers
        ValueError: if a given template field cannot be resolved in `options`
    """
    fields = get_string_template_field_names(path)

    if not fields:
        return path

    if not all(field.isidentifier() for field in fields):
        raise ValueError(f"Expected valid Python identifiers, found {fields}")

    if not all(field in options for field in fields):
        raise ValueError(f"Expected values for all fields in {fields}, found {list(options.keys())}")

    path = path.format(**{field: options[field] for field in fields})

    # A field may occur more than once in the template (e.g. "{a}/{a}"); remove each key exactly once,
    # since popping an already-removed key raises KeyError.
    for field in dict.fromkeys(fields):
        options.pop(field)

    return path
@contextmanager
def pickle_protocol(protocol: Optional[int] = None):
    """Temporarily override `pickle.HIGHEST_PROTOCOL` within the context manager.

    The value of `pickle.HIGHEST_PROTOCOL` found on entry is restored on exit, including when the
    body of the `with` block raises.

    Args:
        protocol: The protocol number assigned to `pickle.HIGHEST_PROTOCOL` while the context is active.
            Defaults to 4 when `None`, which covers python 3.4 and higher.
    """
    import pickle  # pylint: disable=import-outside-toplevel

    previous = pickle.HIGHEST_PROTOCOL
    try:
        # Compare against None rather than relying on truthiness: 0 is a valid pickle protocol and
        # must not be silently replaced by the default.
        pickle.HIGHEST_PROTOCOL = 4 if protocol is None else protocol
        yield
    finally:
        pickle.HIGHEST_PROTOCOL = previous
Functions
def allow_options(options: Union[Iterable[+T_co], function, method])
-
Validate **options for a decorated reader function.
Args
options
- A set of valid options for a reader (e.g. `pandas.read_parquet` or `pandas.read_csv`)
Returns
read_with_valid_options
- The input function called with modified options.
Expand source code
def allow_options(options: Union[Iterable, FunctionType, MethodType]):
    """Build a decorator that drops keyword arguments which are not valid options.

    Args:
        options: Either an iterable of valid option names, or a callable (e.g. `pandas.read_parquet` or
            `pandas.read_csv`) whose signature parameters are taken as the valid option names.

    Returns:
        read_with_valid_options: A decorator. The decorated function receives its positional arguments
            untouched and only those keyword arguments whose names are valid options.
    """
    # Snapshot a non-callable `options` once. The `Iterable` annotation permits one-shot iterators, and a
    # membership test against an iterator consumes it, so valid options would otherwise start being
    # rejected after the first few lookups.
    static_options = None if callable(options) else frozenset(options)

    def _filter_out_irrelevant_options(kwargs: Mapping, valid_options: Collection):
        """Return the valid subset of `kwargs`, logging a warning that lists the rejected entries."""
        filtered_options = {key: value for key, value in kwargs.items() if key in valid_options}
        invalid_options = {key: value for key, value in kwargs.items() if key not in valid_options}
        if invalid_options:
            logger.warning(
                f"Options {invalid_options} were not used because they were not supported by the read or write method configured for this source. "
                "Check if you expected any of those to have been used by the operation!"
            )
        return filtered_options

    def read_with_valid_options(func):
        @wraps(func)
        def _(*args, **kwargs):
            # The signature of a callable `options` is inspected on every call.
            valid_options = args_of(options) if static_options is None else static_options
            return func(*args, **_filter_out_irrelevant_options(kwargs, valid_options))

        return _

    return read_with_valid_options
def args_of(func)
-
Retrieve allowed options for a given function.
Args
func
- A function like, e.g., pd.read_csv
Returns
A set of allowed options
Expand source code
def args_of(func):
    """Collect the parameter names declared in the signature of `func`.

    Args:
        func: A callable to inspect, e.g. pd.read_csv

    Returns:
        A set holding the name of every parameter in the signature of `func`
    """
    signature = inspect.signature(func)
    return {parameter_name for parameter_name in signature.parameters}
def get_string_template_field_names(s: str) ‑> Collection[str]
-
Given a string `s`, it parses the string to identify any template fields and returns the names of those fields.
If `s` is not a string template, the returned `Collection` is empty.
Args
s:
Returns
Collection[str]
Example
>>> get_string_template_field_names("abc{def}{efg}")
["def", "efg"]
>>> get_string_template_field_names("{0}-{1}")
["0", "1"]
>>> get_string_template_field_names("hello world")
[]
Expand source code
def get_string_template_field_names(s: str) -> Collection[str]:  # pylint: disable=C0103
    """Parse `s` as a format-string template and list the names of its replacement fields.

    Names are returned in order of appearance and repeated fields are reported once per occurrence.
    When `s` contains no replacement fields, the result is empty.

    Args:
        s: The string to parse.

    Returns:
        Collection[str]: The field names found in `s`.

    Example:
        >>> get_string_template_field_names("abc{def}{efg}")
        ['def', 'efg']
        >>> get_string_template_field_names("{0}-{1}")
        ['0', '1']
        >>> get_string_template_field_names("hello world")
        []
    """
    names = []
    # `string.Formatter.parse` yields 4-tuples of:
    # `literal_text`, `field_name`, `format_spec`, `conversion`
    # More info here https://docs.python.org/3.8/library/string.html#string.Formatter.parse
    for _literal_text, field_name, _format_spec, _conversion in string.Formatter().parse(s):
        # `field_name` is None for a chunk of literal text that is not followed by a replacement field
        if field_name is not None:
            names.append(field_name)
    return names
def pickle_protocol(protocol: Optional[int])
-
Downgrade to the provided pickle protocol within the context manager.
Args
protocol
- The pickle protocol number to which HIGHEST_PROTOCOL is downgraded. Defaults to 4, which covers Python 3.4 and higher.
Expand source code
@contextmanager
def pickle_protocol(protocol: Optional[int] = None):
    """Temporarily override `pickle.HIGHEST_PROTOCOL` within the context manager.

    The value of `pickle.HIGHEST_PROTOCOL` found on entry is restored on exit, including when the
    body of the `with` block raises.

    Args:
        protocol: The protocol number assigned to `pickle.HIGHEST_PROTOCOL` while the context is active.
            Defaults to 4 when `None`, which covers python 3.4 and higher.
    """
    import pickle  # pylint: disable=import-outside-toplevel

    previous = pickle.HIGHEST_PROTOCOL
    try:
        # Compare against None rather than relying on truthiness: 0 is a valid pickle protocol and
        # must not be silently replaced by the default.
        pickle.HIGHEST_PROTOCOL = 4 if protocol is None else protocol
        yield
    finally:
        pickle.HIGHEST_PROTOCOL = previous
def resolve_template(path: str, options: MutableMapping[str, Any]) ‑> str
-
Given a string `path`, it attempts to replace all template fields with values provided in `options`.
If `path` is not a string template, `path` is returned.
Args
path
- A string which is either a template, e.g. /path/to/file/{replace_me}.h5 or just a path /path/to/file/dont_replace_me.h5
options
- A dynamic name for the "replace_me" field in the templated string. e.g. {"replace_me": "name_of_file"}
Returns
str
- Returns a static path replaced with the value in the options mapping.
Raises
ValueError
- if any template fields in `path` are not named using valid Python identifiers
ValueError
- if a given template field cannot be resolved in `options`
Expand source code
def resolve_template(path: str, options: MutableMapping[str, Any]) -> str:  # pylint: disable=C0103
    """Replace every template field in `path` with the matching value from `options`.

    If `path` contains no template fields, it is returned unchanged and `options` is left untouched.
    Otherwise, every key used to fill the template is removed from `options` (the mapping is mutated in place).

    Args:
        path: A string which is either a template, e.g. /path/to/file/{replace_me}.h5 or just a path /path/to/file/dont_replace_me.h5
        options: Values for the template fields, keyed by field name, e.g. {"replace_me": "name_of_file"}

    Returns:
        str: The path with all template fields replaced by their values from `options`.

    Raises:
        ValueError: if any template fields in `path` are not named using valid Python identifiers
        ValueError: if a given template field cannot be resolved in `options`
    """
    fields = get_string_template_field_names(path)

    if not fields:
        return path

    if not all(field.isidentifier() for field in fields):
        raise ValueError(f"Expected valid Python identifiers, found {fields}")

    if not all(field in options for field in fields):
        raise ValueError(f"Expected values for all fields in {fields}, found {list(options.keys())}")

    path = path.format(**{field: options[field] for field in fields})

    # A field may occur more than once in the template (e.g. "{a}/{a}"); remove each key exactly once,
    # since popping an already-removed key raises KeyError.
    for field in dict.fromkeys(fields):
        options.pop(field)

    return path