Source code for sophys.common.utils.kafka.callback

import typing


def default_topic_names():
    """Return the default Kafka topic names, as a new list on every call."""
    names = ["test_bluesky_raw_docs"]
    return names


def default_bootstrap_servers():
    """Return the default Kafka bootstrap servers, as a new list on every call."""
    servers = ["localhost:9092"]
    return servers


def _as_str_list(value):
    """
    Resolve ``value`` into a list, calling it first if it is callable.

    Lists are returned unchanged, tuples are converted to lists, and anything
    else (e.g. a single ``str``) is wrapped in a one-element list.
    """
    if callable(value):
        value = value()
    if isinstance(value, list):
        return value
    if isinstance(value, tuple):
        return list(value)
    return [value]


def make_kafka_callback(
    topic_names: typing.Union[
        str,
        typing.List[str],
        typing.Callable[[], typing.Union[str, typing.List[str]]],
    ] = default_topic_names,
    bootstrap_servers: typing.Union[
        str,
        typing.List[str],
        typing.Callable[[], typing.Union[str, typing.List[str]]],
    ] = default_bootstrap_servers,
    backoff_times: typing.Optional[typing.List[float]] = None,
):
    """
    Create a Bluesky document callback, that sends the data created by the
    RunEngine to one or more Kafka topics, encoded in msgpack.

    Parameters
    ----------
    topic_names : str, list of str, or a callable that returns either
        The topic name(s) to send the data to. A single name is treated as a
        one-element list. Defaults to ``test_bluesky_raw_docs``.
    bootstrap_servers : str, list of str, or a callable that returns either
        The IPs / hosts to check for the specified topics. A single entry is
        treated as a one-element list. Defaults to ``localhost:9092``.
    backoff_times : list of float, optional
        A list of times, in seconds, to delay each successive attempt at
        connecting to a Kafka broker.

        If a connection fails, it will be retried ``len(backoff_times)`` times
        (so ``len(backoff_times) + 1`` attempts in total), sleeping for
        ``backoff_times[i]`` seconds before the i-th retry. If the attempt
        following the last delay also fails, the ``NoBrokersAvailable``
        exception is propagated to the caller.

        Defaults to ``[0.1, 1.0, 5.0, 20.0, 60.0]``, which is also used when
        ``None`` is passed.

    Returns
    -------
    callable
        A ``callback(name, doc)`` function that sends the ``(name, doc)`` pair
        to every configured topic.

    Raises
    ------
    kafka.errors.NoBrokersAvailable
        If no broker could be reached after exhausting ``backoff_times``.
    """
    topic_names = _as_str_list(topic_names)
    bootstrap_servers = _as_str_list(bootstrap_servers)

    # Resolved here rather than in the signature, so that the default is not a
    # mutable object shared between calls and an explicit ``None`` is valid.
    if backoff_times is None:
        backoff_times = [0.1, 1.0, 5.0, 20.0, 60.0]

    import time

    # NOTE: These should be here so that not having the packages installed
    # doesn't break a client that doesn't need them.
    from kafka import KafkaProducer
    from kafka.errors import NoBrokersAvailable
    from msgpack import dumps

    attempt = 0
    while True:
        try:
            producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers, value_serializer=dumps
            )
            break
        except NoBrokersAvailable:
            # Out of delays: give up and let the caller see the failure.
            if attempt >= len(backoff_times):
                raise

            time.sleep(backoff_times[attempt])
            attempt += 1

    def __callback(name, doc):
        for topic in topic_names:
            producer.send(topic, (name, doc))

    return __callback