Module dynamicio.mixins.with_kafka
This module provides mixins that add Kafka I/O support.
Expand source code
# pylint: disable=no-member, protected-access, too-few-public-methods
"""This module provides mixins that are providing Kafka I/O support."""
from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional
import pandas as pd # type: ignore
import simplejson
from kafka import KafkaProducer # type: ignore
from magic_logger import logger
from dynamicio.config.pydantic import DataframeSchema, KafkaDataEnvironment
from dynamicio.mixins import utils
class WithKafka:
"""Handles I/O operations for Kafka.
Args:
- options:
- Standard: Keyword-arguments passed to the KafkaProducer constructor (see `KafkaProducer.DEFAULT_CONFIG.keys()`).
- Additional Options:
- `key_generator: Callable[[Any, Mapping], T]`: Defines the keying policy used for sending keyed messages to Kafka. It is a `Callable` that takes the pair
`(idx, row)` and returns the value that will serve as the message's key; it is invoked prior to serialising the key. By default, the dataframe's index is used
(which may not be composed of unique values or string-typed keys). The default goes hand in hand with the default key-serialiser, which assumes that keys
are strings and encodes them as such.
- `key_serializer: Callable[[T], bytes]`: Custom key serialiser; if not provided, a default key-serialiser is used, which encodes a string key as UTF-8 (unless the key is None).
N.B. A custom key-generator that produces non-string keys should be paired with a custom key-serialiser suited to that key type.
- `document_transformer: Callable[[Mapping[Any, Any]], Mapping[Any, Any]]`: Manipulates the messages/rows sent to Kafka as values. It is a `Callable` that takes a `Mapping` as its
only argument and returns a `Mapping`; it is invoked prior to serialising each document. This can be used, for example, to add metadata to each
document that will be written to the target Kafka topic.
- `value_serializer: Callable[[Mapping], bytes]`: Custom value serialiser; if not provided, a default value-serialiser is used, applied to a `Mapping`.
Example:
>>> # Given
>>> keyed_test_df = pd.DataFrame.from_records(
>>> [
>>> ["key-01", "cm_1", "id_1", 1000, "ABC"],
>>> ["key-02", "cm_2", "id_2", 1000, "ABC"],
>>> ["key-03", "cm_3", "id_3", 1000, "ABC"],
>>> ],
>>> columns=["key", "id", "foo", "bar", "baz"],
>>> ).set_index("key")
>>>
>>> kafka_cloud_config = IOConfig(
>>> path_to_source_yaml=(os.path.join(constants.TEST_RESOURCES, "processed.yaml")),
>>> env_identifier="CLOUD",
>>> dynamic_vars=constants,
>>> ).get(source_key="WRITE_TO_KAFKA_JSON")
>>>
>>> write_kafka_io = WriteKafkaIO(kafka_cloud_config, key_generator=lambda key, _: key, document_transformer=lambda doc: {**doc, "new_field": "new_value"})
>>>
>>> # When
>>> with patch.object(mixins, "KafkaProducer") as mock__kafka_producer:
>>> mock__kafka_producer.DEFAULT_CONFIG = KafkaProducer.DEFAULT_CONFIG
>>> mock_producer = MockKafkaProducer()
>>> mock__kafka_producer.return_value = mock_producer
>>> write_kafka_io.write(keyed_test_df)
>>>
>>> # Then
>>> assert mock_producer.my_stream == [
>>> {"key": "key-01", "value": {"bar": 1000, "baz": "ABC", "foo": "id_1", "id": "cm_1", "new_field": "new_value"}},
>>> {"key": "key-02", "value": {"bar": 1000, "baz": "ABC", "foo": "id_2", "id": "cm_2", "new_field": "new_value"}},
>>> {"key": "key-03", "value": {"bar": 1000, "baz": "ABC", "foo": "id_3", "id": "cm_3", "new_field": "new_value"}},
>>> ]
"""
sources_config: KafkaDataEnvironment
schema: DataframeSchema
options: MutableMapping[str, Any]
__kafka_config: Optional[Mapping] = None
__producer: Optional[KafkaProducer] = None
__key_generator: Optional[Callable[[Any, Mapping[Any, Any]], Optional[str]]] = None
__document_transformer: Optional[Callable[[Mapping[Any, Any]], Mapping[Any, Any]]] = None
def _write_to_kafka(self, df: pd.DataFrame) -> None:
"""Given a dataframe where each row is a message to be sent to a Kafka Topic, iterate through all rows and send them to a Kafka topic.
The topic is taken from `self.sources_config.kafka.kafka_topic`, and messages are sent with a Kafka producer, which is flushed at the
end of the process.
Args:
df: A dataframe where each row is a message to be sent to a Kafka Topic.
"""
if self.__key_generator is None:
self.__key_generator = lambda idx, __: idx # default key generator uses the dataframe's index
if self.options.get("key_generator") is not None:
self.__key_generator = self.options.pop("key_generator")
if self.__document_transformer is None:
self.__document_transformer = lambda value: value
if self.options.get("document_transformer") is not None:
self.__document_transformer = self.options.pop("document_transformer")
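# N.B. The producer is created lazily on the first write and reused on subsequent writes.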
if self.__producer is None:
self.__producer = self._get_producer(self.sources_config.kafka.kafka_server, **self.options)
self._send_messages(df=df, topic=self.sources_config.kafka.kafka_topic)
@utils.allow_options(KafkaProducer.DEFAULT_CONFIG.keys())
def _get_producer(self, server: str, **options: Any) -> KafkaProducer:
"""Generate and return a Kafka Producer.
Default options are used to generate the producer. Specifically:
- `bootstrap_servers`: Passed on through the source_config
- `value_serializer`: Uses a default_value_serializer defined in this mixin
More options can be added to the producer by passing them as keyword arguments, through valid options.
These can also override the default options.
Args:
server: The host name.
**options: Keyword arguments to pass to the KafkaProducer.
Returns:
A Kafka producer instance.
"""
self.__kafka_config = {
**{
"bootstrap_servers": server,
"compression_type": "snappy",
"key_serializer": self._default_key_serializer,
"value_serializer": self._default_value_serializer,
},
**options,
}
return KafkaProducer(**self.__kafka_config)
def _send_messages(self, df: pd.DataFrame, topic: str) -> None:
"""Send one message per dataframe row to the given Kafka topic, then flush the producer.
Args:
df: A dataframe where each row is a message to be sent.
topic: The destination Kafka topic.
"""
logger.info(f"Sending {len(df)} messages to Kafka topic: {topic}.")
messages = df.reset_index(drop=True).to_dict("records")
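# Zip the original index values with the records so the key generator receives each row's index (the default keying policy).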
for idx, message in zip(df.index.values, messages):
self.__producer.send(topic, key=self.__key_generator(idx, message), value=self.__document_transformer(message)) # type: ignore
self.__producer.flush() # type: ignore
@staticmethod
def _default_key_serializer(key: Optional[str]) -> Optional[bytes]:
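# A falsy key (None or "") is forwarded as None, i.e. the message is sent unkeyed.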
if key:
return key.encode("utf-8")
return None
@staticmethod
def _default_value_serializer(value: Mapping) -> bytes:
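# ignore_nan=True serialises NaN and +/-Infinity as JSON null instead of producing invalid JSON.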
return simplejson.dumps(value, ignore_nan=True).encode("utf-8")
def _read_from_kafka(self) -> Iterable[Mapping]: # type: ignore
"""Read messages from a Kafka Topic and convert them to separate dataframes.
Returns:
Multiple dataframes, one per message read from the Kafka topic of interest.
"""
# TODO: Implement kafka reader
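Any standard `KafkaProducer` option can be forwarded through the I/O options, and caller-supplied options override the mixin's defaults. A minimal sketch, assuming a `WriteKafkaIO` wrapper and a `kafka_cloud_config` set up as in the docstring example above:

>>> write_kafka_io = WriteKafkaIO(
>>>     kafka_cloud_config,
>>>     acks="all",               # standard KafkaProducer option: wait for all in-sync replicas
>>>     linger_ms=50,             # standard KafkaProducer option: batch messages for up to 50 ms
>>>     compression_type="gzip",  # overrides the mixin's "snappy" default
>>> )
>>> write_kafka_io.write(keyed_test_df)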
Classes
class WithKafka
Handles I/O operations for Kafka.
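Because the default key-serialiser assumes string keys, a custom key-generator that produces non-string keys should be paired with a matching key-serialiser. A minimal sketch, assuming the same `kafka_cloud_config` as above and an illustrative integer keying scheme:

>>> import struct
>>> write_kafka_io = WriteKafkaIO(
>>>     kafka_cloud_config,
>>>     key_generator=lambda _, row: row["bar"],            # key each message by an integer column
>>>     key_serializer=lambda key: struct.pack(">q", key),  # pack the int key into 8 big-endian bytes
>>> )
>>> write_kafka_io.write(keyed_test_df)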
Class variables
var options : MutableMapping[str, Any]
var schema : DataframeSchema
var sources_config : KafkaDataEnvironment