Module dynamicio.cli

Implements the dynamicio Command Line Interface (CLI).

"""Implements the dynamicio Command Line Interface (CLI)."""
import argparse
import glob
import os
import pprint
from typing import Mapping, MutableMapping, Optional, Sequence

import pandas as pd  # type: ignore
import yaml

from dynamicio.errors import InvalidDatasetTypeError


def parse_args(args: Optional[Sequence] = None) -> argparse.Namespace:
    """Arguments parser for dynamicio cli.py.

    Args:
        args: List of args to be parsed. Defaults to None, in which case
            sys.argv[1:] is used.

    Returns:
        An instance of ArgumentParser populated with the provided args.
    """
    parser = argparse.ArgumentParser(prog="dynamicio", description="Generate dataset schemas")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "-b",
        "--batch",
        action="store_true",
        help="flag, used to generate multiple schemas provided a datasets directory.",
    )
    group.add_argument(
        "-s",
        "--single",
        action="store_true",
        help="flag, used to generate a schema provided a single dataset.",
    )
    parser.add_argument("-p", "--path", required=True, help="the path to the dataset/datasets-directory.", type=str)
    parser.add_argument("-o", "--output", required=True, help="the path to the schemas output directory.", type=str)
    return parser.parse_args(args)


def generate_schema_for(dataset: str) -> Mapping:
    """Generate a schema for a dataset.

    Args:
        dataset: The path to the dataset for which we want to generate a schema

    Returns:
        A dictionary containing the schema for the dataset, or None if the dataset is not valid.

    Raises:
        InvalidDatasetTypeError: If the dataset type is not supported by dynamicio.
    """
    dataset_name, file_type = os.path.splitext(os.path.basename(dataset))

    if file_type == ".parquet":
        df = pd.read_parquet(dataset)
    elif file_type == ".csv":
        df = pd.read_csv(dataset)
    elif file_type == ".json":
        df = pd.read_json(dataset)
    elif file_type == ".h5":
        df = pd.read_hdf(dataset)
    else:
        raise InvalidDatasetTypeError(dataset)

    print(f"Generating schema for: {dataset}")
    json_schema: MutableMapping = {"name": dataset_name, "columns": {}}
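    # Record each column's pandas dtype; validations and metrics start as empty placeholders.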
    for column, d_type in zip(df.columns, df.dtypes):
        json_schema["columns"][column] = {"type": d_type.name, "validations": {}, "metrics": []}

    return json_schema


def main(args: argparse.Namespace):
    """Main function for dynamicio cli.py.

    Args:
        args: Parsed args.
    """
    if args.batch:
        dataset_files = glob.glob(os.path.join(args.path, "*.*"))
        for dataset in dataset_files:
            try:
                json_schema = generate_schema_for(dataset)
            except InvalidDatasetTypeError as exception:
                print(f"Skipping {exception.message}! You may want to remove this file from the datasets directory")
            else:
                with open(os.path.join(args.output, f"{json_schema['name']}.yaml"), "w") as file:  # pylint: disable=unspecified-encoding
                    file.write("---\n")
                    yaml.safe_dump(json_schema, file)

    if args.single:
        json_schema = generate_schema_for(str(args.path))
        with open(os.path.join(args.output, f"{json_schema['name']}.yaml"), "w") as file:  # pylint: disable=unspecified-encoding
            file.write("---\n")
            yaml.safe_dump(json_schema, file)
        pprint.pprint(json_schema)


def run():
    """Entry point for the dynamicio cli.py."""
    args = parse_args()
    main(args)

Functions

def generate_schema_for(dataset: str) -> Mapping

Generate a schema for a dataset.

Args

dataset
The path to the dataset for which we want to generate a schema

Returns

A dictionary containing the schema for the dataset.

Raises

InvalidDatasetTypeError
If the dataset type is not supported by dynamicio.
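For illustration, a minimal sketch of calling generate_schema_for on a small CSV; the file name users.csv and its columns are made up for this example:

import pandas as pd

from dynamicio.cli import generate_schema_for

# Hypothetical example dataset; file name and columns are illustrative.
pd.DataFrame({"id": [1, 2], "name": ["a", "b"]}).to_csv("users.csv", index=False)

schema = generate_schema_for("users.csv")
# Expected shape of the returned mapping:
# {
#     "name": "users",
#     "columns": {
#         "id": {"type": "int64", "validations": {}, "metrics": []},
#         "name": {"type": "object", "validations": {}, "metrics": []},
#     },
# }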
def main(args: argparse.Namespace)

Main function for the dynamicio CLI.

Args

args
Parsed args.
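As a usage sketch, batch mode can also be driven programmatically; the datasets/ and schemas/ directory names below are assumptions for the example:

from pathlib import Path

from dynamicio.cli import main, parse_args

# Hypothetical layout: datasets/ holds supported files (.parquet, .csv,
# .json, .h5); schemas/ is where the YAML schema files are written.
Path("schemas").mkdir(exist_ok=True)

args = parse_args(["--batch", "--path", "datasets", "--output", "schemas"])
main(args)  # writes one <dataset-name>.yaml per supported file in datasets/

Unsupported files in the directory are skipped with a warning rather than aborting the batch.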
def parse_args(args: Optional[Sequence] = None) -> argparse.Namespace

Argument parser for the dynamicio CLI.

Args

args
List of args to be parsed. Defaults to None, in which case sys.argv[1:] is used.

Returns

An argparse.Namespace populated with the parsed args.

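For example (the paths are illustrative), passing an explicit argument list returns a populated Namespace:

args = parse_args(["-s", "-p", "data/users.csv", "-o", "schemas"])
assert args.single and not args.batch
assert args.path == "data/users.csv"
assert args.output == "schemas"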
def run()

Entry point for the dynamicio CLI.

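A minimal sketch of invoking the CLI from a wrapper script; whether the package also registers run() as a console-script entry point is an assumption here:

from dynamicio.cli import run

if __name__ == "__main__":
    run()  # parses sys.argv[1:] and dispatches to main()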