
Messaging Drivers (Simplified for Debugging)

Core Components

core.messager.drivers.DriverConfig

Bases: BaseDriverConfig

Source code in src/argentic/core/messager/drivers/configs.py
class DriverConfig(BaseDriverConfig):
    url: str
    port: int

    user: Optional[str] = None
    password: Optional[str] = None
    token: Optional[str] = None
    client_id: Optional[str] = None

    keepalive: int = 600
    version: ProtocolVersion = ProtocolVersion.V5

    group_id: Optional[str] = None
    auto_offset_reset: Optional[str] = "earliest"

    virtualhost: Optional[str] = "/"

    class Config:
        extra = "allow"
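
A minimal usage sketch, assuming BaseDriverConfig is a pydantic model (implied by the inner Config class with extra = "allow") and that DriverConfig is importable from the path shown above:

from argentic.core.messager.drivers.configs import DriverConfig

# Extra fields are accepted because of `extra = "allow"`, so protocol-specific
# keys can ride along in the same config object.
config = DriverConfig(
    url="localhost",
    port=1883,
    user="agent",
    password="secret",
    client_id="agent-1",
)
print(config.keepalive)  # 600 (the default)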

core.messager.drivers.create_driver(protocol, config_data)

Factory: dynamically import only the requested driver

Source code in src/argentic/core/messager/drivers/__init__.py
def create_driver(protocol: MessagerProtocol, config_data: dict) -> BaseDriver:
    """Factory: dynamically import only the requested driver"""
    mapping = _DRIVER_MAPPING.get(protocol)

    if not mapping:
        raise ValueError(f"Unsupported protocol: {protocol}")

    driver_name, config_class = mapping
    module_path = _DRIVER_MODULES[driver_name]
    try:
        module = importlib.import_module(module_path)
        driver_cls = getattr(module, driver_name)
    except (ImportError, AttributeError) as e:
        raise ImportError(
            f"Could not import or find {driver_name} from {module_path}. "
            f"Ensure the driver and its dependencies are correctly installed."
        ) from e

    config = config_class(**config_data)
    return driver_cls(config)
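
A sketch of calling the factory. The MessagerProtocol import path and enum member name are assumptions, not confirmed by this page:

from argentic.core.messager.drivers import create_driver
from argentic.core.messager.protocols import MessagerProtocol  # assumed import path

# Only the requested driver module is imported, so e.g. aiokafka does not
# need to be installed just to create an MQTT driver.
driver = create_driver(
    MessagerProtocol.MQTT,  # assumed enum member name
    {"url": "localhost", "port": 1883, "client_id": "agent-1"},
)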

Available Drivers

MQTT

core.messager.drivers.MQTTDriver.MQTTDriver

Bases: BaseDriver[MQTTDriverConfig]

Source code in src/argentic/core/messager/drivers/MQTTDriver.py
class MQTTDriver(BaseDriver[MQTTDriverConfig]):
    def __init__(self, config: MQTTDriverConfig):
        super().__init__(config)
        self._client: Optional[Client] = None
        self._connected = False
        # Dictionary: topic -> {message_cls_name: (handler, message_cls)}
        self._subscriptions: Dict[str, Dict[str, tuple[MessageHandler, type]]] = {}
        self._message_task: Optional[asyncio.Task] = None
        self._stack: Optional[AsyncExitStack] = None
        # EXPERIMENTAL: Task pool to prevent handler blocking
        self._handler_tasks: set = set()
        self._max_concurrent_handlers = 50
        # Shadow ping to keep connection alive. Interval = keepalive / 2
        if self.config.keepalive is not None and self.config.keepalive > 0:
            self._ping_interval: float = self.config.keepalive / 2.0
        else:
            # Fallback to 30 s if keepalive is disabled or invalid
            self._ping_interval = 30.0
        self._last_ping_time = 0.0
        self._ping_task: Optional[asyncio.Task] = None

    async def connect(self, start_ping: bool = True) -> bool:
        try:
            self._stack = AsyncExitStack()

            # Create aiomqtt client
            self._client = Client(
                hostname=self.config.url,
                port=self.config.port,
                username=self.config.user,
                password=self.config.password,
                identifier=self.config.client_id,
                keepalive=self.config.keepalive,
                protocol=self.config.version,
            )

            # Connect using the async context manager
            await self._stack.enter_async_context(self._client)

            # Start message handler task
            self._message_task = asyncio.create_task(self._handle_messages())

            # Start shadow ping task to keep connection alive
            if start_ping:
                self._ping_task = asyncio.create_task(self._shadow_ping_loop())

            self._connected = True
            logger.info("MQTT connected via aiomqtt.")
            return True

        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            self._connected = False
            if self._stack:
                await self._stack.aclose()
                self._stack = None
            return False

    async def disconnect(self) -> None:
        if self._connected:
            self._connected = False

            # Cancel ping task
            if self._ping_task and not self._ping_task.done():
                self._ping_task.cancel()
                try:
                    await self._ping_task
                except asyncio.CancelledError:
                    pass

            # Cancel message handler task
            if self._message_task and not self._message_task.done():
                self._message_task.cancel()
                try:
                    await self._message_task
                except asyncio.CancelledError:
                    pass

            # EXPERIMENTAL: Cancel all pending handler tasks
            for task in self._handler_tasks:
                if not task.done():
                    task.cancel()
            if self._handler_tasks:
                try:
                    await asyncio.gather(*self._handler_tasks, return_exceptions=True)
                except Exception as e:
                    logger.debug(f"Error cleaning up handler tasks: {e}")
            self._handler_tasks.clear()

            # Close the client context
            if self._stack:
                await self._stack.aclose()
                self._stack = None

            self._client = None
            logger.info("MQTT disconnected.")

    def is_connected(self) -> bool:
        return self._connected

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        """
        Publish a message to MQTT.

        If the publish fails due to a lost connection, the driver will
        attempt a single reconnection and retry the publish once.
        """

        attempt = 0
        while attempt < 2:  # first try + one retry after reconnect
            attempt += 1

            # If we already know we're disconnected try reconnect first
            if (not self._connected) or (self._client is None):
                logger.debug("MQTT not connected – attempting to reconnect before publish …")
                await self._reconnect()

            try:
                client = cast(Client, self._client)  # type narrowing for linters
                serialized_data = payload.model_dump_json()
                await client.publish(topic, serialized_data.encode(), qos=qos, retain=retain)
                return  # Success
            except Exception as publish_err:
                logger.warning(f"Publish attempt {attempt} to '{topic}' failed: {publish_err}")

                if attempt >= 2:
                    # No more retries – propagate the error
                    raise

                # Mark as disconnected and try to reconnect once
                self._connected = False
                await self._reconnect()

    async def _reconnect(self) -> None:
        """Attempt to re-establish the MQTT connection.

        This helper is designed to be lightweight so it can be called
        directly from the publish path. The existing shadow-ping task
        continues running; therefore we do NOT spawn a new one here.
        """

        logger.info("Attempting MQTT reconnection …")

        # Close previous connection gracefully
        try:
            if self._stack:
                await self._stack.aclose()
        except Exception:
            pass

        self._client = None
        self._connected = False

        # Re-create connection without starting another ping loop
        await self.connect(start_ping=False)

        # Re-subscribe to previously registered topics
        if self._connected and self._client:
            client = cast(Client, self._client)
            for topic in list(self._subscriptions.keys()):
                try:
                    await client.subscribe(topic, qos=1)
                    logger.debug(f"Resubscribed to topic '{topic}' after reconnect")
                except Exception as sub_err:
                    logger.warning(f"Failed to resubscribe to '{topic}' after reconnect: {sub_err}")

    async def subscribe(
        self, topic: str, handler: MessageHandler, message_cls: type = BaseMessage, **kwargs
    ) -> None:
        if not self._connected or not self._client:
            raise ConnectionError("Not connected to MQTT broker.")

        try:
            # Store the handler for this topic
            if topic not in self._subscriptions:
                self._subscriptions[topic] = {}
            self._subscriptions[topic][message_cls.__name__] = (handler, message_cls)

            # Subscribe using aiomqtt
            await self._client.subscribe(topic, qos=kwargs.get("qos", 1))
            logger.info(f"Subscribed to topic: {topic}")

        except Exception as e:
            logger.error(f"Error subscribing to {topic}: {e}")
            raise

    async def unsubscribe(self, topic: str) -> None:
        if not self._connected or not self._client:
            raise ConnectionError("Not connected to MQTT broker.")

        try:
            # Remove all handlers for this topic
            if topic in self._subscriptions:
                del self._subscriptions[topic]

            # Unsubscribe using aiomqtt
            await self._client.unsubscribe(topic)
            logger.info(f"Unsubscribed from topic: {topic}")

        except Exception as e:
            logger.error(f"Error unsubscribing from {topic}: {e}")
            raise

    async def _handle_messages(self) -> None:
        """Handle incoming messages from aiomqtt."""
        if not self._client:
            return

        try:
            async for message in self._client.messages:
                # EXPERIMENTAL: Don't await _process_message directly - spawn as task
                # This prevents any handler from blocking the main message loop
                if len(self._handler_tasks) >= self._max_concurrent_handlers:
                    logger.warning("Handler task pool full, waiting for completion...")
                    # Wait for at least one task to complete
                    done, pending = await asyncio.wait(
                        self._handler_tasks, return_when=asyncio.FIRST_COMPLETED
                    )
                    # Clean up completed tasks
                    for task in done:
                        self._handler_tasks.discard(task)
                        if task.exception():
                            logger.error(f"Handler task failed: {task.exception()}")

                # Spawn message processing as independent task
                task = asyncio.create_task(self._process_message(message))
                self._handler_tasks.add(task)

                # Clean up completed tasks periodically
                done_tasks = [t for t in self._handler_tasks if t.done()]
                for task in done_tasks:
                    self._handler_tasks.discard(task)
                    if task.exception():
                        logger.error(f"Handler task failed: {task.exception()}")

        except asyncio.CancelledError:
            logger.debug("Message handler task cancelled")
            # Cancel all pending handler tasks
            for task in self._handler_tasks:
                if not task.done():
                    task.cancel()
            # Wait for all tasks to complete
            if self._handler_tasks:
                await asyncio.gather(*self._handler_tasks, return_exceptions=True)
        except Exception as e:
            logger.error(f"Error in message handler: {e}")

    async def _process_message(self, message: Message) -> None:
        """Process a single message from aiomqtt."""
        try:
            # Find handlers for this topic
            handlers = self._subscriptions.get(message.topic.value)
            if not handlers:
                logger.warning(f"No handlers for topic {message.topic.value}")
                return

            # Parse the message payload
            try:
                # Handle different payload types
                if isinstance(message.payload, bytes):
                    payload_str = message.payload.decode()
                elif isinstance(message.payload, str):
                    payload_str = message.payload
                else:
                    payload_str = str(message.payload)

                # Parse as BaseMessage first
                base_message = BaseMessage.model_validate_json(payload_str)
                # Store the original JSON string for re-parsing
                setattr(base_message, "_original_json", payload_str)

            except Exception as e:
                logger.error(f"Failed to parse message from {message.topic.value}: {e}")
                return

            # Call appropriate handlers based on message type compatibility
            for handler_cls_name, (handler, handler_cls) in handlers.items():
                try:
                    # Try to parse the message as the specific type
                    if handler_cls is BaseMessage:
                        # Generic BaseMessage handler
                        await handler(base_message)
                    else:
                        # Try to parse as specific type
                        try:
                            validate_method = getattr(handler_cls, "model_validate_json", None)
                            if validate_method:
                                specific_message = validate_method(payload_str)
                                await handler(specific_message)
                            else:
                                # Skip non-BaseMessage handlers
                                logger.debug(f"Skipping non-BaseMessage handler {handler_cls_name}")
                                continue
                        except Exception as parse_error:
                            # Skip this handler if message doesn't match type
                            logger.debug(
                                f"Message type mismatch for handler {handler_cls_name}: {parse_error}"
                            )
                            continue
                except Exception as handler_error:
                    logger.error(
                        f"Error in handler {handler_cls_name} for topic {message.topic.value}: {handler_error}"
                    )

        except Exception as e:
            logger.error(f"Error processing message from {message.topic.value}: {e}")

    async def _shadow_ping_loop(self) -> None:
        """Send periodic ping messages to keep connection alive.

        This loop runs constantly and independently of connection state.
        It will keep trying to ping even during reconnections.
        """
        logger.info(f"Starting shadow ping loop (interval: {self._ping_interval}s)")
        ping_count = 0

        try:
            while True:
                ping_count += 1
                try:
                    await self._send_ping(ping_count)
                except Exception as ping_err:
                    logger.debug(f"Shadow ping #{ping_count} failed: {ping_err}")
                await asyncio.sleep(self._ping_interval)
        except asyncio.CancelledError:
            logger.info(f"Shadow ping loop cancelled after {ping_count} pings")
        logger.info("Shadow ping loop ended")

    async def _send_ping(self, ping_count: int) -> None:
        """Send a single ping message directly via MQTT client.

        This bypasses the main publish() method to avoid interference
        with reconnection logic.
        """
        if not self._connected:
            raise ConnectionError("Not connected to MQTT broker.")

        ping_topic = f"_ping/{self.config.client_id}"
        ping_msg = InfoMessage(
            source=self.config.client_id or "mqtt_client",
            data={"ping_id": ping_count, "timestamp": time.time()},
        )
        await self.publish(ping_topic, ping_msg, qos=0, retain=False)

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Format MQTT-specific connection error details."""
        if isinstance(error, MqttError):
            return f"MQTT error: {error}"
        return None

format_connection_error_details(error)

Format MQTT-specific connection error details.

Source code in src/argentic/core/messager/drivers/MQTTDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Format MQTT-specific connection error details."""
    if isinstance(error, MqttError):
        return f"MQTT error: {error}"
    return None

publish(topic, payload, qos=0, retain=False) async

Publish a message to MQTT.

If the publish fails due to a lost connection, the driver will attempt a single reconnection and retry the publish once.

Source code in src/argentic/core/messager/drivers/MQTTDriver.py
async def publish(
    self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
) -> None:
    """
    Publish a message to MQTT.

    If the publish fails due to a lost connection, the driver will
    attempt a single reconnection and retry the publish once.
    """

    attempt = 0
    while attempt < 2:  # first try + one retry after reconnect
        attempt += 1

        # If we already know we're disconnected try reconnect first
        if (not self._connected) or (self._client is None):
            logger.debug("MQTT not connected – attempting to reconnect before publish …")
            await self._reconnect()

        try:
            client = cast(Client, self._client)  # type narrowing for linters
            serialized_data = payload.model_dump_json()
            await client.publish(topic, serialized_data.encode(), qos=qos, retain=retain)
            return  # Success
        except Exception as publish_err:
            logger.warning(f"Publish attempt {attempt} to '{topic}' failed: {publish_err}")

            if attempt >= 2:
                # No more retries – propagate the error
                raise

            # Mark as disconnected and try to reconnect once
            self._connected = False
            await self._reconnect()
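
A minimal end-to-end sketch for the MQTT driver. The import paths for MQTTDriverConfig and BaseMessage, and the BaseMessage constructor fields, are assumptions based on the listings above; adjust to your installation:

import asyncio

from argentic.core.messager.drivers.MQTTDriver import MQTTDriver
from argentic.core.messager.drivers.configs import MQTTDriverConfig  # assumed path
from argentic.core.protocol.message import BaseMessage  # assumed path


async def main() -> None:
    driver = MQTTDriver(MQTTDriverConfig(url="localhost", port=1883, client_id="demo"))
    if not await driver.connect():
        raise SystemExit("broker unreachable")

    async def on_message(msg: BaseMessage) -> None:
        print("received:", msg)

    # Handlers run in a task pool (max 50 concurrent), so a slow handler
    # does not block the driver's main message loop.
    await driver.subscribe("agent/events", on_message, qos=1)
    await driver.publish("agent/events", BaseMessage(source="demo"), qos=1)  # `source` field assumed
    await asyncio.sleep(1)  # give the spawned handler task time to run
    await driver.disconnect()


asyncio.run(main())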

Kafka

core.messager.drivers.KafkaDriver.KafkaDriver

Bases: BaseDriver[KafkaDriverConfig]

Source code in src/argentic/core/messager/drivers/KafkaDriver.py
class KafkaDriver(BaseDriver[KafkaDriverConfig]):
    def __init__(self, config: KafkaDriverConfig):
        # The imports at the top of the module handle the check.
        # If aiokafka wasn't found, an ImportError would have already occurred.
        super().__init__(config)
        self._producer: Optional[AIOKafkaProducer] = None
        self._consumer: Optional[AIOKafkaConsumer] = None
        self._listeners: Dict[str, List[MessageHandler]] = {}
        self._reader_task: Optional[asyncio.Task] = None
        logger.debug(
            f"KafkaDriver initialized with config: {{url: {config.url}, port: {config.port}}}"
        )

    async def connect(self) -> bool:
        servers = f"{self.config.url}:{self.config.port}"
        logger.info(f"Connecting Kafka producer to bootstrap servers: {servers}")
        try:
            # Explicitly pass the running event loop
            loop = asyncio.get_running_loop()
            self._producer = AIOKafkaProducer(bootstrap_servers=servers, loop=loop)
            await self._producer.start()
            logger.info("Kafka producer started successfully.")
            return True
        except KafkaConnectionError as e:
            logger.error(f"Failed to connect Kafka producer: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred during Kafka producer connection: {e}")
            raise

    async def disconnect(self) -> None:
        logger.info("Disconnecting Kafka client.")
        if self._producer:
            try:
                logger.debug("Stopping Kafka producer...")
                await self._producer.stop()
                logger.info("Kafka producer stopped.")
            except Exception as e:
                logger.error(f"Error stopping Kafka producer: {e}")
        if self._consumer:
            try:
                logger.debug("Stopping Kafka consumer...")
                await self._consumer.stop()
                logger.info("Kafka consumer stopped.")
            except Exception as e:
                logger.error(f"Error stopping Kafka consumer: {e}")
        if self._reader_task and not self._reader_task.done():
            logger.debug("Cancelling Kafka reader task...")
            self._reader_task.cancel()
            try:
                await self._reader_task
                logger.debug("Kafka reader task cancelled.")
            except asyncio.CancelledError:
                logger.info("Kafka reader task was cancelled as expected.")
            except Exception as e:
                logger.error(f"Error during Kafka reader task cancellation: {e}")

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        if not self._producer:
            logger.error("Kafka producer is not connected. Cannot publish.")
            raise RuntimeError("Kafka producer is not connected.")

        data = payload.model_dump_json().encode()
        logger.debug(
            f"Publishing message to Kafka topic '{topic}'. Payload: {payload.model_dump_json()}"
        )
        try:
            await self._producer.send_and_wait(topic, data)
            logger.info(f"Message successfully sent to Kafka topic '{topic}'.")
        except KafkaTimeoutError as e:
            logger.error(f"Timeout sending message to Kafka topic '{topic}': {e}")
            raise
        except KafkaConnectionError as e:
            logger.error(f"Connection error sending message to Kafka topic '{topic}': {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error sending message to Kafka topic '{topic}': {e}")
            raise

    async def subscribe(
        self,
        topic: str,
        handler: MessageHandler,
        message_cls: Type[BaseMessage] = BaseMessage,
        ready_event: Optional[asyncio.Event] = None,
        **kwargs,
    ) -> None:
        """Subscribe to a Kafka topic.

        Args:
            topic: Topic to subscribe to.
            handler: Callback function to handle received messages.
            message_cls: The message class for deserialization (unused in this driver).
            ready_event: Optional asyncio.Event to signal when consumer is ready.
            **kwargs: Additional keyword arguments for AIOKafkaConsumer.
        """
        servers = f"{self.config.url}:{self.config.port}"
        listener_instance = None
        if ready_event:
            listener_instance = _KafkaReadyListener(ready_event, topic)

        if self._consumer is None:
            # Default Kafka consumer settings from config
            consumer_config = {
                "group_id": self.config.group_id or f"default-group-{uuid.uuid4().hex}",
                "auto_offset_reset": self.config.auto_offset_reset,
                "session_timeout_ms": 30000,
                "heartbeat_interval_ms": 10000,
                "max_poll_interval_ms": 300000,
            }
            # Allow kwargs to override config values
            consumer_config.update(kwargs)
            logger.info(
                f"Creating Kafka consumer for topic '{topic}' with effective config: {consumer_config}"
            )
            try:
                # Explicitly pass the running event loop
                loop = asyncio.get_running_loop()
                self._consumer = AIOKafkaConsumer(
                    bootstrap_servers=servers,
                    loop=loop,  # Pass loop
                    **consumer_config,  # Pass merged config
                )
                await self._consumer.start()
                logger.info(f"Kafka consumer started for topic '{topic}'. Subscribing...")
                # Pass listener if provided
                self._consumer.subscribe(
                    [topic], listener=listener_instance
                )  # This call is synchronous
                logger.info(f"Successfully subscribed to initial topic '{topic}'.")
                if self._reader_task is None or self._reader_task.done():
                    self._reader_task = asyncio.create_task(self._reader())
                    logger.info("Kafka reader task started.")
            except KafkaConnectionError as e:
                logger.error(f"Failed to connect Kafka consumer: {e}")
                # Consider how to handle this error (e.g., raise or return status)
                return  # Or raise specific exception
            except Exception as e:
                logger.error(f"An unexpected error occurred during Kafka consumer setup: {e}")
                return  # Or raise

        else:
            # Consumer already exists, add subscription if not already subscribed
            current_subscription = self._consumer.subscription()
            if topic not in current_subscription:
                new_topic_list = list(current_subscription) + [topic]
                logger.info(
                    f"Adding subscription for topic '{topic}'. Current: {current_subscription}, New: {new_topic_list}"
                )
                try:
                    # Pass listener if provided and applicable for re-subscribe
                    self._consumer.subscribe(  # This call is synchronous
                        topics=new_topic_list, listener=listener_instance
                    )
                    logger.info(f"Successfully added subscription for topic '{topic}'.")
                except Exception as e:
                    logger.error(f"Failed to add subscription for topic '{topic}': {e}")
                    return  # Or raise

        # Register handler
        if topic not in self._listeners:
            self._listeners[topic] = []
        if handler not in self._listeners[topic]:
            self._listeners[topic].append(handler)
            logger.info(f"Handler registered for topic '{topic}'.")

    async def _reader(self) -> None:
        """Single reader for all subscribed topics"""
        if not self._consumer:
            logger.error("Kafka consumer is not initialized. Reader cannot start.")
            return

        logger.info("Kafka message reader task started.")
        try:
            async for msg in self._consumer:
                topic = msg.topic
                partition = msg.partition
                offset = msg.offset
                logger.debug(
                    f"Received message from Kafka. Topic: {topic}, Partition: {partition}, Offset: {offset}, Key: {msg.key}, Timestamp: {msg.timestamp}"
                )

                # Log the raw message value before attempting to decode/process
                try:
                    if msg.value:
                        # Attempt to decode for logging, but handle potential errors
                        decoded_value_for_log = msg.value.decode("utf-8")
                        logger.debug(
                            f"Raw message value (decoded for log): {decoded_value_for_log}"
                        )
                    else:
                        logger.debug("Raw message value is None")
                except UnicodeDecodeError:
                    logger.debug(
                        f"Raw message value (bytes, could not decode as UTF-8): {msg.value!r}"
                    )
                except Exception as e_dec:
                    logger.warning(f"Could not decode message value for logging: {e_dec}")

                handlers = self._listeners.get(topic, [])
                if not handlers:
                    logger.warning(
                        f"No handlers registered for topic '{topic}'. Discarding message from Partition: {partition}, Offset: {offset}."
                    )
                    continue

                if not msg.value:
                    logger.debug(f"Skipping message with empty value from topic '{topic}'.")
                    continue

                logger.debug(
                    f"Processing message from topic '{topic}' with {len(handlers)} handler(s)."
                )
                try:
                    deserialized_message = BaseMessage.model_validate_json(msg.value)
                    for i, h in enumerate(handlers):
                        handler_name = h.__name__ if hasattr(h, "__name__") else str(h)
                        logger.debug(
                            f"Invoking handler {i+1}/{len(handlers)} ('{handler_name}') for topic '{topic}'."
                        )
                        try:
                            await h(deserialized_message)
                            logger.debug(
                                f"Handler '{handler_name}' completed for message from topic '{topic}'."
                            )
                        except Exception as e:
                            logger.error(
                                f"Handler '{handler_name}' for topic '{topic}' raised an exception: {e}",
                                exc_info=True,
                            )
                except Exception as e:
                    logger.error(
                        f"Failed to deserialize message for topic '{topic}': {e}", exc_info=True
                    )
        except asyncio.CancelledError:
            logger.info("Kafka reader task was cancelled.")
        except KafkaConnectionError as e:
            logger.error(f"Kafka connection error in reader task: {e}", exc_info=True)
            # Potentially try to reconnect or signal an issue, for now just log and exit task
        except Exception as e:
            logger.error(f"Unexpected error in Kafka reader task: {e}", exc_info=True)
        finally:
            logger.info("Kafka message reader task finished.")

    def is_connected(self) -> bool:
        return self._producer is not None and not self._producer._closed

    async def unsubscribe(self, topic: str) -> None:
        """Unsubscribe from a topic (Kafka doesn't support individual topic unsubscription easily)."""
        logger.warning(
            f"Kafka driver does not support unsubscribing from individual topics. Topic: {topic}"
        )
        # Kafka consumers are typically group-based and subscribe to multiple topics
        # Individual topic unsubscription would require recreating the consumer
        pass

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Format Kafka-specific connection error details."""
        if "kafka" in str(type(error)).lower():
            return f"Kafka error: {error}"
        return None

format_connection_error_details(error)

Format Kafka-specific connection error details.

Source code in src/argentic/core/messager/drivers/KafkaDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Format Kafka-specific connection error details."""
    if "kafka" in str(type(error)).lower():
        return f"Kafka error: {error}"
    return None

subscribe(topic, handler, message_cls=BaseMessage, ready_event=None, **kwargs) async

Subscribe to a Kafka topic.

Parameters:

    topic (str, required) – Topic to subscribe to.
    handler (MessageHandler, required) – Callback function to handle received messages.
    message_cls (Type[BaseMessage], default: BaseMessage) – The message class for deserialization (unused in this driver).
    ready_event (Optional[asyncio.Event], default: None) – Optional asyncio.Event to signal when consumer is ready.
    **kwargs (default: {}) – Additional keyword arguments for AIOKafkaConsumer.
Source code in src/argentic/core/messager/drivers/KafkaDriver.py
async def subscribe(
    self,
    topic: str,
    handler: MessageHandler,
    message_cls: Type[BaseMessage] = BaseMessage,
    ready_event: Optional[asyncio.Event] = None,
    **kwargs,
) -> None:
    """Subscribe to a Kafka topic.

    Args:
        topic: Topic to subscribe to.
        handler: Callback function to handle received messages.
        message_cls: The message class for deserialization (unused in this driver).
        ready_event: Optional asyncio.Event to signal when consumer is ready.
        **kwargs: Additional keyword arguments for AIOKafkaConsumer.
    """
    servers = f"{self.config.url}:{self.config.port}"
    listener_instance = None
    if ready_event:
        listener_instance = _KafkaReadyListener(ready_event, topic)

    if self._consumer is None:
        # Default Kafka consumer settings from config
        consumer_config = {
            "group_id": self.config.group_id or f"default-group-{uuid.uuid4().hex}",
            "auto_offset_reset": self.config.auto_offset_reset,
            "session_timeout_ms": 30000,
            "heartbeat_interval_ms": 10000,
            "max_poll_interval_ms": 300000,
        }
        # Allow kwargs to override config values
        consumer_config.update(kwargs)
        logger.info(
            f"Creating Kafka consumer for topic '{topic}' with effective config: {consumer_config}"
        )
        try:
            # Explicitly pass the running event loop
            loop = asyncio.get_running_loop()
            self._consumer = AIOKafkaConsumer(
                bootstrap_servers=servers,
                loop=loop,  # Pass loop
                **consumer_config,  # Pass merged config
            )
            await self._consumer.start()
            logger.info(f"Kafka consumer started for topic '{topic}'. Subscribing...")
            # Pass listener if provided
            self._consumer.subscribe(
                [topic], listener=listener_instance
            )  # This call is synchronous
            logger.info(f"Successfully subscribed to initial topic '{topic}'.")
            if self._reader_task is None or self._reader_task.done():
                self._reader_task = asyncio.create_task(self._reader())
                logger.info("Kafka reader task started.")
        except KafkaConnectionError as e:
            logger.error(f"Failed to connect Kafka consumer: {e}")
            # Consider how to handle this error (e.g., raise or return status)
            return  # Or raise specific exception
        except Exception as e:
            logger.error(f"An unexpected error occurred during Kafka consumer setup: {e}")
            return  # Or raise

    else:
        # Consumer already exists, add subscription if not already subscribed
        current_subscription = self._consumer.subscription()
        if topic not in current_subscription:
            new_topic_list = list(current_subscription) + [topic]
            logger.info(
                f"Adding subscription for topic '{topic}'. Current: {current_subscription}, New: {new_topic_list}"
            )
            try:
                # Pass listener if provided and applicable for re-subscribe
                self._consumer.subscribe(  # This call is synchronous
                    topics=new_topic_list, listener=listener_instance
                )
                logger.info(f"Successfully added subscription for topic '{topic}'.")
            except Exception as e:
                logger.error(f"Failed to add subscription for topic '{topic}': {e}")
                return  # Or raise

    # Register handler
    if topic not in self._listeners:
        self._listeners[topic] = []
    if handler not in self._listeners[topic]:
        self._listeners[topic].append(handler)
        logger.info(f"Handler registered for topic '{topic}'.")

unsubscribe(topic) async

Unsubscribe from a topic (Kafka doesn't support individual topic unsubscription easily).

Source code in src/argentic/core/messager/drivers/KafkaDriver.py
async def unsubscribe(self, topic: str) -> None:
    """Unsubscribe from a topic (Kafka doesn't support individual topic unsubscription easily)."""
    logger.warning(
        f"Kafka driver does not support unsubscribing from individual topics. Topic: {topic}"
    )
    # Kafka consumers are typically group-based and subscribe to multiple topics
    # Individual topic unsubscription would require recreating the consumer
    pass
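
A sketch of subscribing and publishing with the Kafka driver, using ready_event to avoid racing the consumer's group join. Import paths for KafkaDriverConfig and BaseMessage, and the BaseMessage fields, are assumptions:

import asyncio

from argentic.core.messager.drivers.KafkaDriver import KafkaDriver
from argentic.core.messager.drivers.configs import KafkaDriverConfig  # assumed path
from argentic.core.protocol.message import BaseMessage  # assumed path


async def main() -> None:
    driver = KafkaDriver(KafkaDriverConfig(url="localhost", port=9092, group_id="demo-group"))
    await driver.connect()

    ready = asyncio.Event()

    async def on_message(msg: BaseMessage) -> None:
        print("received:", msg)

    # ready_event is set by the internal _KafkaReadyListener once partitions
    # are assigned, so publishing after the wait avoids losing the first message.
    await driver.subscribe("agent-events", on_message, ready_event=ready)
    await ready.wait()

    await driver.publish("agent-events", BaseMessage(source="demo"))  # `source` field assumed
    await asyncio.sleep(1)
    await driver.disconnect()


asyncio.run(main())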

RabbitMQ

core.messager.drivers.RabbitMQDriver.RabbitMQDriver

Bases: BaseDriver[RabbitMQDriverConfig]

Source code in src/argentic/core/messager/drivers/RabbitMQDriver.py
class RabbitMQDriver(BaseDriver[RabbitMQDriverConfig]):
    def __init__(self, config: RabbitMQDriverConfig):
        super().__init__(config)
        self._connection: Optional[aio_pika.RobustConnection] = None
        self._channel: Optional[aio_pika.Channel] = None
        # topic to list of handlers
        self._listeners: Dict[str, List[MessageHandler]] = {}
        # track queues per topic
        self._queues: Dict[str, aio_pika.Queue] = {}
        self.logger = logging.getLogger("RabbitMQDriver")

    async def connect(self) -> bool:
        # Determine virtualhost, defaulting to '/' if not specified or empty
        virtualhost = self.config.virtualhost or "/"
        # Ensure virtualhost starts with a slash if it's not empty and doesn't have one
        if virtualhost != "/" and not virtualhost.startswith("/"):
            virtualhost = "/" + virtualhost
        # If virtualhost is just '/', ensure no double slash in the URL if user was empty and we defaulted
        if virtualhost == "/":
            url_vhost_part = ""  # aio_pika implicitly uses / if path is empty
        else:
            url_vhost_part = virtualhost

        url = f"amqp://{self.config.user}:{self.config.password}@{self.config.url}:{self.config.port}{url_vhost_part}"
        self.logger.info(
            f"Connecting to RabbitMQ with URL: {url}"
        )  # Log the full URL for debugging
        self._connection = await aio_pika.connect_robust(url)
        self._channel = await self._connection.channel()
        self.logger.info(
            f"Connected to RabbitMQ at {self.config.url}:{self.config.port}, vhost: {virtualhost}"
        )
        return True

    async def disconnect(self) -> None:
        self.logger.info("Attempting to disconnect from RabbitMQ...")
        if self._connection and not self._connection.is_closed:
            self.logger.info(f"Connection object exists. is_closed: {self._connection.is_closed}")
            try:
                await self._connection.close()
                self.logger.info("Successfully closed RabbitMQ connection.")
            except Exception as e:
                self.logger.error(f"Error during RabbitMQ connection.close(): {e!r}", exc_info=True)
        else:
            self.logger.info("No active RabbitMQ connection object to close.")
        # Reset state
        self._connection = None
        self._channel = None
        self._listeners = {}
        self._queues = {}
        self.logger.info("RabbitMQ driver state reset after disconnect.")

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        if not self._channel:
            raise ConnectionError("RabbitMQ channel is not available.")
        try:
            exchange = await self._channel.declare_exchange(topic, aio_pika.ExchangeType.FANOUT)

            # Handle BaseMessage serialization
            body = payload.model_dump_json().encode("utf-8")

            message = aio_pika.Message(body=body)
            await exchange.publish(message, routing_key="")
            self.logger.debug(f"Published message to exchange: {topic}")
        except Exception as e:
            self.logger.error(f"Error publishing to exchange {topic}: {e}")
            raise

    async def subscribe(
        self,
        topic: str,
        handler: MessageHandler,
        message_cls: Type[BaseMessage] = BaseMessage,
        **kwargs,
    ) -> None:
        try:
            self.logger.info(f"Subscribe called for topic: {topic}")
            if not self._channel:
                self.logger.error(f"Cannot subscribe to topic '{topic}', channel is not available.")
                # Or raise an exception if this state is unexpected
                raise RuntimeError("RabbitMQ channel not available for subscription.")

            # register handler and setup consumer on first subscribe per topic
            if topic not in self._listeners:
                self._listeners[topic] = []
                self.logger.info(
                    f"First subscription for topic '{topic}', setting up exchange and queue."
                )
                # declare exchange and queue
                exchange = await self._channel.declare_exchange(topic, aio_pika.ExchangeType.FANOUT)
                self.logger.debug(f"Declared exchange '{topic}' type FANOUT.")
                queue = await self._channel.declare_queue(exclusive=True)
                self.logger.debug(f"Declared exclusive queue '{queue.name}' for topic '{topic}'.")
                await queue.bind(exchange)
                self.logger.debug(f"Bound queue '{queue.name}' to exchange '{topic}'.")
                self._queues[topic] = queue
                self.logger.info(f"Created and bound queue '{queue.name}' for topic: {topic}")

                # single reader for this topic
                async def _reader(message: aio_pika.IncomingMessage) -> None:
                    self.logger.debug(
                        f"[_reader for {topic}] Received raw message. Message ID: {message.message_id}, Correlation ID: {message.correlation_id}, Routing Key: {message.routing_key}"
                    )
                    try:
                        self.logger.debug(
                            f"[_reader for {topic}] Entering message.process() context for message ID: {message.message_id}"
                        )
                        async with message.process():
                            self.logger.debug(
                                f"[_reader for {topic}] Successfully entered message.process() for message ID: {message.message_id}. Body type: {type(message.body)}, Body (first 100 bytes): {message.body[:100]!r}"
                            )
                            topic_handlers = self._listeners.get(topic, [])
                            if not topic_handlers:
                                self.logger.warning(
                                    f"[_reader for {topic}] No handlers found for message ID: {message.message_id} after entering process context. This should not happen if consumer is active."
                                )
                                return

                            for i, h in enumerate(topic_handlers):
                                handler_name = getattr(h, "__name__", str(h))
                                self.logger.debug(
                                    f"[_reader for {topic}] Invoking handler {i+1}/{len(topic_handlers)} ('{handler_name}') for message ID: {message.message_id}"
                                )
                                try:
                                    # Deserialize message.body (bytes) to BaseMessage before passing to handler
                                    deserialized_message = BaseMessage.model_validate_json(
                                        message.body
                                    )
                                    await h(deserialized_message)
                                    self.logger.debug(
                                        f"[_reader for {topic}] Handler '{handler_name}' completed for message ID: {message.message_id}"
                                    )
                                except Exception as e_handler:
                                    self.logger.error(
                                        f"[_reader for {topic}] Handler '{handler_name}' failed for message ID: {message.message_id}: {e_handler!r}",
                                        exc_info=True,
                                    )
                        self.logger.debug(
                            f"[_reader for {topic}] Exited message.process() context for message ID: {message.message_id}"
                        )
                    except Exception as e_process:
                        self.logger.error(
                            f"[_reader for {topic}] Exception during message.process() or handler invocation for message ID: {message.message_id}: {e_process!r}",
                            exc_info=True,
                        )
                        # Depending on aio_pika's behavior, we might need to nack or requeue explicitly if message.process() fails internally
                        # For now, logging the error. If message.process() raises, it typically means the message wasn't acked.

                # Start consumer
                self.logger.info(f"Starting consumer for queue '{queue.name}' on topic '{topic}'.")
                consumer_tag = await queue.consume(
                    _reader
                )  # Store consumer_tag if needed for cancellation
                self.logger.info(
                    f"Started consumer for queue '{queue.name}' (topic: {topic}) with consumer_tag: {consumer_tag}."
                )
                # Store consumer_tag in self._queues or a new dict if you need to cancel specific consumers later
                # For example: self._queues[topic] = {'queue': queue, 'consumer_tag': consumer_tag}

            self._listeners[topic].append(handler)
            self.logger.info(
                f"Added handler for topic: {topic}, total handlers: {len(self._listeners[topic])}"
            )
        except Exception as e:
            self.logger.error(f"Error subscribing to topic {topic}: {e}")
            raise

    def is_connected(self) -> bool:
        return bool(self._connection and not getattr(self._connection, "is_closed", True))

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Extracts RabbitMQ specific Connection.Close frame details from an exception."""
        details = []
        details.append(f"Exception type: {type(error)}")
        # Use hasattr and getattr for safe attribute access
        if hasattr(error, "args") and error.args:
            details.append(f"Exception args: {error.args!r}")
            if (
                len(error.args) > 1
                and hasattr(error.args[1], "reply_code")
                and hasattr(error.args[1], "reply_text")
            ):
                close_frame = error.args[1]
                details.append(
                    f"RabbitMQ Connection.Close frame (from e.args[1]): "
                    f"reply_code={close_frame.reply_code}, reply_text='{close_frame.reply_text}'"
                )
                return "\n".join(details)
            elif (
                hasattr(error, "frame")
                and hasattr(error.frame, "reply_code")
                and hasattr(error.frame, "reply_text")
            ):
                close_frame = error.frame
                details.append(
                    f"RabbitMQ Connection.Close frame (from e.frame): "
                    f"reply_code={close_frame.reply_code}, reply_text='{close_frame.reply_text}'"
                )
                return "\n".join(details)

        if len(details) > 1:
            return "\n".join(details)

        return None

    async def unsubscribe(self, topic: str) -> None:
        """Unsubscribe from a RabbitMQ topic (queue)."""
        # RabbitMQ unsubscription is handled by closing the consumer
        # In practice, this is complex as it involves queue management
        self.logger.warning(f"RabbitMQ driver unsubscribe not fully implemented for topic: {topic}")
        # In a full implementation, you would need to:
        # 1. Find the consumer for this topic/queue
        # 2. Cancel the consumer
        # 3. Potentially delete the queue if it was auto-created
        pass

format_connection_error_details(error)

Extracts RabbitMQ-specific Connection.Close frame details from an exception.

Source code in src/argentic/core/messager/drivers/RabbitMQDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Extracts RabbitMQ specific Connection.Close frame details from an exception."""
    details = []
    details.append(f"Exception type: {type(error)}")
    # Use hasattr and getattr for safe attribute access
    if hasattr(error, "args") and error.args:
        details.append(f"Exception args: {error.args!r}")
        if (
            len(error.args) > 1
            and hasattr(error.args[1], "reply_code")
            and hasattr(error.args[1], "reply_text")
        ):
            close_frame = error.args[1]
            details.append(
                f"RabbitMQ Connection.Close frame (from e.args[1]): "
                f"reply_code={close_frame.reply_code}, reply_text='{close_frame.reply_text}'"
            )
            return "\n".join(details)
        elif (
            hasattr(error, "frame")
            and hasattr(error.frame, "reply_code")
            and hasattr(error.frame, "reply_text")
        ):
            close_frame = error.frame
            details.append(
                f"RabbitMQ Connection.Close frame (from e.frame): "
                f"reply_code={close_frame.reply_code}, reply_text='{close_frame.reply_text}'"
            )
            return "\n".join(details)

    if len(details) > 1:
        return "\n".join(details)

    return None

unsubscribe(topic) async

Unsubscribe from a RabbitMQ topic (queue).

Source code in src/argentic/core/messager/drivers/RabbitMQDriver.py
async def unsubscribe(self, topic: str) -> None:
    """Unsubscribe from a RabbitMQ topic (queue)."""
    # RabbitMQ unsubscription is handled by closing the consumer
    # In practice, this is complex as it involves queue management
    self.logger.warning(f"RabbitMQ driver unsubscribe not fully implemented for topic: {topic}")
    # In a full implementation, you would need to:
    # 1. Find the consumer for this topic/queue
    # 2. Cancel the consumer
    # 3. Potentially delete the queue if it was auto-created
    pass
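
A sketch of the RabbitMQ driver's fanout model. Import paths for RabbitMQDriverConfig and BaseMessage, and the BaseMessage fields, are assumptions:

import asyncio

from argentic.core.messager.drivers.RabbitMQDriver import RabbitMQDriver
from argentic.core.messager.drivers.configs import RabbitMQDriverConfig  # assumed path
from argentic.core.protocol.message import BaseMessage  # assumed path


async def main() -> None:
    driver = RabbitMQDriver(
        RabbitMQDriverConfig(url="localhost", port=5672, user="guest", password="guest")
    )
    await driver.connect()

    async def on_message(msg: BaseMessage) -> None:
        print("received:", msg)

    # Each topic maps to a FANOUT exchange with an exclusive queue per driver
    # instance, so every subscribed process receives its own copy of each message.
    await driver.subscribe("agent.events", on_message)
    await driver.publish("agent.events", BaseMessage(source="demo"))  # `source` field assumed
    await asyncio.sleep(1)
    await driver.disconnect()


asyncio.run(main())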

Redis

core.messager.drivers.RedisDriver.RedisDriver

Bases: BaseDriver[RedisDriverConfig]

Source code in src/argentic/core/messager/drivers/RedisDriver.py
class RedisDriver(BaseDriver[RedisDriverConfig]):
    def __init__(self, config: RedisDriverConfig):
        if not AIOREDIS_INSTALLED:
            raise ImportError(
                "aioredis is not installed. "
                "Please install it with: uv pip install argentic[redis]"
            )
        super().__init__(config)
        self._redis: Optional["aioredis.Redis"] = None
        # topic to list of handlers
        self._listeners: Dict[str, List[MessageHandler]] = {}
        self._pubsub: Optional[PubSub] = None
        self._reader_task: Optional[asyncio.Task] = None

    async def connect(self) -> bool:
        url = f"redis://{self.config.url}:{self.config.port}"
        self._redis = await aioredis.from_url(
            url,
            password=self.config.password,
        )
        return True

    async def disconnect(self) -> None:
        if self._pubsub:
            await self._pubsub.close()
        if self._reader_task and not self._reader_task.done():
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                pass

        if self._redis:
            await self._redis.close()

        self._redis = None
        self._pubsub = None
        self._reader_task = None

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        # Handle BaseMessage serialization
        data = payload.model_dump_json()

        if not self._redis:
            raise ConnectionError("Redis client not connected")
        await self._redis.publish(topic, data)

    async def subscribe(
        self,
        topic: str,
        handler: MessageHandler,
        message_cls: Type[BaseMessage] = BaseMessage,
        **kwargs,
    ) -> None:
        if not self._redis:
            raise ConnectionError("Redis client not connected")
        # register handler and subscribe on first handler per topic
        if topic not in self._listeners:
            self._listeners[topic] = []
            # initialize pubsub and reader
            if self._pubsub is None:
                self._pubsub = self._redis.pubsub()
            await self._pubsub.subscribe(topic)
            if self._reader_task is None or self._reader_task.done():
                self._reader_task = asyncio.create_task(self._reader())
        self._listeners[topic].append(handler)

    async def _reader(self) -> None:
        # single reader for all topics
        if not self._pubsub:
            return
        async for message in self._pubsub.listen():
            if message.get("type") == "message":
                channel_bytes = message.get("channel")
                if not channel_bytes:
                    continue
                channel = channel_bytes.decode()
                data = message.get("data")
                if not data:
                    continue

                try:
                    msg_obj = BaseMessage.model_validate_json(data)
                    for h in self._listeners.get(channel, []):
                        await h(msg_obj)
                except Exception:
                    # Potentially log this error
                    pass

    def is_connected(self) -> bool:
        return bool(self._redis and not getattr(self._redis, "closed", True))

    async def unsubscribe(self, topic: str) -> None:
        """Unsubscribe from a Redis topic."""
        if self._pubsub and topic in self._listeners:
            await self._pubsub.unsubscribe(topic)
            del self._listeners[topic]

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Format Redis-specific connection error details."""
        if "redis" in str(type(error)).lower():
            return f"Redis error: {error}"
        return None

format_connection_error_details(error)

Format Redis-specific connection error details.

Source code in src/argentic/core/messager/drivers/RedisDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Format Redis-specific connection error details."""
    if "redis" in str(type(error)).lower():
        return f"Redis error: {error}"
    return None

unsubscribe(topic) async

Unsubscribe from a Redis topic.

Source code in src/argentic/core/messager/drivers/RedisDriver.py
async def unsubscribe(self, topic: str) -> None:
    """Unsubscribe from a Redis topic."""
    if self._pubsub and topic in self._listeners:
        await self._pubsub.unsubscribe(topic)
        del self._listeners[topic]
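
A sketch of the Redis driver's pub/sub usage. Import paths for RedisDriverConfig and BaseMessage, and the BaseMessage fields, are assumptions:

import asyncio

from argentic.core.messager.drivers.RedisDriver import RedisDriver
from argentic.core.messager.drivers.configs import RedisDriverConfig  # assumed path
from argentic.core.protocol.message import BaseMessage  # assumed path


async def main() -> None:
    driver = RedisDriver(RedisDriverConfig(url="localhost", port=6379))
    await driver.connect()

    async def on_message(msg: BaseMessage) -> None:
        print("received:", msg)

    # Redis pub/sub has no retained messages: subscribe before publishing,
    # or the message is silently dropped.
    await driver.subscribe("agent:events", on_message)
    await driver.publish("agent:events", BaseMessage(source="demo"))  # `source` field assumed
    await asyncio.sleep(1)
    await driver.disconnect()


asyncio.run(main())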