Module dynamicio.mixins.with_s3
This module provides mixins that add S3 I/O support.
Expand source code
# pylint: disable=no-member, protected-access, too-few-public-methods
"""This module provides mixins that are providing S3 I/O support."""
import dataclasses
import io
import os
import tempfile
import urllib.parse
import uuid
from contextlib import contextmanager
from typing import IO, Generator, List, Optional, Union # noqa: I101
import boto3 # type: ignore
import pandas as pd
import s3transfer.futures # type: ignore
import tables # type: ignore
from awscli.clidriver import create_clidriver # type: ignore
from magic_logger import logger
from pandas import DataFrame, Series
from dynamicio.config.pydantic import DataframeSchema, S3DataEnvironment, S3PathPrefixEnvironment
from dynamicio.mixins import utils, with_local
class InMemStore(pd.io.pytables.HDFStore):
    """An ``HDFStore`` that borrows an already-open PyTables ``File``.

    Unlike a regular ``HDFStore`` it never opens or closes a file on its own:
    ``open`` adopts the table handed to the constructor and ``close`` does
    nothing, so whoever created the table keeps ownership of it.
    """

    # The borrowed PyTables file; replaced per instance in __init__.
    _in_mem_table = None

    def __init__(self, path: str, table: tables.File, mode: str = "r"):
        """Create a store named ``path`` that wraps ``table`` in the given ``mode``."""
        # Assigned before delegating to the parent constructor so that `open`
        # can find it (presumably HDFStore.__init__ calls `open` — verify against pandas).
        self._in_mem_table = table
        super().__init__(path=path, mode=mode)

    def open(self, *_ignored_args, **_ignored_kwargs):
        """Attach the borrowed table as this store's handle; all arguments are ignored."""
        # Presumably called for its side effect of lazily importing PyTables inside pandas.
        pd.io.pytables._tables()
        self._handle = self._in_mem_table

    def close(self, *_ignored_args, **_ignored_kwargs):
        """Do nothing: the borrowed table is closed by its owner, not by this store."""

    @property
    def is_open(self):
        """Report whether a table handle is currently attached to the store."""
        handle = self._handle
        return handle is not None
class HdfIO:
    """Read and write pandas objects as HDF5 images held entirely in memory.

    Files are created with PyTables' ``H5FD_CORE`` driver and
    ``driver_core_backing_store=0``, so data moves in and out through
    file-like objects instead of files on disk.
    """

    @contextmanager
    def create_file(self, label: str, mode: str, data: Optional[bytes] = None) -> Generator[tables.File, None, None]:
        """Create an in-memory pytables file and close it when the context exits.

        Args:
            label: Name used as the file title and, with a random UUID suffix,
                as the in-memory file name.
            mode: PyTables open mode, e.g. ``"r"`` or ``"w"``.
            data: Optional serialized HDF5 image used to initialise the file.
                ``None`` or empty bytes means no initial image.

        Yields:
            The open ``tables.File`` handle.
        """
        extra_kw = {}
        if data:
            extra_kw["driver_core_image"] = data
        # The UUID suffix keeps names unique, presumably so that several in-memory
        # files with the same label can be open at once.
        file_handle = tables.File(
            f"{label}_{uuid.uuid4()}.h5",
            mode,
            title=label,
            root_uep="/",
            filters=None,
            driver="H5FD_CORE",
            driver_core_backing_store=0,
            **extra_kw,
        )
        try:
            yield file_handle
        finally:
            file_handle.close()

    def load(self, fobj: IO[bytes], label: str = "unknown_file.h5") -> Union[DataFrame, Series]:
        """Load a dataframe (or series) from a file-like object holding an HDF5 image.

        Args:
            fobj: Readable binary stream; everything from its current position is read.
            label: Name given to the temporary in-memory file.

        Returns:
            The object returned by ``pd.read_hdf`` for the image.
        """
        with self.create_file(label, mode="r", data=fobj.read()) as file_handle:
            return pd.read_hdf(InMemStore(label, file_handle))

    def save(self, df: DataFrame, fobj: IO[bytes], label: str = "unknown_file.h5", options: Optional[dict] = None):
        """Save ``df`` under the key ``"df"`` and write the resulting HDF5 image to ``fobj``.

        Args:
            df: The dataframe to serialize.
            fobj: Writable binary stream; the image is written at its current position.
                The stream is never read from, so write-only streams are accepted.
            label: Name given to the temporary in-memory file.
            options: Extra keyword arguments forwarded to ``HDFStore.put``.
        """
        if not options:
            options = {}
        # Always start from a fresh in-memory file. `fobj` is the output
        # destination, so it must not be read: a write-only stream would raise on
        # read(), and reading would move the position so the image got appended.
        with self.create_file(label, mode="w") as file_handle:
            store = InMemStore(path=label, table=file_handle, mode="w")
            store.put(key="df", value=df, **options)
            fobj.write(file_handle.get_file_image())
def awscli_runner(*cmd: str):
    """Run an AWS CLI command in-process.

    The arguments are handed to the AWS CLI driver exactly as they would
    follow ``aws`` on the command line.

    Args:
        *cmd: The command-line arguments, e.g. ``"s3", "sync", src, dst``.

    Raises:
        RuntimeError: If the CLI reports an exit code greater than zero.

    Example:
        >>> awscli_runner("s3", "sync", "s3://mock-bucket/mock-key", ".")
    """
    driver = create_clidriver()
    status = driver.main(cmd)
    if status <= 0:
        return
    raise RuntimeError(f"AWS CLI exited with code {status}")
@dataclasses.dataclass
class S3TransferHandle:
    """Bookkeeping record for one in-flight S3 download.

    Attributes:
        s3_object: The object summary being fetched (a ``boto3.resource('s3')`` ObjectSummary).
        fobj: The file-like object the downloaded bytes are written into.
        done_future: The transfer future tracking the download.
    """

    # Object summary (boto3.resource('s3').ObjectSummary) being downloaded
    s3_object: object
    # Destination buffer receiving the downloaded bytes
    fobj: IO[bytes]
    # Future returned by the transfer manager for this download
    done_future: s3transfer.futures.BaseTransferFuture
class WithS3PathPrefix(with_local.WithLocal):
    """Handles I/O operations for AWS S3 path prefixes ("directories" of objects).

    Reads support parquet, csv, hdf and json files; writes support partitioned parquet only.
    This mixin assumes that the directories it reads from will only contain a single file-type.
    """

    sources_config: S3PathPrefixEnvironment  # type: ignore
    schema: DataframeSchema
    # Shared by all instances; created when the class body executes (at import time).
    boto3_resource = boto3.resource("s3")
    boto3_client = boto3.client("s3")

    def _write_to_s3_path_prefix(self, df: pd.DataFrame):
        """Write a DataFrame to an S3 path prefix as a partitioned parquet dataset.

        The configuration object is expected to have the following keys:
            - `bucket`
            - `path_prefix`
            - `file_type`

        The dataset is first written to a local temporary directory and then
        uploaded with ``aws s3 sync``.

        Args:
            df (pd.DataFrame): the DataFrame to be written to S3

        Raises:
            ValueError: In case `file_type` is not "parquet"
            ValueError: In case the `partition_cols` option is missing
            RuntimeError: In case the AWS CLI upload fails
        """
        s3_config = self.sources_config.s3
        file_type = s3_config.file_type
        if file_type != "parquet":
            raise ValueError(f"File type not supported: {file_type}, only parquet files can be written to an S3 key")
        if "partition_cols" not in self.options:
            raise ValueError("`partition_cols` is required as an option to write partitioned parquet files to S3")

        full_path_prefix = utils.resolve_template(f"s3://{s3_config.bucket}/{s3_config.path_prefix}", self.options)
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_parquet_file(df, temp_dir, **self.options)
            awscli_runner(
                "s3",
                "sync",
                temp_dir,
                full_path_prefix,
                "--acl",
                "bucket-owner-full-control",
                "--only-show-errors",
                "--exact-timestamps",
            )

    def _read_from_s3_path_prefix(self) -> pd.DataFrame:
        """Read all files under a path prefix from an S3 bucket as a `DataFrame`.

        The configuration object is expected to have the following keys:
            - `bucket`
            - `path_prefix`
            - `file_type`

        By default the prefix is downloaded to a temporary directory with ``aws s3 sync``
        and every file is read with the method named "_read_{file_type}_file". Files
        are read in sorted name order, and empty results are dropped before concatenation.
        With the `no_disk_space` option, parquet is read directly from S3 and hdf is
        streamed into memory; csv and json still use the download path.

        Returns:
            DataFrame

        Raises:
            ValueError: If the file type is unsupported or no non-empty data was found.
        """
        s3_config = self.sources_config.s3
        file_type = s3_config.file_type
        if file_type not in {"parquet", "csv", "hdf", "json"}:
            raise ValueError(f"File type not supported: {file_type}")
        full_path_prefix = utils.resolve_template(f"s3://{s3_config.bucket}/{s3_config.path_prefix}", self.options)

        # The `no_disk_space` option should be used only when reading a subset of columns from S3.
        # It is popped so that it is not forwarded to the underlying readers.
        if self.options.pop("no_disk_space", False):
            if file_type == "parquet":
                return self._read_parquet_file(full_path_prefix, self.schema, **self.options)
            if file_type == "hdf":
                return self._read_hdf_path_prefix_in_memory(full_path_prefix)

        with tempfile.TemporaryDirectory() as temp_dir:
            # aws-cli is shown to be up to 6 times faster when downloading the complete dataset from S3 than using the boto3
            # client or pandas directly. This is because aws-cli uses the parallel downloader, which is much faster than the
            # boto3 client.
            awscli_runner(
                "s3",
                "sync",
                full_path_prefix,
                temp_dir,
                "--acl",
                "bucket-owner-full-control",
                "--only-show-errors",
                "--exact-timestamps",
            )
            reader = getattr(self, f"_read_{file_type}_file")
            dfs: List[DataFrame] = []
            # Sorted so that rows are concatenated in a deterministic order.
            for file_name in sorted(os.listdir(temp_dir)):
                df = reader(os.path.join(temp_dir, file_name), self.schema, **self.options)  # type: ignore
                if len(df) > 0:
                    dfs.append(df)
            if not dfs:
                raise ValueError(f"No non-empty {file_type} files found under {full_path_prefix}")
            return pd.concat(dfs, ignore_index=True)

    def _read_hdf_path_prefix_in_memory(self, full_path_prefix: str) -> pd.DataFrame:
        """Load every ``.h5`` object under ``full_path_prefix`` in memory, keeping only schema columns.

        Objects are downloaded in batches of roughly 1 GiB. The columns keep the
        order they have in the concatenated frame.

        Raises:
            ValueError: If no ``.h5`` objects exist under the prefix.
        """
        dfs: List[DataFrame] = [
            HdfIO().load(fobj) for fobj in self._iter_s3_files(full_path_prefix, file_ext=".h5", max_memory_use=1024**3)  # 1 gib
        ]
        if not dfs:
            raise ValueError(f"No .h5 files found under {full_path_prefix}")
        df = pd.concat(dfs, ignore_index=True)
        wanted_columns = set(self.schema.columns.keys())
        columns = [column for column in df.columns.to_list() if column in wanted_columns]
        return df[columns]

    def _iter_s3_files(self, s3_prefix: str, file_ext: Optional[str] = None, max_memory_use: int = -1) -> Generator[IO[bytes], None, None]:  # pylint: disable=too-many-locals
        """Download batches of S3 objects under a prefix and yield them as in-memory buffers.

        Objects are downloaded concurrently in batches whose combined size is roughly
        bounded by ``max_memory_use``. Each batch always contains at least one object.
        A batch is fully downloaded before its buffers are yielded, rewound to position 0.

        Args:
            s3_prefix: s3 url (``s3://bucket/prefix``) to fetch objects with; an empty
                path means the whole bucket
            file_ext: extension of s3 objects to allow through; ``None``/empty allows all
            max_memory_use: The approximate number of bytes to allocate on each yield of
                Generator; a negative value fetches everything in a single batch

        Raises:
            ValueError: If ``s3_prefix`` is not an ``s3://`` url.
            Exception: Whatever error made an individual download fail, re-raised from its future.
        """
        parsed_url = urllib.parse.urlparse(s3_prefix)
        if parsed_url.scheme != "s3":
            raise ValueError(f"{s3_prefix!r} should be an s3 url")
        bucket_name = parsed_url.netloc
        key_prefix = parsed_url.path.strip("/")
        # For the bucket root, list everything instead of keys starting with "/" (which matches none).
        file_prefix = f"{key_prefix}/" if key_prefix else ""

        # Collect objects to be loaded
        s3_objects_to_fetch = [
            s3_object
            for s3_object in self.boto3_resource.Bucket(bucket_name).objects.filter(Prefix=file_prefix)
            if not file_ext or s3_object.key.endswith(file_ext)
        ]

        if max_memory_use < 0:
            # Unlimited memory use - fetch ALL
            max_memory_use = sum(s3_obj.size for s3_obj in s3_objects_to_fetch) * 2

        transfer_config = boto3.s3.transfer.TransferConfig(max_concurrency=20)
        while s3_objects_to_fetch:
            mem_use_left = max_memory_use
            handles: List[S3TransferHandle] = []
            with boto3.s3.transfer.create_transfer_manager(self.boto3_client, transfer_config) as transfer_manager:
                # Always schedule at least one object. Otherwise a zero budget (only
                # zero-byte objects, or max_memory_use=0) would never shrink the list and loop forever.
                while s3_objects_to_fetch and (not handles or mem_use_left > 0):
                    s3_object = s3_objects_to_fetch.pop()
                    fobj = io.BytesIO()
                    future = transfer_manager.download(bucket_name, s3_object.key, fobj)
                    handles.append(S3TransferHandle(s3_object, fobj, future))
                    mem_use_left -= s3_object.size
            # Leaving the `transfer_manager` context implicitly waits for all downloads to complete
            for handle in handles:
                # Per s3transfer, shutting the manager down does not re-raise failed
                # transfers. result() surfaces the error instead of yielding an incomplete buffer.
                handle.done_future.result()
                handle.fobj.seek(0)
                yield handle.fobj
class WithS3File(with_local.WithLocal):
    """Handles I/O operations for single files stored on AWS S3.

    By default the object is downloaded with boto3 to a named temporary file and read
    from there, as this has proven to be faster than reading it into memory.

    Options:
        no_disk_space: If `True`, the data is read without persisting it to local disk.
            csv, json and parquet are read straight from their ``s3://`` URL (s3fs + fsspec).
            hdf is downloaded into an in-memory buffer and loaded from there.
    """

    sources_config: S3DataEnvironment  # type: ignore
    schema: DataframeSchema
    boto3_client = boto3.client("s3")

    @contextmanager
    def _s3_named_file_reader(self, s3_bucket: str, s3_key: str) -> Generator:
        """Download an S3 object into a named temporary file and yield that file.

        Args:
            s3_bucket: The S3 bucket holding the object.
            s3_key: The key of the object to download.

        Yields:
            The open temporary file; its ``.name`` is the local path. The file is
            removed when the context exits.
        """
        with tempfile.NamedTemporaryFile("wb") as local_copy:
            self.boto3_client.download_fileobj(s3_bucket, s3_key, local_copy)
            # Push buffered bytes to disk before the caller re-opens the file by name
            local_copy.flush()
            yield local_copy

    @contextmanager
    def _s3_reader(self, s3_bucket: str, s3_key: str) -> Generator[io.BytesIO, None, None]:
        """Download an S3 object into memory, without creating any temp files.

        Args:
            s3_bucket: The S3 bucket holding the object.
            s3_key: The key of the object to download.

        Yields:
            A ``BytesIO`` holding the object's bytes, rewound to position 0.
        """
        buffer = io.BytesIO()
        self.boto3_client.download_fileobj(s3_bucket, s3_key, buffer)
        buffer.seek(0)
        yield buffer

    @contextmanager
    def _s3_writer(self, s3_bucket: str, s3_key: str) -> Generator[IO[bytes], None, None]:
        """Collect bytes in memory and upload them to S3 when the ``with`` block completes.

        Nothing is uploaded if the ``with`` block raises.

        Args:
            s3_bucket: The S3 bucket to upload the object to.
            s3_key: The key under which the object is stored.

        Yields:
            An empty ``BytesIO`` for the caller to write into.
        """
        buffer = io.BytesIO()
        yield buffer
        buffer.seek(0)
        self.boto3_client.upload_fileobj(buffer, s3_bucket, s3_key, ExtraArgs={"ACL": "bucket-owner-full-control"})

    def _read_from_s3_file(self) -> pd.DataFrame:
        """Read a file from an S3 bucket as a `DataFrame`.

        The configuration object is expected to have the following keys:
            - `bucket`
            - `file_path`
            - `file_type`

        The file is parsed by the method named "_read_{file_type}_file". The
        `no_disk_space` option is consumed (popped) here. When it is set and the
        in-memory read returns a value, that value is returned directly; otherwise
        the file is downloaded to a temporary file first.

        Returns:
            DataFrame

        Raises:
            NotImplementedError: With `no_disk_space`, for file types other than csv, json, parquet and hdf.
        """
        s3_config = self.sources_config.s3
        file_type = s3_config.file_type
        file_path = utils.resolve_template(s3_config.file_path, self.options)
        bucket = s3_config.bucket
        logger.info(f"[s3] Started downloading: s3://{s3_config.bucket}/{file_path}")

        if self.options.pop("no_disk_space", None):
            if file_type == "hdf":
                with self._s3_reader(s3_bucket=bucket, s3_key=file_path) as fobj:  # type: ignore
                    in_memory_df = HdfIO().load(fobj)  # type: ignore
            elif file_type in ("csv", "json", "parquet"):
                read_method = getattr(self, f"_read_{file_type}_file")
                in_memory_df = read_method(f"s3://{s3_config.bucket}/{file_path}", self.schema, **self.options)  # type: ignore
            else:
                raise NotImplementedError(f"Unsupported file type {file_type!r}.")
            if in_memory_df is not None:
                return in_memory_df

        with self._s3_named_file_reader(s3_bucket=bucket, s3_key=file_path) as target_file:  # type: ignore
            read_method = getattr(self, f"_read_{file_type}_file")
            return read_method(target_file.name, self.schema, **self.options)  # type: ignore

    def _write_to_s3_file(self, df: pd.DataFrame):
        """Write a dataframe to S3 according to the `file_type` of the config_io configuration.

        The configuration object is expected to have the following keys:
            - `bucket`
            - `file_path`
            - `file_type`

        csv, json and parquet are written by "_write_{file_type}_file" straight to the
        ``s3://`` URL. hdf is serialized in memory and then uploaded with boto3, honouring
        an optional `pickle_protocol` option.

        Args:
            df: The dataframe to be written out

        Raises:
            ValueError: For unsupported file types.
        """
        s3_config = self.sources_config.s3
        bucket = s3_config.bucket
        file_path = utils.resolve_template(s3_config.file_path, self.options)
        file_type = s3_config.file_type
        s3_url = f"s3://{bucket}/{file_path}"
        logger.info(f"[s3] Started uploading: {s3_url}")

        if file_type == "hdf":
            hdf_options = dict(self.options)
            pickle_protocol = hdf_options.pop("pickle_protocol", None)
            with self._s3_writer(s3_bucket=s3_config.bucket, s3_key=file_path) as target_file, utils.pickle_protocol(protocol=pickle_protocol):
                HdfIO().save(df, target_file, hdf_options)  # type: ignore
        elif file_type in ("csv", "json", "parquet"):
            write_method = getattr(self, f"_write_{file_type}_file")
            write_method(df, s3_url, **self.options)  # type: ignore
        else:
            raise ValueError(f"File type: {file_type} not supported!")

        logger.info(f"[s3] Finished uploading: {s3_url}")
Functions
def awscli_runner(*cmd: str)
-
Runs the awscli command provided.
Args
*cmd
- A list of args used in the command.
Raises
RuntimeError: If the AWS CLI command exits with an error code (an exit code greater than 0).
Example
>>> awscli_runner("s3", "sync", "s3://mock-bucket/mock-key", ".")
Expand source code
def awscli_runner(*cmd: str): """Runs the awscli command provided. Args: *cmd: A list of args used in the command. Raises: A runtime error exception is raised if download fails. Example: >>> awscli_runner("s3", "sync", "s3://mock-bucket/mock-key", ".") """ # Run exit_code = create_clidriver().main(cmd) if exit_code > 0: raise RuntimeError(f"AWS CLI exited with code {exit_code}")
Classes
class HdfIO
-
Class providing stream support for HDF tables.
Expand source code
class HdfIO: """Class providing stream support for HDF tables.""" @contextmanager def create_file(self, label: str, mode: str, data: Optional[bytes] = None) -> Generator[tables.File, None, None]: """Create an in-memory pytables table.""" extra_kw = {} if data: extra_kw["driver_core_image"] = data file_handle = tables.File(f"{label}_{uuid.uuid4()}.h5", mode, title=label, root_uep="/", filters=None, driver="H5FD_CORE", driver_core_backing_store=0, **extra_kw) try: yield file_handle finally: file_handle.close() def load(self, fobj: IO[bytes], label: str = "unknown_file.h5") -> Union[DataFrame, Series]: """Load the dataframe from an file-like object.""" with self.create_file(label, mode="r", data=fobj.read()) as file_handle: return pd.read_hdf(InMemStore(label, file_handle)) def save(self, df: DataFrame, fobj: IO[bytes], label: str = "unknown_file.h5", options: Optional[dict] = None): """Load the dataframe to a file-like object.""" if not options: options = {} with self.create_file(label, mode="w", data=fobj.read()) as file_handle: store = InMemStore(path=label, table=file_handle, mode="w") store.put(key="df", value=df, **options) fobj.write(file_handle.get_file_image())
Methods
def create_file(self, label: str, mode: str, data: Optional[bytes] = None) ‑> Generator[tables.file.File, None, None]
-
Create an in-memory pytables table.
Expand source code
@contextmanager def create_file(self, label: str, mode: str, data: Optional[bytes] = None) -> Generator[tables.File, None, None]: """Create an in-memory pytables table.""" extra_kw = {} if data: extra_kw["driver_core_image"] = data file_handle = tables.File(f"{label}_{uuid.uuid4()}.h5", mode, title=label, root_uep="/", filters=None, driver="H5FD_CORE", driver_core_backing_store=0, **extra_kw) try: yield file_handle finally: file_handle.close()
def load(self, fobj: IO[bytes], label: str = 'unknown_file.h5') ‑> Union[pandas.core.frame.DataFrame, pandas.core.series.Series]
-
Load the dataframe from a file-like object.
Expand source code
def load(self, fobj: IO[bytes], label: str = "unknown_file.h5") -> Union[DataFrame, Series]: """Load the dataframe from an file-like object.""" with self.create_file(label, mode="r", data=fobj.read()) as file_handle: return pd.read_hdf(InMemStore(label, file_handle))
def save(self, df: pandas.core.frame.DataFrame, fobj: IO[bytes], label: str = 'unknown_file.h5', options: Optional[dict] = None)
-
Save the dataframe to a file-like object.
Expand source code
def save(self, df: DataFrame, fobj: IO[bytes], label: str = "unknown_file.h5", options: Optional[dict] = None): """Load the dataframe to a file-like object.""" if not options: options = {} with self.create_file(label, mode="w", data=fobj.read()) as file_handle: store = InMemStore(path=label, table=file_handle, mode="w") store.put(key="df", value=df, **options) fobj.write(file_handle.get_file_image())
class InMemStore (path: str, table: tables.file.File, mode: str = 'r')
-
A subclass of pandas HDFStore that does not manage the pytables File object.
Create a new HDFStore object.
Expand source code
class InMemStore(pd.io.pytables.HDFStore): """A subclass of pandas HDFStore that does not manage the pytables File object.""" _in_mem_table = None def __init__(self, path: str, table: tables.File, mode: str = "r"): """Create a new HDFStore object.""" self._in_mem_table = table super().__init__(path=path, mode=mode) def open(self, *_args, **_kwargs): """Open the in-memory table.""" pd.io.pytables._tables() self._handle = self._in_mem_table def close(self, *_args, **_kwargs): """Close the in-memory table.""" @property def is_open(self): """Check if the in-memory table is open.""" return self._handle is not None
Ancestors
- pandas.io.pytables.HDFStore
Instance variables
var is_open
-
Check if the in-memory table is open.
Expand source code
@property def is_open(self): """Check if the in-memory table is open.""" return self._handle is not None
Methods
def close(self, *_args, **_kwargs)
-
Close the in-memory table.
Expand source code
def close(self, *_args, **_kwargs): """Close the in-memory table."""
def open(self, *_args, **_kwargs)
-
Open the in-memory table.
Expand source code
def open(self, *_args, **_kwargs): """Open the in-memory table.""" pd.io.pytables._tables() self._handle = self._in_mem_table
class S3TransferHandle (s3_object: object, fobj: IO[bytes], done_future: s3transfer.futures.BaseTransferFuture)
-
A dataclass used to track an ongoing data download from the s3.
Expand source code
class S3TransferHandle: """A dataclass used to track an ongoing data download from the s3.""" s3_object: object # boto3.resource('s3').ObjectSummary fobj: IO[bytes] # file-like object the data is being downloaded to done_future: s3transfer.futures.BaseTransferFuture
Class variables
var done_future : s3transfer.futures.BaseTransferFuture
var fobj : IO[bytes]
var s3_object : object
class WithS3File
-
Handles I/O operations for AWS S3.
By default, files are persisted to disk first using boto3, as this has proven to be faster than reading them into memory. Reading directly into memory (the `no_disk_space` option) is available for csv, json, parquet and hdf files. Because `read_hdf` does not accept a generic buffer, hdf files are instead downloaded into an in-memory buffer and loaded through an in-memory PyTables image.
Options
no_disk_space: If `True`, data is read directly into memory (via s3fs + fsspec for csv, json and parquet, and via an in-memory boto3 download for hdf).
Expand source code
class WithS3File(with_local.WithLocal): """Handles I/O operations for AWS S3. All files are persisted to disk first using boto3 as this has proven to be faster than reading them into memory. Note that reading things into memory is available for csv, json and parquet types only. Unfortunately, until support for generic buffer is added to read_hdf, we need to download and persists the file to disk first anyway. Options: no_disk_space: If `True`, then s3fs + fsspec will be used to read data directly into memory. """ sources_config: S3DataEnvironment # type: ignore schema: DataframeSchema boto3_client = boto3.client("s3") @contextmanager def _s3_named_file_reader(self, s3_bucket: str, s3_key: str) -> Generator: """Contextmanager to abstract reading different file types in S3. This implementation saves the downloaded data to a temporary file. Args: s3_bucket: The S3 bucket from where to read the file. s3_key: The file-path to the target file to be read. Returns: The local file path from where the file can be read, once it has been downloaded there by the boto3.client. """ with tempfile.NamedTemporaryFile("wb") as target_file: # Download the file from S3 self.boto3_client.download_fileobj(s3_bucket, s3_key, target_file) # Yield local file path to body of `with` statement target_file.flush() yield target_file @contextmanager def _s3_reader(self, s3_bucket: str, s3_key: str) -> Generator[io.BytesIO, None, None]: """Contextmanager to abstract reading different file types in S3. This implementation only retains data in-memory, avoiding creating any temp files. Args: s3_bucket: The S3 bucket from where to read the file. s3_key: The file-path to the target file to be read. Returns: The local file path from where the file can be read, once it has been downloaded there by the boto3.client. 
""" fobj = io.BytesIO() # Download the file from S3 self.boto3_client.download_fileobj(s3_bucket, s3_key, fobj) # Yield the buffer fobj.seek(0) yield fobj @contextmanager def _s3_writer(self, s3_bucket: str, s3_key: str) -> Generator[IO[bytes], None, None]: """Contextmanager to abstract loading different file types to S3. Args: s3_bucket: The S3 bucket to upload the file to. s3_key: The file-path where the target file should be uploaded to. Returns: The local file path where to actually write the file, to be read and uploaded by boto3.client. """ fobj = io.BytesIO() yield fobj fobj.seek(0) self.boto3_client.upload_fileobj(fobj, s3_bucket, s3_key, ExtraArgs={"ACL": "bucket-owner-full-control"}) def _read_from_s3_file(self) -> pd.DataFrame: """Read a file from an S3 bucket as a `DataFrame`. The configuration object is expected to have the following keys: - `bucket` - `file_path` - `file_type` To actually read the file, a method is dynamically invoked by name, using "_read_{file_type}_file". 
Returns: DataFrame """ s3_config = self.sources_config.s3 file_type = s3_config.file_type file_path = utils.resolve_template(s3_config.file_path, self.options) bucket = s3_config.bucket logger.info(f"[s3] Started downloading: s3://{s3_config.bucket}/{file_path}") if self.options.pop("no_disk_space", None): no_disk_space_rv = None if file_type in ["csv", "json", "parquet"]: no_disk_space_rv = getattr(self, f"_read_{file_type}_file")(f"s3://{s3_config.bucket}/{file_path}", self.schema, **self.options) # type: ignore elif file_type == "hdf": with self._s3_reader(s3_bucket=bucket, s3_key=file_path) as fobj: # type: ignore no_disk_space_rv = HdfIO().load(fobj) # type: ignore else: raise NotImplementedError(f"Unsupported file type {file_type!r}.") if no_disk_space_rv is not None: return no_disk_space_rv with self._s3_named_file_reader(s3_bucket=bucket, s3_key=file_path) as target_file: # type: ignore return getattr(self, f"_read_{file_type}_file")(target_file.name, self.schema, **self.options) # type: ignore def _write_to_s3_file(self, df: pd.DataFrame): """Write a dataframe to s3 based on the {file_type} of the config_io configuration. The configuration object is expected to have two keys: - `file_path` - `file_type` To actually write the file, a method is dynamically invoked by name, using "_write_{file_type}_file". 
Args: df: The dataframe to be written out """ s3_config = self.sources_config.s3 bucket = s3_config.bucket file_path = utils.resolve_template(s3_config.file_path, self.options) file_type = s3_config.file_type logger.info(f"[s3] Started uploading: s3://{bucket}/{file_path}") if file_type in ["csv", "json", "parquet"]: getattr(self, f"_write_{file_type}_file")(df, f"s3://{bucket}/{file_path}", **self.options) # type: ignore elif file_type == "hdf": hdf_options = dict(self.options) pickle_protocol = hdf_options.pop("pickle_protocol", None) with self._s3_writer(s3_bucket=s3_config.bucket, s3_key=file_path) as target_file, utils.pickle_protocol(protocol=pickle_protocol): HdfIO().save(df, target_file, hdf_options) # type: ignore else: raise ValueError(f"File type: {file_type} not supported!") logger.info(f"[s3] Finished uploading: s3://{bucket}/{file_path}")
Ancestors
Subclasses
Class variables
var boto3_client
var schema : DataframeSchema
var sources_config : S3DataEnvironment
class WithS3PathPrefix
-
Handles I/O operations for AWS S3 path prefixes. Reads support parquet, csv, hdf and json files; writes support partitioned parquet files only.
This mixin assumes that the directories it reads from will only contain a single file-type.
Expand source code
class WithS3PathPrefix(with_local.WithLocal): """Handles I/O operations for AWS S3; implements read operations only. This mixin assumes that the directories it reads from will only contain a single file-type. """ sources_config: S3PathPrefixEnvironment # type: ignore schema: DataframeSchema boto3_resource = boto3.resource("s3") boto3_client = boto3.client("s3") def _write_to_s3_path_prefix(self, df: pd.DataFrame): """Write a DataFrame to an S3 path prefix. The configuration object is expected to have the following keys: - `bucket` - `path_prefix` - `file_type` Args: df (pd.DataFrame): the DataFrame to be written to S3 Raises: ValueError: In case `path_prefix` is missing from config ValueError: In case the `partition_cols` arg is missing while trying to write a parquet file """ s3_config = self.sources_config.s3 file_type = s3_config.file_type if file_type != "parquet": raise ValueError(f"File type not supported: {file_type}, only parquet files can be written to an S3 key") if "partition_cols" not in self.options: raise ValueError("`partition_cols` is required as an option to write partitioned parquet files to S3") bucket = s3_config.bucket path_prefix = s3_config.path_prefix full_path_prefix = utils.resolve_template(f"s3://{bucket}/{path_prefix}", self.options) with tempfile.TemporaryDirectory() as temp_dir: self._write_parquet_file(df, temp_dir, **self.options) awscli_runner( "s3", "sync", temp_dir, full_path_prefix, "--acl", "bucket-owner-full-control", "--only-show-errors", "--exact-timestamps", ) def _read_from_s3_path_prefix(self) -> pd.DataFrame: """Read all files under a path prefix from an S3 bucket as a `DataFrame`. The configuration object is expected to have the following keys: - `bucket` - `path_prefix` - `file_type` To actually read the file, a method is dynamically invoked by name, using "_read_{file_type}_path_prefix". 
Returns: DataFrame """ s3_config = self.sources_config.s3 file_type = s3_config.file_type if file_type not in {"parquet", "csv", "hdf", "json"}: raise ValueError(f"File type not supported: {file_type}") bucket = s3_config.bucket path_prefix = s3_config.path_prefix full_path_prefix = utils.resolve_template(f"s3://{bucket}/{path_prefix}", self.options) # The `no_disk_space` option should be used only when reading a subset of columns from S3 if self.options.pop("no_disk_space", False): if file_type == "parquet": return self._read_parquet_file(full_path_prefix, self.schema, **self.options) if file_type == "hdf": dfs: List[DataFrame] = [] for fobj in self._iter_s3_files(full_path_prefix, file_ext=".h5", max_memory_use=1024**3): # 1 gib dfs.append(HdfIO().load(fobj)) df = pd.concat(dfs, ignore_index=True) columns = [column for column in df.columns.to_list() if column in self.schema.columns.keys()] return df[columns] with tempfile.TemporaryDirectory() as temp_dir: # aws-cli is shown to be up to 6 times faster when downloading the complete dataset from S3 than using the boto3 # client or pandas directly. This is because aws-cli uses the parallel downloader, which is much faster than the # boto3 client. awscli_runner( "s3", "sync", full_path_prefix, temp_dir, "--acl", "bucket-owner-full-control", "--only-show-errors", "--exact-timestamps", ) dfs: List[DataFrame] = [] for file in os.listdir(temp_dir): df = getattr(self, f"_read_{file_type}_file")(os.path.join(temp_dir, file), self.schema, **self.options) # type: ignore if len(df) > 0: dfs.append(df) return pd.concat(dfs, ignore_index=True) def _iter_s3_files(self, s3_prefix: str, file_ext: Optional[str] = None, max_memory_use: int = -1) -> Generator[IO[bytes], None, None]: # pylint: disable=too-many-locals """Download sways of S3 objects. 
Args: s3_prefix: s3 url to fetch objects with file_ext: extension of s3 objects to allow through max_memory_use: The approximate number of bytes to allocate on each yield of Generator """ parsed_url = urllib.parse.urlparse(s3_prefix) assert parsed_url.scheme == "s3", f"{s3_prefix!r} should be an s3 url" bucket_name = parsed_url.netloc file_prefix = f"{parsed_url.path.strip('/')}/" s3_objects_to_fetch = [] # Collect objects to be loaded for s3_object in self.boto3_resource.Bucket(bucket_name).objects.filter(Prefix=file_prefix): good_object = (not file_ext) or (s3_object.key.endswith(file_ext)) if good_object: s3_objects_to_fetch.append(s3_object) if max_memory_use < 0: # Unlimited memory use - fetch ALL max_memory_use = sum(s3_obj.size for s3_obj in s3_objects_to_fetch) * 2 transfer_config = boto3.s3.transfer.TransferConfig(max_concurrency=20) while s3_objects_to_fetch: mem_use_left = max_memory_use handles = [] with boto3.s3.transfer.create_transfer_manager(self.boto3_client, transfer_config) as transfer_manager: while mem_use_left > 0 and s3_objects_to_fetch: s3_object = s3_objects_to_fetch.pop() fobj = io.BytesIO() future = transfer_manager.download(bucket_name, s3_object.key, fobj) handles.append(S3TransferHandle(s3_object, fobj, future)) mem_use_left -= s3_object.size # Leaving the `transfer_manager` context implicitly waits for all downloads to complete # Rewind and yield all fobjs for handle in handles: handle.fobj.seek(0) yield handle.fobj
Ancestors
Subclasses
Class variables
var boto3_client
var boto3_resource
var schema : DataframeSchema
var sources_config : S3PathPrefixEnvironment