Skip to content

Messaging Drivers (Simplified for Debugging)

core.messager.drivers.DriverConfig

Bases: BaseDriverConfig

Source code in src/argentic/core/messager/drivers/configs.py
class DriverConfig(BaseDriverConfig):
    """Unified connection settings shared by all messaging drivers.

    A single config type carries the union of fields the individual
    drivers read (MQTT, Kafka, AMQP); each driver uses only the subset
    it needs. Unknown extra keys are allowed so driver-specific options
    can be passed through (see ``Config.extra``).
    """

    # Broker hostname (bootstrap host for Kafka) and TCP port.
    url: str
    port: int

    # Optional authentication / identity fields.
    user: Optional[str] = None
    password: Optional[str] = None
    token: Optional[str] = None
    client_id: Optional[str] = None

    # MQTT: keepalive interval in seconds and protocol version.
    keepalive: int = 600
    version: ProtocolVersion = ProtocolVersion.V5

    # Kafka consumer settings.
    group_id: Optional[str] = None
    auto_offset_reset: Optional[str] = "earliest"

    # Virtual host — presumably for AMQP brokers; not read by the
    # MQTT/Kafka drivers shown in this module. TODO confirm usage.
    virtualhost: Optional[str] = "/"

    class Config:
        # Accept unknown keys so driver-specific extras pass validation.
        extra = "allow"

Core Components

core.messager.drivers.DriverConfig

Bases: BaseDriverConfig

Source code in src/argentic/core/messager/drivers/configs.py
class DriverConfig(BaseDriverConfig):
    """Unified connection settings shared by all messaging drivers.

    A single config type carries the union of fields the individual
    drivers read (MQTT, Kafka, AMQP); each driver uses only the subset
    it needs. Unknown extra keys are allowed so driver-specific options
    can be passed through (see ``Config.extra``).
    """

    # Broker hostname (bootstrap host for Kafka) and TCP port.
    url: str
    port: int

    # Optional authentication / identity fields.
    user: Optional[str] = None
    password: Optional[str] = None
    token: Optional[str] = None
    client_id: Optional[str] = None

    # MQTT: keepalive interval in seconds and protocol version.
    keepalive: int = 600
    version: ProtocolVersion = ProtocolVersion.V5

    # Kafka consumer settings.
    group_id: Optional[str] = None
    auto_offset_reset: Optional[str] = "earliest"

    # Virtual host — presumably for AMQP brokers; not read by the
    # MQTT/Kafka drivers shown in this module. TODO confirm usage.
    virtualhost: Optional[str] = "/"

    class Config:
        # Accept unknown keys so driver-specific extras pass validation.
        extra = "allow"

core.messager.drivers.create_driver(protocol, config_data)

Factory: dynamically import only the requested driver

Source code in src/argentic/core/messager/drivers/__init__.py
def create_driver(protocol: MessagerProtocol, config_data: dict) -> BaseDriver:
    """Factory: dynamically import only the requested driver"""
    # Resolve (driver class name, config class) for the protocol, EAFP-style.
    try:
        driver_name, config_class = _DRIVER_MAPPING[protocol]
    except KeyError:
        raise ValueError(f"Unsupported protocol: {protocol}") from None

    module_path = _DRIVER_MODULES[driver_name]
    try:
        # Import lazily so optional driver dependencies are only needed
        # when that driver is actually requested.
        driver_cls = getattr(importlib.import_module(module_path), driver_name)
    except (ImportError, AttributeError) as exc:
        raise ImportError(
            f"Could not import or find {driver_name} from {module_path}. "
            f"Ensure the driver and its dependencies are correctly installed."
        ) from exc

    # Validate the raw config dict into the driver's config model, then build.
    return driver_cls(config_class(**config_data))

Available Drivers

MQTT

core.messager.drivers.MQTTDriver.MQTTDriver

Bases: BaseDriver[MQTTDriverConfig]

Source code in src/argentic/core/messager/drivers/MQTTDriver.py
class MQTTDriver(BaseDriver[MQTTDriverConfig]):
    """aiomqtt-based MQTT driver.

    Owns one aiomqtt ``Client`` (entered via an ``AsyncExitStack``), a
    background task that dispatches incoming messages to registered
    handlers, and a "shadow ping" task that periodically publishes a
    small message to keep the connection alive.
    """

    def __init__(self, config: MQTTDriverConfig):
        """Initialize in-memory state; no I/O happens until ``connect()``."""
        super().__init__(config)
        self._client: Optional[Client] = None
        self._connected = False
        # Dictionary: topic -> {message_cls_name: (handler, message_cls)}
        self._subscriptions: Dict[str, Dict[str, tuple[MessageHandler, type]]] = {}
        self._message_task: Optional[asyncio.Task] = None
        self._stack: Optional[AsyncExitStack] = None
        # Track handler tasks without leaking memory – WeakSet auto-removes finished tasks
        self._handler_tasks: "weakref.WeakSet[asyncio.Task]" = weakref.WeakSet()
        self._max_concurrent_handlers = 50
        # Shadow ping to keep connection alive. Interval = keepalive / 2
        if self.config.keepalive is not None and self.config.keepalive > 0:
            self._ping_interval: float = self.config.keepalive / 2.0
        else:
            # Fallback to 30 s if keepalive is disabled or invalid
            self._ping_interval = 30.0
        self._last_ping_time = 0.0
        self._ping_task: Optional[asyncio.Task] = None

    async def connect(self, start_ping: bool = True) -> bool:
        """Open the MQTT connection and start background tasks.

        Args:
            start_ping: When True, also start the shadow-ping loop.
                ``_reconnect`` passes False because the original ping
                task keeps running across reconnects.

        Returns:
            True on success, False on failure (errors are logged,
            not raised).
        """
        try:
            self._stack = AsyncExitStack()

            # Create aiomqtt client
            self._client = Client(
                hostname=self.config.url,
                port=self.config.port,
                username=self.config.user,
                password=self.config.password,
                identifier=self.config.client_id,
                keepalive=self.config.keepalive,
                protocol=self.config.version,
            )

            # Connect using the async context manager
            await self._stack.enter_async_context(self._client)

            # Start message handler task
            self._message_task = asyncio.create_task(self._handle_messages())

            # Start shadow ping task to keep connection alive
            if start_ping:
                self._ping_task = asyncio.create_task(self._shadow_ping_loop())

            self._connected = True
            logger.info("MQTT connected via aiomqtt.")
            return True

        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            self._connected = False
            # Unwind any partially-entered contexts so we don't leak the client.
            if self._stack:
                await self._stack.aclose()
                self._stack = None
            return False

    async def disconnect(self) -> None:
        """Cancel background tasks and close the client.

        No-op when already disconnected. ``_connected`` is flipped first
        so concurrent publishes see the connection as down while
        teardown proceeds.
        """
        if self._connected:
            self._connected = False

            # Cancel ping task
            if self._ping_task and not self._ping_task.done():
                self._ping_task.cancel()
                try:
                    await self._ping_task
                except asyncio.CancelledError:
                    pass

            # Cancel message handler task
            if self._message_task and not self._message_task.done():
                self._message_task.cancel()
                try:
                    await self._message_task
                except asyncio.CancelledError:
                    pass

            # EXPERIMENTAL: Cancel all pending handler tasks
            for task in self._handler_tasks:
                if not task.done():
                    task.cancel()
            if self._handler_tasks:
                try:
                    await asyncio.gather(*self._handler_tasks, return_exceptions=True)
                except Exception as e:
                    logger.debug(f"Error cleaning up handler tasks: {e}")
            self._handler_tasks.clear()

            # Close the client context
            if self._stack:
                await self._stack.aclose()
                self._stack = None

            self._client = None
            logger.info("MQTT disconnected.")

    def is_connected(self) -> bool:
        """Return the driver's own connected flag (not the socket state)."""
        return self._connected

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        """
        Publish a message to MQTT.

        If the publish fails due to a lost connection, the driver will
        attempt a single reconnection and retry the publish once.
        """

        attempt = 0
        while attempt < 2:  # first try + one retry after reconnect
            attempt += 1

            # If we already know we're disconnected try reconnect first
            if (not self._connected) or (self._client is None):
                logger.debug("MQTT not connected – attempting to reconnect before publish …")
                await self._reconnect()

            try:
                client = cast(Client, self._client)  # type narrowing for linters
                serialized_data = payload.model_dump_json()
                await client.publish(topic, serialized_data.encode(), qos=qos, retain=retain)
                return  # Success
            except Exception as publish_err:
                logger.warning(f"Publish attempt {attempt} to '{topic}' failed: {publish_err}")

                if attempt >= 2:
                    # No more retries – propagate the error
                    raise

                # Mark as disconnected and try to reconnect once
                self._connected = False
                await self._reconnect()

    async def _reconnect(self) -> None:
        """Attempt to re-establish the MQTT connection.

        This helper is designed to be lightweight so it can be called
        directly from the publish path. The existing shadow-ping task
        continues running; therefore we do NOT spawn a new one here.
        """

        logger.info("Attempting MQTT reconnection …")

        # Close previous connection gracefully
        try:
            if self._stack:
                await self._stack.aclose()
        except Exception:
            pass

        self._client = None
        self._connected = False

        # Re-create connection without starting another ping loop
        await self.connect(start_ping=False)

        # Re-subscribe to previously registered topics
        if self._connected and self._client:
            client = cast(Client, self._client)
            for topic in list(self._subscriptions.keys()):
                try:
                    await client.subscribe(topic, qos=1)
                    logger.debug(f"Resubscribed to topic '{topic}' after reconnect")
                except Exception as sub_err:
                    logger.warning(f"Failed to resubscribe to '{topic}' after reconnect: {sub_err}")

    async def subscribe(
        self, topic: str, handler: MessageHandler, message_cls: type = BaseMessage, **kwargs
    ) -> None:
        """Register ``handler`` for ``topic`` and subscribe on the broker.

        Multiple handlers may be registered per topic, keyed by their
        message class name; see ``_process_message`` for dispatch.

        Raises:
            ConnectionError: if called while disconnected.
        """
        if not self._connected or not self._client:
            raise ConnectionError("Not connected to MQTT broker.")

        try:
            # Store the handler for this topic
            if topic not in self._subscriptions:
                self._subscriptions[topic] = {}
            # Prefer the explicit message_cls provided via kwargs, fallback to the arg
            explicit_cls = kwargs.pop("message_cls", None)
            cls_to_use = explicit_cls or message_cls or BaseMessage
            self._subscriptions[topic][cls_to_use.__name__] = (handler, cls_to_use)

            # Subscribe using aiomqtt
            await self._client.subscribe(topic, qos=kwargs.get("qos", 1))
            logger.info(f"Subscribed to topic: {topic}")

        except Exception as e:
            logger.error(f"Error subscribing to {topic}: {e}")
            raise

    async def unsubscribe(self, topic: str) -> None:
        """Drop all handlers for ``topic`` and unsubscribe on the broker.

        Raises:
            ConnectionError: if called while disconnected.
        """
        if not self._connected or not self._client:
            raise ConnectionError("Not connected to MQTT broker.")

        try:
            # Remove all handlers for this topic
            if topic in self._subscriptions:
                del self._subscriptions[topic]

            # Unsubscribe using aiomqtt
            await self._client.unsubscribe(topic)
            logger.info(f"Unsubscribed from topic: {topic}")

        except Exception as e:
            logger.error(f"Error unsubscribing from {topic}: {e}")
            raise

    async def _handle_messages(self) -> None:
        """Handle incoming messages from aiomqtt."""
        if not self._client:
            return

        try:
            async for message in self._client.messages:
                # EXPERIMENTAL: Don't await _process_message directly - spawn as task
                # This prevents any handler from blocking the main message loop
                if len(self._handler_tasks) >= self._max_concurrent_handlers:
                    logger.warning("Handler task pool full, waiting for completion...")
                    # Wait for at least one task to complete
                    done, pending = await asyncio.wait(
                        self._handler_tasks, return_when=asyncio.FIRST_COMPLETED
                    )
                    # Clean up completed tasks
                    for task in done:
                        self._handler_tasks.discard(task)
                        # NOTE(review): Task.exception() raises CancelledError if
                        # the task was cancelled — confirm done tasks here cannot
                        # have been cancelled, or guard with task.cancelled().
                        if task.exception():
                            logger.error(f"Handler task failed: {task.exception()}")

                # Spawn message processing as independent task
                task = asyncio.create_task(self._process_message(message))

                # Auto-remove from the tracking set once finished to avoid leaks
                task.add_done_callback(self._handler_tasks.discard)

                self._handler_tasks.add(task)

        except asyncio.CancelledError:
            logger.debug("Message handler task cancelled")
            # Cancel and gather remaining handler tasks (WeakSet auto-shrinks)
            pending = [t for t in self._handler_tasks if not t.done()]
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
        except Exception as e:
            logger.error(f"Error in message handler: {e}")

    async def _process_message(self, message: Message) -> None:
        """Process a single message from aiomqtt."""
        try:
            # Find handlers for this topic
            handlers = self._subscriptions.get(message.topic.value)
            if not handlers:
                logger.warning(f"No handlers for topic {message.topic.value}")
                return

            # Parse the message payload
            try:
                # Handle different payload types
                if isinstance(message.payload, bytes):
                    payload_str = message.payload.decode()
                elif isinstance(message.payload, str):
                    payload_str = message.payload
                else:
                    payload_str = str(message.payload)

                # Parse as BaseMessage first
                base_message = BaseMessage.model_validate_json(payload_str)
                # Store the original JSON string for re-parsing
                setattr(base_message, "_original_json", payload_str)

            except Exception as e:
                logger.error(f"Failed to parse message from {message.topic.value}: {e}")
                return

            # Call appropriate handlers based on message type compatibility
            for handler_cls_name, (handler, handler_cls) in handlers.items():
                try:
                    # Parse as specific type early if provided
                    if handler_cls is BaseMessage:
                        await handler(base_message)
                    else:
                        try:
                            validate_method = getattr(handler_cls, "model_validate_json", None)
                            if validate_method:
                                # Re-validate the raw JSON against the handler's
                                # specific message class; on mismatch this handler
                                # is skipped (see except below).
                                specific_message = validate_method(
                                    getattr(
                                        base_message,
                                        "_original_json",
                                        base_message.model_dump_json(),
                                    )
                                )
                                await handler(specific_message)
                            else:
                                await handler(base_message)
                        except Exception as parse_error:
                            logger.debug(
                                f"Message type mismatch for handler {handler_cls_name}: {parse_error}"
                            )
                            continue
                except Exception as handler_error:
                    logger.error(
                        f"Error in handler {handler_cls_name} for topic {message.topic.value}: {handler_error}"
                    )

        except Exception as e:
            logger.error(f"Error processing message from {message.topic.value}: {e}")

    async def _shadow_ping_loop(self) -> None:
        """Send periodic ping messages to keep connection alive.

        This loop runs constantly and independently of connection state.
        It will keep trying to ping even during reconnections.
        """
        logger.info(f"Starting shadow ping loop (interval: {self._ping_interval}s)")
        ping_count = 0

        try:
            while True:
                ping_count += 1
                try:
                    await self._send_ping(ping_count)
                except Exception as ping_err:
                    # Best-effort: a failed ping is logged at debug and retried
                    # on the next interval rather than killing the loop.
                    logger.debug(f"Shadow ping #{ping_count} failed: {ping_err}")
                await asyncio.sleep(self._ping_interval)
        except asyncio.CancelledError:
            logger.info(f"Shadow ping loop cancelled after {ping_count} pings")
        logger.info("Shadow ping loop ended")

    async def _send_ping(self, ping_count: int) -> None:
        """Publish a single keep-alive ping message.

        NOTE(review): the original comment claimed this bypasses
        ``publish()``, but the code routes through ``self.publish``, so a
        failed ping can trigger the reconnect logic in the publish path.
        """
        if not self._connected:
            raise ConnectionError("Not connected to MQTT broker.")

        ping_topic = f"_ping/{self.config.client_id}"
        ping_msg = InfoMessage(
            source=self.config.client_id or "mqtt_client",
            data={"ping_id": ping_count, "timestamp": time.time()},
        )
        await self.publish(ping_topic, ping_msg, qos=0, retain=False)

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Format MQTT-specific connection error details."""
        if isinstance(error, MqttError):
            return f"MQTT error: {error}"
        return None

format_connection_error_details(error)

Format MQTT-specific connection error details.

Source code in src/argentic/core/messager/drivers/MQTTDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Return a detail string for aiomqtt errors, ``None`` otherwise.

    Non-MQTT errors yield ``None`` so the caller can fall back to a
    generic formatter.
    """
    return f"MQTT error: {error}" if isinstance(error, MqttError) else None

publish(topic, payload, qos=0, retain=False) async

Publish a message to MQTT.

If the publish fails due to a lost connection, the driver will attempt a single reconnection and retry the publish once.

Source code in src/argentic/core/messager/drivers/MQTTDriver.py
async def publish(
    self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
) -> None:
    """
    Publish a message to MQTT.

    If the publish fails due to a lost connection, the driver will
    attempt a single reconnection and retry the publish once.
    """

    # Two-attempt loop: the initial publish plus one retry after a
    # forced reconnect. Any failure on the second attempt propagates.
    attempt = 0
    while attempt < 2:  # first try + one retry after reconnect
        attempt += 1

        # If we already know we're disconnected try reconnect first
        if (not self._connected) or (self._client is None):
            logger.debug("MQTT not connected – attempting to reconnect before publish …")
            await self._reconnect()

        try:
            client = cast(Client, self._client)  # type narrowing for linters
            # Payload is serialized to JSON and sent as UTF-8 bytes.
            serialized_data = payload.model_dump_json()
            await client.publish(topic, serialized_data.encode(), qos=qos, retain=retain)
            return  # Success
        except Exception as publish_err:
            logger.warning(f"Publish attempt {attempt} to '{topic}' failed: {publish_err}")

            if attempt >= 2:
                # No more retries – propagate the error
                raise

            # Mark as disconnected and try to reconnect once
            self._connected = False
            await self._reconnect()

Kafka

core.messager.drivers.KafkaDriver.KafkaDriver

Bases: BaseDriver[KafkaDriverConfig]

Source code in src/argentic/core/messager/drivers/KafkaDriver.py
class KafkaDriver(BaseDriver[KafkaDriverConfig]):
    """aiokafka-based driver.

    A producer is created in ``connect()``; a single consumer is created
    lazily on the first ``subscribe()`` and shared by all topics, with
    one reader task fanning messages out to the registered handlers.
    """

    def __init__(self, config: KafkaDriverConfig):
        """Store config and initialize producer/consumer/handler state.

        The imports at the top of the module handle the dependency check:
        if aiokafka wasn't found, an ImportError would have already occurred.
        """
        super().__init__(config)
        self._producer: Optional[AIOKafkaProducer] = None
        self._consumer: Optional[AIOKafkaConsumer] = None
        # topic -> list of registered message handlers
        self._listeners: Dict[str, List[MessageHandler]] = {}
        self._reader_task: Optional[asyncio.Task] = None
        logger.debug(
            f"KafkaDriver initialized with config: {{url: {config.url}, port: {config.port}}}"
        )

    async def connect(self) -> bool:
        """Start the Kafka producer.

        Returns:
            True when the producer started successfully.

        Raises:
            KafkaConnectionError: if the broker cannot be reached.
            Exception: any unexpected startup failure (logged, re-raised).
        """
        servers = f"{self.config.url}:{self.config.port}"
        logger.info(f"Connecting Kafka producer to bootstrap servers: {servers}")
        try:
            # Explicitly pass the running event loop
            loop = asyncio.get_running_loop()
            self._producer = AIOKafkaProducer(bootstrap_servers=servers, loop=loop)
            await self._producer.start()
            logger.info("Kafka producer started successfully.")
            return True
        except KafkaConnectionError as e:
            logger.error(f"Failed to connect Kafka producer: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred during Kafka producer connection: {e}")
            raise
        # Fix: removed an unreachable `return False` that followed the
        # try/except — both except branches re-raise, so it could never run.

    async def disconnect(self) -> None:
        """Stop producer and consumer, then cancel the reader task.

        Each stop is attempted independently; failures are logged rather
        than raised so teardown always runs to completion.
        """
        logger.info("Disconnecting Kafka client.")
        if self._producer:
            try:
                logger.debug("Stopping Kafka producer...")
                await self._producer.stop()
                logger.info("Kafka producer stopped.")
            except Exception as e:
                logger.error(f"Error stopping Kafka producer: {e}")
        if self._consumer:
            try:
                logger.debug("Stopping Kafka consumer...")
                await self._consumer.stop()
                logger.info("Kafka consumer stopped.")
            except Exception as e:
                logger.error(f"Error stopping Kafka consumer: {e}")
        if self._reader_task and not self._reader_task.done():
            logger.debug("Cancelling Kafka reader task...")
            self._reader_task.cancel()
            try:
                await self._reader_task
                logger.debug("Kafka reader task cancelled.")
            except asyncio.CancelledError:
                logger.info("Kafka reader task was cancelled as expected.")
            except Exception as e:
                logger.error(f"Error during Kafka reader task cancellation: {e}")

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        """Serialize ``payload`` to JSON and send it to ``topic``.

        ``qos`` and ``retain`` are accepted for interface compatibility
        but have no Kafka equivalent and are ignored.

        Raises:
            RuntimeError: if the producer has not been started.
            KafkaTimeoutError / KafkaConnectionError: on send failure.
        """
        if not self._producer:
            logger.error("Kafka producer is not connected. Cannot publish.")
            raise RuntimeError("Kafka producer is not connected.")

        data = payload.model_dump_json().encode()
        logger.debug(
            f"Publishing message to Kafka topic '{topic}'. Payload: {payload.model_dump_json()}"
        )
        try:
            await self._producer.send_and_wait(topic, data)
            logger.info(f"Message successfully sent to Kafka topic '{topic}'.")
        except KafkaTimeoutError as e:
            logger.error(f"Timeout sending message to Kafka topic '{topic}': {e}")
            raise
        except KafkaConnectionError as e:
            logger.error(f"Connection error sending message to Kafka topic '{topic}': {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error sending message to Kafka topic '{topic}': {e}")
            raise

    async def subscribe(
        self,
        topic: str,
        handler: MessageHandler,
        message_cls: Type[BaseMessage] = BaseMessage,
        ready_event: Optional[asyncio.Event] = None,
        **kwargs,
    ) -> None:
        """Subscribe to a Kafka topic.

        Args:
            topic: Topic to subscribe to.
            handler: Callback function to handle received messages.
            message_cls: The message class for deserialization (unused in this driver).
            ready_event: Optional asyncio.Event to signal when consumer is ready.
            **kwargs: Additional keyword arguments for AIOKafkaConsumer.
        """
        servers = f"{self.config.url}:{self.config.port}"
        listener_instance = None
        if ready_event:
            listener_instance = _KafkaReadyListener(ready_event, topic)

        if self._consumer is None:
            # First subscription: create the shared consumer.
            # Default Kafka consumer settings from config
            consumer_config = {
                "group_id": self.config.group_id or f"default-group-{uuid.uuid4().hex}",
                "auto_offset_reset": self.config.auto_offset_reset,
                "session_timeout_ms": 30000,
                "heartbeat_interval_ms": 10000,
                "max_poll_interval_ms": 300000,
            }
            # Allow kwargs to override config values
            consumer_config.update(kwargs)
            logger.info(
                f"Creating Kafka consumer for topic '{topic}' with effective config: {consumer_config}"
            )
            try:
                # Explicitly pass the running event loop
                loop = asyncio.get_running_loop()
                self._consumer = AIOKafkaConsumer(
                    bootstrap_servers=servers,
                    loop=loop,  # Pass loop
                    **consumer_config,  # Pass merged config
                )
                await self._consumer.start()
                logger.info(f"Kafka consumer started for topic '{topic}'. Subscribing...")
                # Pass listener if provided
                self._consumer.subscribe(
                    [topic], listener=listener_instance
                )  # This call is synchronous
                logger.info(f"Successfully subscribed to initial topic '{topic}'.")
                if self._reader_task is None or self._reader_task.done():
                    self._reader_task = asyncio.create_task(self._reader())
                    logger.info("Kafka reader task started.")
            except KafkaConnectionError as e:
                logger.error(f"Failed to connect Kafka consumer: {e}")
                # NOTE(review): failures here are swallowed (logged + return);
                # callers cannot distinguish success from failure.
                return  # Or raise specific exception
            except Exception as e:
                logger.error(f"An unexpected error occurred during Kafka consumer setup: {e}")
                return  # Or raise

        else:
            # Consumer already exists, add subscription if not already subscribed
            current_subscription = self._consumer.subscription()
            if topic not in current_subscription:
                new_topic_list = list(current_subscription) + [topic]
                logger.info(
                    f"Adding subscription for topic '{topic}'. Current: {current_subscription}, New: {new_topic_list}"
                )
                try:
                    # Pass listener if provided and applicable for re-subscribe
                    self._consumer.subscribe(  # This call is synchronous
                        topics=new_topic_list, listener=listener_instance
                    )
                    logger.info(f"Successfully added subscription for topic '{topic}'.")
                except Exception as e:
                    logger.error(f"Failed to add subscription for topic '{topic}': {e}")
                    return  # Or raise

        # Register handler (deduplicated per topic).
        if topic not in self._listeners:
            self._listeners[topic] = []
        if handler not in self._listeners[topic]:
            self._listeners[topic].append(handler)
            logger.info(f"Handler registered for topic '{topic}'.")

    async def _reader(self) -> None:
        """Single reader for all subscribed topics"""
        if not self._consumer:
            logger.error("Kafka consumer is not initialized. Reader cannot start.")
            return

        logger.info("Kafka message reader task started.")
        try:
            async for msg in self._consumer:
                topic = msg.topic
                partition = msg.partition
                offset = msg.offset
                logger.debug(
                    f"Received message from Kafka. Topic: {topic}, Partition: {partition}, Offset: {offset}, Key: {msg.key}, Timestamp: {msg.timestamp}"
                )

                # Log the raw message value before attempting to decode/process
                try:
                    if msg.value:
                        # Attempt to decode for logging, but handle potential errors
                        decoded_value_for_log = msg.value.decode("utf-8")
                        logger.debug(
                            f"Raw message value (decoded for log): {decoded_value_for_log}"
                        )
                    else:
                        logger.debug("Raw message value is None")
                except UnicodeDecodeError:
                    logger.debug(
                        f"Raw message value (bytes, could not decode as UTF-8): {msg.value!r}"
                    )
                except Exception as e_dec:
                    logger.warning(f"Could not decode message value for logging: {e_dec}")

                handlers = self._listeners.get(topic, [])
                if not handlers:
                    logger.warning(
                        f"No handlers registered for topic '{topic}'. Discarding message from Partition: {partition}, Offset: {offset}."
                    )
                    continue

                if not msg.value:
                    logger.debug(f"Skipping message with empty value from topic '{topic}'.")
                    continue

                logger.debug(
                    f"Processing message from topic '{topic}' with {len(handlers)} handler(s)."
                )
                try:
                    # Deserialize once, then invoke every handler; a failing
                    # handler is logged and does not stop the others.
                    deserialized_message = BaseMessage.model_validate_json(msg.value)
                    for i, h in enumerate(handlers):
                        handler_name = h.__name__ if hasattr(h, "__name__") else str(h)
                        logger.debug(
                            f"Invoking handler {i+1}/{len(handlers)} ('{handler_name}') for topic '{topic}'."
                        )
                        try:
                            await h(deserialized_message)
                            logger.debug(
                                f"Handler '{handler_name}' completed for message from topic '{topic}'."
                            )
                        except Exception as e:
                            logger.error(
                                f"Handler '{handler_name}' for topic '{topic}' raised an exception: {e}",
                                exc_info=True,
                            )
                except Exception as e:
                    logger.error(
                        f"Failed to deserialize message for topic '{topic}': {e}", exc_info=True
                    )
        except asyncio.CancelledError:
            logger.info("Kafka reader task was cancelled.")
        except KafkaConnectionError as e:
            logger.error(f"Kafka connection error in reader task: {e}", exc_info=True)
            # Potentially try to reconnect or signal an issue, for now just log and exit task
        except Exception as e:
            logger.error(f"Unexpected error in Kafka reader task: {e}", exc_info=True)
        finally:
            logger.info("Kafka message reader task finished.")

    def is_connected(self) -> bool:
        """Return True while the producer exists and is not closed.

        NOTE(review): relies on aiokafka's private ``_closed`` attribute;
        there is no public equivalent exposed here.
        """
        return self._producer is not None and not self._producer._closed

    async def unsubscribe(self, topic: str) -> None:
        """Unsubscribe from a topic (Kafka doesn't support individual topic unsubscription easily)."""
        logger.warning(
            f"Kafka driver does not support unsubscribing from individual topics. Topic: {topic}"
        )
        # Kafka consumers are typically group-based and subscribe to multiple topics
        # Individual topic unsubscription would require recreating the consumer
        pass

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Format Kafka-specific connection error details."""
        # Heuristic: treat any exception whose type name mentions "kafka"
        # as a Kafka error.
        if "kafka" in str(type(error)).lower():
            return f"Kafka error: {error}"
        return None

format_connection_error_details(error)

Format Kafka-specific connection error details.

Source code in src/argentic/core/messager/drivers/KafkaDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Format Kafka-specific connection error details."""
    if "kafka" in str(type(error)).lower():
        return f"Kafka error: {error}"
    return None

subscribe(topic, handler, message_cls=BaseMessage, ready_event=None, **kwargs) async

Subscribe to a Kafka topic.

Parameters:

- topic (str, required): Topic to subscribe to.
- handler (MessageHandler, required): Callback function to handle received messages.
- message_cls (Type[BaseMessage], default: BaseMessage): The message class for deserialization (unused in this driver).
- ready_event (Optional[asyncio.Event], default: None): Optional asyncio.Event to signal when the consumer is ready.
- **kwargs (default: {}): Additional keyword arguments for AIOKafkaConsumer.

Source code in src/argentic/core/messager/drivers/KafkaDriver.py
async def subscribe(
    self,
    topic: str,
    handler: MessageHandler,
    message_cls: Type[BaseMessage] = BaseMessage,
    ready_event: Optional[asyncio.Event] = None,
    **kwargs,
) -> None:
    """Subscribe to a Kafka topic.

    Args:
        topic: Topic to subscribe to.
        handler: Callback function to handle received messages.
        message_cls: The message class for deserialization (unused in this driver).
        ready_event: Optional asyncio.Event to signal when consumer is ready.
        **kwargs: Additional keyword arguments for AIOKafkaConsumer.
    """
    servers = f"{self.config.url}:{self.config.port}"
    listener_instance = None
    if ready_event:
        # Rebalance listener that sets ready_event once partitions are assigned.
        listener_instance = _KafkaReadyListener(ready_event, topic)

    if self._consumer is None:
        # Default Kafka consumer settings from config
        consumer_config = {
            "group_id": self.config.group_id or f"default-group-{uuid.uuid4().hex}",
            "auto_offset_reset": self.config.auto_offset_reset,
            "session_timeout_ms": 30000,
            "heartbeat_interval_ms": 10000,
            "max_poll_interval_ms": 300000,
        }
        # Allow kwargs to override config values
        consumer_config.update(kwargs)
        logger.info(
            f"Creating Kafka consumer for topic '{topic}' with effective config: {consumer_config}"
        )
        try:
            # Explicitly pass the running event loop
            loop = asyncio.get_running_loop()
            self._consumer = AIOKafkaConsumer(
                bootstrap_servers=servers,
                loop=loop,  # Pass loop
                **consumer_config,  # Pass merged config
            )
            await self._consumer.start()
            logger.info(f"Kafka consumer started for topic '{topic}'. Subscribing...")
            # Pass listener if provided
            self._consumer.subscribe(
                [topic], listener=listener_instance
            )  # This call is synchronous
            logger.info(f"Successfully subscribed to initial topic '{topic}'.")
            # A single shared reader task serves all subscribed topics.
            if self._reader_task is None or self._reader_task.done():
                self._reader_task = asyncio.create_task(self._reader())
                logger.info("Kafka reader task started.")
        except KafkaConnectionError as e:
            # NOTE(review): errors are swallowed here, so callers cannot tell
            # the subscription failed — confirm this best-effort contract.
            logger.error(f"Failed to connect Kafka consumer: {e}")
            # Consider how to handle this error (e.g., raise or return status)
            return  # Or raise specific exception
        except Exception as e:
            logger.error(f"An unexpected error occurred during Kafka consumer setup: {e}")
            return  # Or raise

    else:
        # Consumer already exists, add subscription if not already subscribed
        current_subscription = self._consumer.subscription()
        if topic not in current_subscription:
            new_topic_list = list(current_subscription) + [topic]
            logger.info(
                f"Adding subscription for topic '{topic}'. Current: {current_subscription}, New: {new_topic_list}"
            )
            try:
                # Pass listener if provided and applicable for re-subscribe
                self._consumer.subscribe(  # This call is synchronous
                    topics=new_topic_list, listener=listener_instance
                )
                logger.info(f"Successfully added subscription for topic '{topic}'.")
            except Exception as e:
                logger.error(f"Failed to add subscription for topic '{topic}': {e}")
                return  # Or raise
        # NOTE(review): if the topic is already subscribed, ready_event is
        # never signalled on this code path — confirm callers tolerate this.

    # Register handler
    if topic not in self._listeners:
        self._listeners[topic] = []
    if handler not in self._listeners[topic]:
        # Dedupe: registering the same handler twice is a no-op.
        self._listeners[topic].append(handler)
        logger.info(f"Handler registered for topic '{topic}'.")

unsubscribe(topic) async

Unsubscribe from a topic (Kafka doesn't support individual topic unsubscription easily).

Source code in src/argentic/core/messager/drivers/KafkaDriver.py
async def unsubscribe(self, topic: str) -> None:
    """Unsubscribe from a topic (Kafka doesn't support individual topic unsubscription easily)."""
    # Kafka consumers subscribe to a topic set per consumer group; dropping
    # one topic would require recreating the consumer, so only warn here.
    logger.warning(
        f"Kafka driver does not support unsubscribing from individual topics. Topic: {topic}"
    )

RabbitMQ

core.messager.drivers.RabbitMQDriver.RabbitMQDriver

Bases: BaseDriver[RabbitMQDriverConfig]

Source code in src/argentic/core/messager/drivers/RabbitMQDriver.py
class RabbitMQDriver(BaseDriver[RabbitMQDriverConfig]):
    """RabbitMQ messaging driver built on aio-pika.

    Topic model: each topic maps to a FANOUT exchange; every first
    subscription to a topic creates an exclusive queue bound to that
    exchange, and a consumer callback fans messages out to all handlers
    registered for the topic.
    """

    def __init__(self, config: RabbitMQDriverConfig):
        super().__init__(config)
        # Robust connection/channel are created in connect(), not here.
        self._connection: Optional[aio_pika.RobustConnection] = None
        self._channel: Optional[aio_pika.Channel] = None
        # topic to list of handlers
        self._listeners: Dict[str, List[MessageHandler]] = {}
        # track queues per topic
        self._queues: Dict[str, aio_pika.Queue] = {}
        self.logger = logging.getLogger("RabbitMQDriver")

    async def connect(self) -> bool:
        """Open a robust connection and a channel; returns True on success.

        Connection errors propagate to the caller (no retry here beyond
        aio_pika's own robust reconnection behaviour).
        """
        # Determine virtualhost, defaulting to '/' if not specified or empty
        virtualhost = self.config.virtualhost or "/"
        # Ensure virtualhost starts with a slash if it's not empty and doesn't have one
        if virtualhost != "/" and not virtualhost.startswith("/"):
            virtualhost = "/" + virtualhost
        # If virtualhost is just '/', ensure no double slash in the URL if user was empty and we defaulted
        if virtualhost == "/":
            url_vhost_part = ""  # aio_pika implicitly uses / if path is empty
        else:
            url_vhost_part = virtualhost

        url = f"amqp://{self.config.user}:{self.config.password}@{self.config.url}:{self.config.port}{url_vhost_part}"
        # NOTE(review): this URL embeds credentials; logging it leaks the
        # password into the logs — confirm this is acceptable for debugging.
        self.logger.info(
            f"Connecting to RabbitMQ with URL: {url}"
        )  # Log the full URL for debugging
        self._connection = await aio_pika.connect_robust(url)
        self._channel = await self._connection.channel()
        self.logger.info(
            f"Connected to RabbitMQ at {self.config.url}:{self.config.port}, vhost: {virtualhost}"
        )
        return True

    async def disconnect(self) -> None:
        """Close the connection (best effort) and reset all driver state."""
        self.logger.info("Attempting to disconnect from RabbitMQ...")
        if self._connection and not self._connection.is_closed:
            self.logger.info(f"Connection object exists. is_closed: {self._connection.is_closed}")
            try:
                await self._connection.close()
                self.logger.info("Successfully closed RabbitMQ connection.")
            except Exception as e:
                # Best effort: a failed close still proceeds to state reset below.
                self.logger.error(f"Error during RabbitMQ connection.close(): {e!r}", exc_info=True)
        else:
            self.logger.info("No active RabbitMQ connection object to close.")
        # Reset state
        self._connection = None
        self._channel = None
        self._listeners = {}
        self._queues = {}
        self.logger.info("RabbitMQ driver state reset after disconnect.")

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        """Publish a message to the topic's FANOUT exchange.

        Args:
            topic: Exchange name to publish to (declared if absent).
            payload: Message serialized via ``model_dump_json``.
            qos: Accepted for interface parity; unused by this driver.
            retain: Accepted for interface parity; unused by this driver.

        Raises:
            ConnectionError: If no channel is available (not connected).
        """
        if not self._channel:
            raise ConnectionError("RabbitMQ channel is not available.")
        try:
            # declare_exchange is idempotent; re-declaring an existing
            # exchange with the same type is a no-op on the broker.
            exchange = await self._channel.declare_exchange(topic, aio_pika.ExchangeType.FANOUT)

            # Handle BaseMessage serialization
            body = payload.model_dump_json().encode("utf-8")

            message = aio_pika.Message(body=body)
            # FANOUT exchanges ignore the routing key.
            await exchange.publish(message, routing_key="")
            self.logger.debug(f"Published message to exchange: {topic}")
        except Exception as e:
            self.logger.error(f"Error publishing to exchange {topic}: {e}")
            raise

    async def subscribe(
        self,
        topic: str,
        handler: MessageHandler,
        message_cls: Type[BaseMessage] = BaseMessage,
        **kwargs,
    ) -> None:
        """Register a handler for a topic, creating the consumer on first use.

        Args:
            topic: Topic (exchange) to subscribe to.
            handler: Async callback invoked with each deserialized message.
            message_cls: Accepted for interface parity; deserialization below
                always uses BaseMessage.
            **kwargs: Unused by this driver.

        Raises:
            RuntimeError: If no channel is available (not connected).
        """
        try:
            self.logger.info(f"Subscribe called for topic: {topic}")
            if not self._channel:
                self.logger.error(f"Cannot subscribe to topic '{topic}', channel is not available.")
                # Or raise an exception if this state is unexpected
                raise RuntimeError("RabbitMQ channel not available for subscription.")

            # register handler and setup consumer on first subscribe per topic
            if topic not in self._listeners:
                self._listeners[topic] = []
                self.logger.info(
                    f"First subscription for topic '{topic}', setting up exchange and queue."
                )
                # declare exchange and queue
                exchange = await self._channel.declare_exchange(topic, aio_pika.ExchangeType.FANOUT)
                self.logger.debug(f"Declared exchange '{topic}' type FANOUT.")
                # Exclusive queue: auto-named by broker, deleted when the
                # connection closes.
                queue = await self._channel.declare_queue(exclusive=True)
                self.logger.debug(f"Declared exclusive queue '{queue.name}' for topic '{topic}'.")
                await queue.bind(exchange)
                self.logger.debug(f"Bound queue '{queue.name}' to exchange '{topic}'.")
                self._queues[topic] = queue
                self.logger.info(f"Created and bound queue '{queue.name}' for topic: {topic}")

                # single reader for this topic
                async def _reader(message: aio_pika.IncomingMessage) -> None:
                    # Consumer callback: ack via message.process(), then fan
                    # out the deserialized message to every handler. Handler
                    # failures are logged but do not stop the fan-out.
                    self.logger.debug(
                        f"[_reader for {topic}] Received raw message. Message ID: {message.message_id}, Correlation ID: {message.correlation_id}, Routing Key: {message.routing_key}"
                    )
                    try:
                        self.logger.debug(
                            f"[_reader for {topic}] Entering message.process() context for message ID: {message.message_id}"
                        )
                        async with message.process():
                            self.logger.debug(
                                f"[_reader for {topic}] Successfully entered message.process() for message ID: {message.message_id}. Body type: {type(message.body)}, Body (first 100 bytes): {message.body[:100]!r}"
                            )
                            # Re-read the handler list on every delivery so
                            # handlers added after subscribe are included.
                            topic_handlers = self._listeners.get(topic, [])
                            if not topic_handlers:
                                self.logger.warning(
                                    f"[_reader for {topic}] No handlers found for message ID: {message.message_id} after entering process context. This should not happen if consumer is active."
                                )
                                return

                            for i, h in enumerate(topic_handlers):
                                handler_name = getattr(h, "__name__", str(h))
                                self.logger.debug(
                                    f"[_reader for {topic}] Invoking handler {i+1}/{len(topic_handlers)} ('{handler_name}') for message ID: {message.message_id}"
                                )
                                try:
                                    # Deserialize message.body (bytes) to BaseMessage before passing to handler
                                    deserialized_message = BaseMessage.model_validate_json(
                                        message.body
                                    )
                                    await h(deserialized_message)
                                    self.logger.debug(
                                        f"[_reader for {topic}] Handler '{handler_name}' completed for message ID: {message.message_id}"
                                    )
                                except Exception as e_handler:
                                    self.logger.error(
                                        f"[_reader for {topic}] Handler '{handler_name}' failed for message ID: {message.message_id}: {e_handler!r}",
                                        exc_info=True,
                                    )
                        self.logger.debug(
                            f"[_reader for {topic}] Exited message.process() context for message ID: {message.message_id}"
                        )
                    except Exception as e_process:
                        self.logger.error(
                            f"[_reader for {topic}] Exception during message.process() or handler invocation for message ID: {message.message_id}: {e_process!r}",
                            exc_info=True,
                        )
                        # Depending on aio_pika's behavior, we might need to nack or requeue explicitly if message.process() fails internally
                        # For now, logging the error. If message.process() raises, it typically means the message wasn't acked.

                # Start consumer
                self.logger.info(f"Starting consumer for queue '{queue.name}' on topic '{topic}'.")
                consumer_tag = await queue.consume(
                    _reader
                )  # Store consumer_tag if needed for cancellation
                self.logger.info(
                    f"Started consumer for queue '{queue.name}' (topic: {topic}) with consumer_tag: {consumer_tag}."
                )
                # Store consumer_tag in self._queues or a new dict if you need to cancel specific consumers later
                # For example: self._queues[topic] = {'queue': queue, 'consumer_tag': consumer_tag}

            self._listeners[topic].append(handler)
            self.logger.info(
                f"Added handler for topic: {topic}, total handlers: {len(self._listeners[topic])}"
            )
        except Exception as e:
            self.logger.error(f"Error subscribing to topic {topic}: {e}")
            raise

    def is_connected(self) -> bool:
        """True when a connection object exists and reports itself open."""
        # getattr default True => "closed" when the attribute is missing.
        return bool(self._connection and not getattr(self._connection, "is_closed", True))

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Extracts RabbitMQ specific Connection.Close frame details from an exception."""
        details = []
        details.append(f"Exception type: {type(error)}")
        # Use hasattr and getattr for safe attribute access
        if hasattr(error, "args") and error.args:
            details.append(f"Exception args: {error.args!r}")
            if (
                len(error.args) > 1
                and hasattr(error.args[1], "reply_code")
                and hasattr(error.args[1], "reply_text")
            ):
                close_frame = error.args[1]
                details.append(
                    f"RabbitMQ Connection.Close frame (from e.args[1]): "
                    f"reply_code={close_frame.reply_code}, reply_text='{close_frame.reply_text}'"
                )
                return "\n".join(details)
            elif (
                # NOTE(review): this e.frame fallback is only reached when
                # error.args is non-empty — confirm that is intended.
                hasattr(error, "frame")
                and hasattr(error.frame, "reply_code")
                and hasattr(error.frame, "reply_text")
            ):
                close_frame = error.frame
                details.append(
                    f"RabbitMQ Connection.Close frame (from e.frame): "
                    f"reply_code={close_frame.reply_code}, reply_text='{close_frame.reply_text}'"
                )
                return "\n".join(details)

        # Fall back to type + args when no Close frame was found.
        if len(details) > 1:
            return "\n".join(details)

        return None

    async def unsubscribe(self, topic: str) -> None:
        """Unsubscribe from a RabbitMQ topic (queue)."""
        # RabbitMQ unsubscription is handled by closing the consumer
        # In practice, this is complex as it involves queue management
        self.logger.warning(f"RabbitMQ driver unsubscribe not fully implemented for topic: {topic}")
        # In a full implementation, you would need to:
        # 1. Find the consumer for this topic/queue
        # 2. Cancel the consumer
        # 3. Potentially delete the queue if it was auto-created
        pass

format_connection_error_details(error)

Extracts RabbitMQ specific Connection.Close frame details from an exception.

Source code in src/argentic/core/messager/drivers/RabbitMQDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Extracts RabbitMQ specific Connection.Close frame details from an exception."""
    lines = [f"Exception type: {type(error)}"]
    args = getattr(error, "args", None)
    if args:
        lines.append(f"Exception args: {args!r}")
        # Look for an AMQP Connection.Close frame in the usual two places.
        frame = None
        source = None
        second = args[1] if len(args) > 1 else None
        if second is not None and hasattr(second, "reply_code") and hasattr(second, "reply_text"):
            frame, source = second, "e.args[1]"
        else:
            candidate = getattr(error, "frame", None)
            if (
                candidate is not None
                and hasattr(candidate, "reply_code")
                and hasattr(candidate, "reply_text")
            ):
                frame, source = candidate, "e.frame"
        if frame is not None:
            lines.append(
                f"RabbitMQ Connection.Close frame (from {source}): "
                f"reply_code={frame.reply_code}, reply_text='{frame.reply_text}'"
            )
            return "\n".join(lines)
    # No frame found: report type + args when args were present, else nothing.
    if len(lines) > 1:
        return "\n".join(lines)
    return None

unsubscribe(topic) async

Unsubscribe from a RabbitMQ topic (queue).

Source code in src/argentic/core/messager/drivers/RabbitMQDriver.py
async def unsubscribe(self, topic: str) -> None:
    """Unsubscribe from a RabbitMQ topic (queue)."""
    # A real implementation would locate the consumer for this topic,
    # cancel it, and possibly delete its auto-created queue; none of that
    # is wired up yet, so this is a logged no-op.
    self.logger.warning(f"RabbitMQ driver unsubscribe not fully implemented for topic: {topic}")

Redis

core.messager.drivers.RedisDriver.RedisDriver

Bases: BaseDriver[RedisDriverConfig]

Source code in src/argentic/core/messager/drivers/RedisDriver.py
class RedisDriver(BaseDriver[RedisDriverConfig]):
    """Redis pub/sub messaging driver.

    A single PubSub connection and one background reader task serve all
    subscribed topics; handlers are fanned out per channel name.
    """

    def __init__(self, config: RedisDriverConfig):
        # Fail fast with an actionable message when the optional extra is missing.
        if not AIOREDIS_INSTALLED:
            raise ImportError(
                "aioredis is not installed. "
                "Please install it with: uv pip install argentic[redis]"
            )
        super().__init__(config)
        # Client is created lazily in connect().
        self._redis: Optional["aioredis.Redis"] = None
        # topic to list of handlers
        self._listeners: Dict[str, List[MessageHandler]] = {}
        # Shared PubSub connection and the single reader task that drains it.
        self._pubsub: Optional[PubSub] = None
        self._reader_task: Optional[asyncio.Task] = None

    async def connect(self) -> bool:
        """Create the Redis client from config; errors propagate to the caller."""
        url = f"redis://{self.config.url}:{self.config.port}"
        self._redis = await aioredis.from_url(
            url,
            password=self.config.password,
        )
        return True

    async def disconnect(self) -> None:
        """Close pubsub, cancel the reader task, close the client, reset state."""
        # NOTE(review): pubsub is closed before the reader task (which
        # iterates it) is cancelled — confirm this ordering is intentional.
        if self._pubsub:
            await self._pubsub.close()
        if self._reader_task and not self._reader_task.done():
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                pass

        if self._redis:
            await self._redis.close()

        self._redis = None
        self._pubsub = None
        self._reader_task = None

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        """Publish a JSON-serialized message to a Redis channel.

        Args:
            topic: Channel name.
            payload: Message serialized via ``model_dump_json``.
            qos: Accepted for interface parity; unused by this driver.
            retain: Accepted for interface parity; unused by this driver.

        Raises:
            ConnectionError: If connect() has not been called successfully.
        """
        # Handle BaseMessage serialization
        data = payload.model_dump_json()

        if not self._redis:
            raise ConnectionError("Redis client not connected")
        await self._redis.publish(topic, data)

    async def subscribe(
        self,
        topic: str,
        handler: MessageHandler,
        message_cls: Type[BaseMessage] = BaseMessage,
        **kwargs,
    ) -> None:
        """Register a handler; lazily sets up pubsub and the reader task.

        Raises:
            ConnectionError: If connect() has not been called successfully.
        """
        if not self._redis:
            raise ConnectionError("Redis client not connected")
        # register handler and subscribe on first handler per topic
        if topic not in self._listeners:
            self._listeners[topic] = []
            # initialize pubsub and reader
            if self._pubsub is None:
                self._pubsub = self._redis.pubsub()
            await self._pubsub.subscribe(topic)
            if self._reader_task is None or self._reader_task.done():
                self._reader_task = asyncio.create_task(self._reader())
        self._listeners[topic].append(handler)

    async def _reader(self) -> None:
        """Background task: dispatch incoming pubsub messages to handlers."""
        # single reader for all topics
        if not self._pubsub:
            return
        async for message in self._pubsub.listen():
            # Skip control frames (subscribe/unsubscribe confirmations).
            if message.get("type") == "message":
                channel_bytes = message.get("channel")
                if not channel_bytes:
                    continue
                channel = channel_bytes.decode()
                data = message.get("data")
                if not data:
                    continue

                try:
                    msg_obj = BaseMessage.model_validate_json(data)
                    for h in self._listeners.get(channel, []):
                        await h(msg_obj)
                except Exception:
                    # Potentially log this error
                    # NOTE(review): deserialization *and* handler errors are
                    # silently swallowed here — consider at least logging.
                    pass

    def is_connected(self) -> bool:
        """Best-effort check that a client exists and is not closed."""
        # NOTE(review): relies on a ``closed`` attribute on the client; if the
        # installed client type lacks it, getattr defaults to True and this
        # always reports False — verify against the aioredis version in use.
        return bool(self._redis and not getattr(self._redis, "closed", True))

    async def unsubscribe(self, topic: str) -> None:
        """Unsubscribe from a Redis topic."""
        if self._pubsub and topic in self._listeners:
            await self._pubsub.unsubscribe(topic)
            del self._listeners[topic]

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Format Redis-specific connection error details."""
        if "redis" in str(type(error)).lower():
            return f"Redis error: {error}"
        return None

format_connection_error_details(error)

Format Redis-specific connection error details.

Source code in src/argentic/core/messager/drivers/RedisDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Format Redis-specific connection error details.

    Returns a short description when the exception's type name mentions
    Redis, otherwise None so generic formatting can take over.
    """
    if "redis" not in str(type(error)).lower():
        return None
    return f"Redis error: {error}"

unsubscribe(topic) async

Unsubscribe from a Redis topic.

Source code in src/argentic/core/messager/drivers/RedisDriver.py
async def unsubscribe(self, topic: str) -> None:
    """Unsubscribe from a Redis topic."""
    # Only act when a pubsub channel exists and the topic is tracked.
    if not self._pubsub or topic not in self._listeners:
        return
    await self._pubsub.unsubscribe(topic)
    del self._listeners[topic]

ZeroMQ

core.messager.drivers.ZeroMQDriver.ZeroMQDriver

Bases: BaseDriver[ZeroMQDriverConfig]

ZeroMQ driver with XPUB/XSUB proxy support.

Architecture: - PUB socket: Connects to proxy frontend (XSUB) for publishing - SUB socket: Connects to proxy backend (XPUB) for subscribing - Proxy: Optional embedded or external XPUB/XSUB router

Wire format: "<topic> <json_payload>" Example: "agent/command/ask_question {"type":"ask_question",...}"

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
(source listing line-number gutter, lines 31–426, omitted)
class ZeroMQDriver(BaseDriver[ZeroMQDriverConfig]):
    """ZeroMQ driver with XPUB/XSUB proxy support.

    Architecture:
    - PUB socket: Connects to proxy frontend (XSUB) for publishing
    - SUB socket: Connects to proxy backend (XPUB) for subscribing
    - Proxy: Optional embedded or external XPUB/XSUB router

    Wire format: "<topic> <json_payload>"
    Example: "agent/command/ask_question {\"type\":\"ask_question\",...}"
    """

    def __init__(self, config: ZeroMQDriverConfig):
        """Initialize ZeroMQ driver.

        Args:
            config: ZeroMQ driver configuration

        Raises:
            ImportError: If pyzmq is not installed
        """
        # Fail fast with an actionable message if the optional pyzmq
        # dependency is missing.
        if not ZMQ_INSTALLED:
            raise ImportError(
                "pyzmq is not installed. "
                "Please install it with: pip install argentic[zeromq]"
            )
        super().__init__(config)

        # ZeroMQ context and sockets (created lazily in connect())
        self._context: Optional[zmq.asyncio.Context] = None
        self._pub_socket: Optional[zmq.asyncio.Socket] = None
        self._sub_socket: Optional[zmq.asyncio.Socket] = None

        # Proxy manager (if embedded mode)
        self._proxy_manager: Optional[ZMQProxyManager] = None

        # Message handling: topic -> list of async handler callbacks
        self._listeners: Dict[str, List[MessageHandler]] = {}
        self._reader_task: Optional[asyncio.Task] = None

        # Connection state
        self._connected = False

    async def connect(self) -> bool:
        """Connect to ZeroMQ proxy and initialize sockets.

        Workflow:
        1. Start proxy if embedded mode
        2. Create ZeroMQ context
        3. Create and configure PUB/SUB sockets
        4. Connect sockets to proxy
        5. Start reader task

        Returns:
            True if connection successful

        Raises:
            ConnectionError: If connection fails
        """
        if self._connected:
            logger.debug("Already connected to ZeroMQ")
            return True

        try:
            # Start embedded proxy if configured
            if self.config.start_proxy and self.config.proxy_mode == "embedded":
                frontend_url = f"tcp://{self.config.url}:{self.config.port}"
                backend_url = f"tcp://{self.config.url}:{self.config.backend_port}"

                self._proxy_manager = ZMQProxyManager(frontend_url, backend_url)
                self._proxy_manager.start()
                logger.info("Embedded ZeroMQ proxy started")

                # Wait for proxy to be ready
                await asyncio.sleep(0.2)

            # Create context and sockets
            self._context = zmq.asyncio.Context()

            # PUB socket for publishing (connects to frontend)
            self._pub_socket = self._context.socket(zmq.PUB)
            self._pub_socket.setsockopt(zmq.SNDHWM, self.config.high_water_mark)
            self._pub_socket.setsockopt(zmq.LINGER, self.config.linger)

            pub_url = f"tcp://{self.config.url}:{self.config.port}"
            self._pub_socket.connect(pub_url)
            logger.debug(f"PUB socket connected to {pub_url}")

            # SUB socket for subscribing (connects to backend)
            self._sub_socket = self._context.socket(zmq.SUB)
            self._sub_socket.setsockopt(zmq.RCVHWM, self.config.high_water_mark)

            sub_url = f"tcp://{self.config.url}:{self.config.backend_port}"
            self._sub_socket.connect(sub_url)
            logger.debug(f"SUB socket connected to {sub_url}")

            # Start reader task
            self._reader_task = asyncio.create_task(self._reader())
            logger.debug("Reader task started")

            self._connected = True
            logger.info("ZeroMQ driver connected successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to ZeroMQ: {e}", exc_info=True)
            # disconnect() releases any partially-created resources
            # (proxy, context, sockets) even though _connected is False.
            await self.disconnect()
            raise ConnectionError(f"ZeroMQ connection failed: {e}") from e

    async def disconnect(self) -> None:
        """Disconnect from ZeroMQ and cleanup resources.

        Safe to call at any time, including after a partially failed
        connect(): each resource is released if (and only if) it exists,
        regardless of the connection flag. Idempotent.

        Workflow:
        1. Stop reader task
        2. Close sockets
        3. Terminate context
        4. Stop proxy if embedded
        5. Clear state
        """
        # BUGFIX: no early return on `not self._connected`. connect()
        # calls disconnect() in its error path *before* _connected is
        # set, so bailing out here would leak the embedded proxy, the
        # context, and any sockets already created.
        if not self._connected:
            logger.debug("Not connected; releasing any partially-created resources")
        else:
            logger.info("Disconnecting ZeroMQ driver")

        # Stop reader task
        if self._reader_task and not self._reader_task.done():
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                logger.debug("Reader task cancelled")

        # Close sockets (linger=0 drops pending outbound messages)
        if self._pub_socket:
            self._pub_socket.close(linger=0)
            self._pub_socket = None

        if self._sub_socket:
            self._sub_socket.close(linger=0)
            self._sub_socket = None

        # Terminate context
        if self._context:
            self._context.term()
            self._context = None

        # Stop embedded proxy
        if self._proxy_manager:
            self._proxy_manager.stop()
            self._proxy_manager = None

        # Clear state
        self._listeners.clear()
        self._reader_task = None
        self._connected = False

        logger.info("ZeroMQ driver disconnected")

    async def publish(
        self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
    ) -> None:
        """Publish message to topic.

        Wire format: "<topic> <json_payload>"

        Note: qos and retain parameters are ignored (ZeroMQ doesn't support them).

        Args:
            topic: Topic to publish to (must not contain spaces)
            payload: Message to publish
            qos: Ignored (for API compatibility)
            retain: Ignored (for API compatibility)

        Raises:
            ConnectionError: If not connected
            ValueError: If topic contains spaces
        """
        if not self._connected or not self._pub_socket:
            raise ConnectionError("ZeroMQ driver not connected")

        # A space would make the topic ambiguous with the payload in the
        # "<topic> <json>" wire format, so reject it outright.
        if " " in topic:
            raise ValueError(
                f"Topic '{topic}' contains spaces, which are not allowed in ZeroMQ driver. "
                "Use slashes or underscores instead."
            )

        # Serialize message
        json_data = payload.model_dump_json()

        # Format: "<topic> <json>"
        message = f"{topic} {json_data}"

        # Send with retry logic: NOBLOCK raises zmq.Again when the send
        # buffer (SNDHWM) is full; back off linearly and retry.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                await self._pub_socket.send_string(message, flags=zmq.NOBLOCK)
                logger.debug(f"Published to {topic}: {len(json_data)} bytes")
                return
            except zmq.Again:
                # Send buffer full, wait and retry
                if attempt < max_retries - 1:
                    await asyncio.sleep(0.01 * (attempt + 1))
                else:
                    raise ConnectionError(
                        f"Failed to publish to {topic}: send buffer full after {max_retries} retries"
                    )
            except zmq.ZMQError as e:
                logger.error(f"ZMQ error publishing to {topic}: {e}")
                raise ConnectionError(f"ZMQ publish error: {e}") from e

    async def subscribe(
        self,
        topic: str,
        handler: MessageHandler,
        message_cls: Type[BaseMessage] = BaseMessage,
        **kwargs,
    ) -> None:
        """Subscribe to topic and register handler.

        ZeroMQ uses prefix matching for topics. Subscribing to "agent/command"
        will receive messages for "agent/command/ask_question", etc.

        Args:
            topic: Topic to subscribe to
            handler: Async callback for messages
            message_cls: Message class for validation (currently unused;
                incoming payloads are validated as BaseMessage in _reader)
            **kwargs: Additional arguments (ignored)

        Raises:
            ConnectionError: If not connected
        """
        if not self._connected or not self._sub_socket:
            raise ConnectionError("ZeroMQ driver not connected")

        # Register handler; only the first handler for a topic triggers
        # the socket-level subscription.
        if topic not in self._listeners:
            self._listeners[topic] = []

            # Subscribe socket to topic (prefix matching)
            topic_bytes = topic.encode(self.config.topic_encoding)
            self._sub_socket.setsockopt(zmq.SUBSCRIBE, topic_bytes)
            logger.debug(f"Subscribed to topic: {topic}")

        self._listeners[topic].append(handler)
        logger.debug(f"Handler registered for {topic} ({len(self._listeners[topic])} total)")

    async def unsubscribe(self, topic: str) -> None:
        """Unsubscribe from topic and remove all handlers.

        Args:
            topic: Topic to unsubscribe from
        """
        if topic in self._listeners:
            # Unsubscribe socket
            if self._sub_socket:
                topic_bytes = topic.encode(self.config.topic_encoding)
                self._sub_socket.setsockopt(zmq.UNSUBSCRIBE, topic_bytes)
                logger.debug(f"Unsubscribed from topic: {topic}")

            # Remove handlers
            del self._listeners[topic]

    async def _reader(self) -> None:
        """Reader loop for incoming messages.

        Continuously receives messages from SUB socket, parses topic and payload,
        and dispatches to registered handlers. Runs until cancelled.

        Message format: "<topic> <json_payload>"
        """
        logger.debug("Reader loop started")

        while True:
            try:
                if not self._sub_socket:
                    logger.warning("SUB socket not available, stopping reader")
                    break

                # Receive message (blocks until available)
                message = await self._sub_socket.recv_string()

                # Parse: "<topic> <json>" — split on the first space only,
                # since the JSON payload itself contains spaces.
                parts = message.split(" ", 1)
                if len(parts) != 2:
                    logger.warning(f"Invalid message format (expected '<topic> <json>'): {message[:100]}")
                    continue

                topic, json_data = parts

                # Deserialize message
                try:
                    msg_obj = BaseMessage.model_validate_json(json_data)
                except Exception as e:
                    logger.warning(f"Failed to deserialize message from {topic}: {e}")
                    continue

                # Dispatch to handlers
                # Find all matching topics (prefix matching, mirroring
                # ZeroMQ's own SUBSCRIBE semantics)
                matched_handlers = []
                for registered_topic, handlers in self._listeners.items():
                    if topic.startswith(registered_topic):
                        matched_handlers.extend(handlers)

                if not matched_handlers:
                    logger.debug(f"No handlers for topic: {topic}")
                    continue

                # Call handlers; one failing handler must not prevent the
                # others from running
                for handler in matched_handlers:
                    try:
                        await handler(msg_obj)
                    except Exception as e:
                        logger.error(
                            f"Handler error for topic {topic}: {e}",
                            exc_info=True
                        )

            except asyncio.CancelledError:
                logger.debug("Reader loop cancelled")
                break
            except zmq.ZMQError as e:
                logger.error(f"ZMQ error in reader: {e}")
                # Try to reconnect
                await self._reconnect()
            except Exception as e:
                logger.error(f"Unexpected error in reader: {e}", exc_info=True)
                await asyncio.sleep(0.1)  # Avoid tight loop on persistent errors

        logger.debug("Reader loop stopped")

    async def _reconnect(self) -> None:
        """Attempt to reconnect sockets and resubscribe to topics.

        This is called when a ZMQ error occurs in the reader loop.
        """
        logger.warning("Attempting to reconnect ZeroMQ sockets")

        try:
            # Save current subscriptions
            subscriptions = list(self._listeners.keys())

            # Close and recreate sockets
            if self._pub_socket:
                self._pub_socket.close(linger=0)

            if self._sub_socket:
                self._sub_socket.close(linger=0)

            # Recreate sockets
            if self._context:
                # PUB socket
                self._pub_socket = self._context.socket(zmq.PUB)
                self._pub_socket.setsockopt(zmq.SNDHWM, self.config.high_water_mark)
                self._pub_socket.setsockopt(zmq.LINGER, self.config.linger)
                pub_url = f"tcp://{self.config.url}:{self.config.port}"
                self._pub_socket.connect(pub_url)

                # SUB socket
                self._sub_socket = self._context.socket(zmq.SUB)
                self._sub_socket.setsockopt(zmq.RCVHWM, self.config.high_water_mark)
                sub_url = f"tcp://{self.config.url}:{self.config.backend_port}"
                self._sub_socket.connect(sub_url)

                # Resubscribe to all topics
                for topic in subscriptions:
                    topic_bytes = topic.encode(self.config.topic_encoding)
                    self._sub_socket.setsockopt(zmq.SUBSCRIBE, topic_bytes)

                logger.info("ZeroMQ sockets reconnected successfully")

        except Exception as e:
            logger.error(f"Reconnection failed: {e}", exc_info=True)
            self._connected = False

    def is_connected(self) -> bool:
        """Check if driver is connected.

        Returns:
            True if connected and sockets are available
        """
        return self._connected and self._pub_socket is not None and self._sub_socket is not None

    def format_connection_error_details(self, error: Exception) -> Optional[str]:
        """Format ZeroMQ-specific connection error details.

        Args:
            error: Exception that occurred

        Returns:
            Formatted error message or None
        """
        # Matches any exception whose type name mentions "zmq"
        # (e.g. zmq.error.ZMQError) without importing zmq here.
        if "zmq" in str(type(error)).lower():
            return f"ZeroMQ error: {error}"
        return None

__init__(config)

Initialize ZeroMQ driver.

Parameters:

Name Type Description Default
config ZeroMQDriverConfig

ZeroMQ driver configuration

required

Raises:

Type Description
ImportError

If pyzmq is not installed

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
def __init__(self, config: ZeroMQDriverConfig):
    """Initialize ZeroMQ driver.

    Args:
        config: ZeroMQ driver configuration

    Raises:
        ImportError: If pyzmq is not installed
    """
    # Fail fast with an actionable message if the optional pyzmq
    # dependency is missing.
    if not ZMQ_INSTALLED:
        raise ImportError(
            "pyzmq is not installed. "
            "Please install it with: pip install argentic[zeromq]"
        )
    super().__init__(config)

    # ZeroMQ context and sockets (created lazily in connect())
    self._context: Optional[zmq.asyncio.Context] = None
    self._pub_socket: Optional[zmq.asyncio.Socket] = None
    self._sub_socket: Optional[zmq.asyncio.Socket] = None

    # Proxy manager (if embedded mode)
    self._proxy_manager: Optional[ZMQProxyManager] = None

    # Message handling: topic -> list of async handler callbacks
    self._listeners: Dict[str, List[MessageHandler]] = {}
    self._reader_task: Optional[asyncio.Task] = None

    # Connection state
    self._connected = False

connect() async

Connect to ZeroMQ proxy and initialize sockets.

Workflow: 1. Start proxy if embedded mode 2. Create ZeroMQ context 3. Create and configure PUB/SUB sockets 4. Connect sockets to proxy 5. Start reader task

Returns:

Type Description
bool

True if connection successful

Raises:

Type Description
ConnectionError

If connection fails

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
async def connect(self) -> bool:
    """Connect to ZeroMQ proxy and initialize sockets.

    Workflow:
    1. Start proxy if embedded mode
    2. Create ZeroMQ context
    3. Create and configure PUB/SUB sockets
    4. Connect sockets to proxy
    5. Start reader task

    Returns:
        True if connection successful

    Raises:
        ConnectionError: If connection fails
    """
    # Idempotent: repeated connect() calls are no-ops once connected.
    if self._connected:
        logger.debug("Already connected to ZeroMQ")
        return True

    try:
        # Start embedded proxy if configured
        if self.config.start_proxy and self.config.proxy_mode == "embedded":
            frontend_url = f"tcp://{self.config.url}:{self.config.port}"
            backend_url = f"tcp://{self.config.url}:{self.config.backend_port}"

            self._proxy_manager = ZMQProxyManager(frontend_url, backend_url)
            self._proxy_manager.start()
            logger.info("Embedded ZeroMQ proxy started")

            # Wait for proxy to be ready
            await asyncio.sleep(0.2)

        # Create context and sockets
        self._context = zmq.asyncio.Context()

        # PUB socket for publishing (connects to frontend)
        self._pub_socket = self._context.socket(zmq.PUB)
        self._pub_socket.setsockopt(zmq.SNDHWM, self.config.high_water_mark)
        self._pub_socket.setsockopt(zmq.LINGER, self.config.linger)

        pub_url = f"tcp://{self.config.url}:{self.config.port}"
        self._pub_socket.connect(pub_url)
        logger.debug(f"PUB socket connected to {pub_url}")

        # SUB socket for subscribing (connects to backend)
        self._sub_socket = self._context.socket(zmq.SUB)
        self._sub_socket.setsockopt(zmq.RCVHWM, self.config.high_water_mark)

        sub_url = f"tcp://{self.config.url}:{self.config.backend_port}"
        self._sub_socket.connect(sub_url)
        logger.debug(f"SUB socket connected to {sub_url}")

        # Start reader task
        self._reader_task = asyncio.create_task(self._reader())
        logger.debug("Reader task started")

        self._connected = True
        logger.info("ZeroMQ driver connected successfully")
        return True

    except Exception as e:
        logger.error(f"Failed to connect to ZeroMQ: {e}", exc_info=True)
        # NOTE(review): disconnect() is called here while _connected is
        # still False — verify it still releases partially-created
        # resources (proxy/context/sockets) in that state.
        await self.disconnect()
        raise ConnectionError(f"ZeroMQ connection failed: {e}") from e

disconnect() async

Disconnect from ZeroMQ and cleanup resources.

Workflow: 1. Stop reader task 2. Close sockets 3. Terminate context 4. Stop proxy if embedded 5. Clear state

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
async def disconnect(self) -> None:
    """Disconnect from ZeroMQ and cleanup resources.

    Workflow:
    1. Stop reader task
    2. Close sockets
    3. Terminate context
    4. Stop proxy if embedded
    5. Clear state
    """
    # NOTE(review): this early return means a disconnect() issued after
    # a partially failed connect() (where _connected was never set)
    # skips cleanup of resources created before the failure — confirm
    # whether the proxy/context/sockets can leak on that path.
    if not self._connected:
        logger.debug("Not connected, skipping disconnect")
        return

    logger.info("Disconnecting ZeroMQ driver")

    # Stop reader task
    if self._reader_task and not self._reader_task.done():
        self._reader_task.cancel()
        try:
            await self._reader_task
        except asyncio.CancelledError:
            logger.debug("Reader task cancelled")

    # Close sockets (linger=0 drops any pending outbound messages)
    if self._pub_socket:
        self._pub_socket.close(linger=0)
        self._pub_socket = None

    if self._sub_socket:
        self._sub_socket.close(linger=0)
        self._sub_socket = None

    # Terminate context
    if self._context:
        self._context.term()
        self._context = None

    # Stop embedded proxy
    if self._proxy_manager:
        self._proxy_manager.stop()
        self._proxy_manager = None

    # Clear state
    self._listeners.clear()
    self._reader_task = None
    self._connected = False

    logger.info("ZeroMQ driver disconnected")

format_connection_error_details(error)

Format ZeroMQ-specific connection error details.

Parameters:

Name Type Description Default
error Exception

Exception that occurred

required

Returns:

Type Description
Optional[str]

Formatted error message or None

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
def format_connection_error_details(self, error: Exception) -> Optional[str]:
    """Format ZeroMQ-specific connection error details.

    Args:
        error: Exception that occurred

    Returns:
        Formatted error message or None
    """
    # Matches any exception whose type name mentions "zmq"
    # (e.g. zmq.error.ZMQError) without importing zmq here.
    if "zmq" in str(type(error)).lower():
        return f"ZeroMQ error: {error}"
    return None

is_connected()

Check if driver is connected.

Returns:

Type Description
bool

True if connected and sockets are available

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
def is_connected(self) -> bool:
    """Check if driver is connected.

    Returns:
        True if connected and sockets are available
    """
    # Both sockets must exist; _connected alone is not sufficient since
    # _reconnect() may have failed and left sockets unusable.
    return self._connected and self._pub_socket is not None and self._sub_socket is not None

publish(topic, payload, qos=0, retain=False) async

Publish message to topic.

Wire format: "&lt;topic&gt; &lt;json_payload&gt;"

Note: qos and retain parameters are ignored (ZeroMQ doesn't support them).

Parameters:

Name Type Description Default
topic str

Topic to publish to (must not contain spaces)

required
payload BaseMessage

Message to publish

required
qos int

Ignored (for API compatibility)

0
retain bool

Ignored (for API compatibility)

False

Raises:

Type Description
ConnectionError

If not connected

ValueError

If topic contains spaces

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
async def publish(
    self, topic: str, payload: BaseMessage, qos: int = 0, retain: bool = False
) -> None:
    """Publish message to topic.

    Wire format: "<topic> <json_payload>"

    Note: qos and retain parameters are ignored (ZeroMQ doesn't support them).

    Args:
        topic: Topic to publish to (must not contain spaces)
        payload: Message to publish
        qos: Ignored (for API compatibility)
        retain: Ignored (for API compatibility)

    Raises:
        ConnectionError: If not connected
        ValueError: If topic contains spaces
    """
    if not self._connected or not self._pub_socket:
        raise ConnectionError("ZeroMQ driver not connected")

    # A space would make the topic ambiguous with the payload in the
    # "<topic> <json>" wire format, so reject it outright.
    if " " in topic:
        raise ValueError(
            f"Topic '{topic}' contains spaces, which are not allowed in ZeroMQ driver. "
            "Use slashes or underscores instead."
        )

    # Serialize message
    json_data = payload.model_dump_json()

    # Format: "<topic> <json>"
    message = f"{topic} {json_data}"

    # Send with retry logic: NOBLOCK raises zmq.Again when the send
    # buffer (SNDHWM) is full; back off linearly (10/20/30 ms) and retry.
    max_retries = 3
    for attempt in range(max_retries):
        try:
            await self._pub_socket.send_string(message, flags=zmq.NOBLOCK)
            logger.debug(f"Published to {topic}: {len(json_data)} bytes")
            return
        except zmq.Again:
            # Send buffer full, wait and retry
            if attempt < max_retries - 1:
                await asyncio.sleep(0.01 * (attempt + 1))
            else:
                raise ConnectionError(
                    f"Failed to publish to {topic}: send buffer full after {max_retries} retries"
                )
        except zmq.ZMQError as e:
            logger.error(f"ZMQ error publishing to {topic}: {e}")
            raise ConnectionError(f"ZMQ publish error: {e}") from e

subscribe(topic, handler, message_cls=BaseMessage, **kwargs) async

Subscribe to topic and register handler.

ZeroMQ uses prefix matching for topics. Subscribing to "agent/command" will receive messages for "agent/command/ask_question", etc.

Parameters:

Name Type Description Default
topic str

Topic to subscribe to

required
handler MessageHandler

Async callback for messages

required
message_cls Type[BaseMessage]

Message class for validation (currently unused)

BaseMessage
**kwargs

Additional arguments (ignored)

{}

Raises:

Type Description
ConnectionError

If not connected

Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
async def subscribe(
    self,
    topic: str,
    handler: MessageHandler,
    message_cls: Type[BaseMessage] = BaseMessage,
    **kwargs,
) -> None:
    """Subscribe to topic and register handler.

    ZeroMQ uses prefix matching for topics. Subscribing to "agent/command"
    will receive messages for "agent/command/ask_question", etc.

    Args:
        topic: Topic to subscribe to
        handler: Async callback for messages
        message_cls: Message class for validation (currently unused)
        **kwargs: Additional arguments (ignored)

    Raises:
        ConnectionError: If not connected
    """
    if not self._connected or not self._sub_socket:
        raise ConnectionError("ZeroMQ driver not connected")

    # Register handler; only the first handler for a topic triggers the
    # socket-level subscription.
    if topic not in self._listeners:
        self._listeners[topic] = []

        # Subscribe socket to topic (prefix matching)
        topic_bytes = topic.encode(self.config.topic_encoding)
        self._sub_socket.setsockopt(zmq.SUBSCRIBE, topic_bytes)
        logger.debug(f"Subscribed to topic: {topic}")

    self._listeners[topic].append(handler)
    logger.debug(f"Handler registered for {topic} ({len(self._listeners[topic])} total)")

unsubscribe(topic) async

Unsubscribe from topic and remove all handlers.

Parameters:

Name Type Description Default
topic str

Topic to unsubscribe from

required
Source code in src/argentic/core/messager/drivers/ZeroMQDriver.py
async def unsubscribe(self, topic: str) -> None:
    """Unsubscribe from topic and remove all handlers.

    Args:
        topic: Topic to unsubscribe from
    """
    # No-op when the topic was never subscribed.
    if topic in self._listeners:
        # Unsubscribe socket
        if self._sub_socket:
            topic_bytes = topic.encode(self.config.topic_encoding)
            self._sub_socket.setsockopt(zmq.UNSUBSCRIBE, topic_bytes)
            logger.debug(f"Unsubscribed from topic: {topic}")

        # Remove handlers
        del self._listeners[topic]