
Chapter 5: MQTT Proxy

In Chapter 4, we explored the middleware services and HAL that give the hub its device-management smarts. But all those services need a way to talk to the cloud — to send telemetry up and receive commands down. That's where the MQTT Proxy comes in.

Think of the MQTT Proxy as a shared mailroom for the entire hub. Instead of every service opening its own connection to AWS IoT Core (expensive, hard to manage), they all hand their mail to one central proxy, which maintains a single MQTT connection upstream.

Why a Proxy?

AWS IoT Core communicates over MQTT, but opening multiple TLS connections from a resource-constrained hub is wasteful. The MQTT Proxy solves this with a simple rule: one connection to the cloud, many clients locally.

Each internal component (Agent, OTA Manager, Hub Onboarding, etc.) connects to the proxy over IPC — the same IPC framework we covered in Chapter 3. The proxy multiplexes all their messages over one MQTT session.

📁 The proxy lives in commonUtils/IoTSmartHomeDevice-MqttProxy/.

Architecture at a Glance

The proxy server is built from a handful of cooperating pieces, all wired together through a shared context:

// include/mqtt_proxy/core/context.h
struct MqttProxyContext {
  std::shared_ptr<MqttConnector> mqtt_connector;
  std::shared_ptr<ClientRegistry> registry;
  std::shared_ptr<WorkerThread> worker;
};

Three components, three responsibilities:

| Component | Role |
| --- | --- |
| MqttConnector | Manages the single MQTT connection to AWS IoT Core |
| ClientRegistry | Tracks which local clients subscribe to which topics |
| WorkerThread | Queues and dispatches messages in both directions |

Client Registration

Before a service can publish or subscribe through the proxy, it must register as a client. The ClientRegistry maintains a map of client names to their subscriptions:

// include/mqtt_proxy/core/client_registry.h
struct Client {
  std::unordered_map<std::string,
    std::unordered_set<std::string>> subscriptions;
};

Each client maps topics to sets of message types. When the Agent subscribes to a topic, the registry records it and the proxy subscribes to that topic on the real MQTT connection:

// src/mqtt_proxy/server/ipc_svc.cpp (subscribe handler)
status = registry->AddClient(req_msg->client_name);
status = registry->AddSubscription(
    req_msg->client_name, req_msg->topic,
    req_msg->message_type);
mqtt_connector->SubscribeTopic(req_msg->topic);

The proxy is smart about system topics (prefixed with $SYS). These are internal-only — they never get forwarded to the real MQTT broker.
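One plausible way the subscribe handler could make that internal/external distinction is a simple prefix check before ever touching the MQTT connector. The helper name below is illustrative, not from the codebase:

```cpp
#include <string>

// Topics prefixed with "$SYS" are proxy-internal (e.g. connection status)
// and must never be subscribed on the upstream MQTT connection.
// Hypothetical helper; the real check lives in the subscribe handler.
inline bool IsSystemTopic(const std::string &topic) {
  return topic.rfind("$SYS", 0) == 0;  // true iff topic starts with "$SYS"
}
```

The registry still records the subscription either way, so southbound fan-out works identically for system and cloud topics.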

The Client-Side API

Services don't talk to the proxy directly over raw IPC. They use the IotshdMqttProxyClient class, which wraps all the IPC plumbing into a clean interface:

// include/mqtt_proxy/client/api.h
class IotshdMqttProxyClient {
  iotmi_statusCode_t Connect(int8_t retry = 10);
  iotmi_statusCode_t SubscribeTopic(...);
  ResultMsg PublishToTopic(...);
  ResultMsg PublishToTopicAsync(...);
};

There's also a C wrapper in include/mqtt_proxy/client/api_c.h for components written in C. A typical usage looks like:

IotshdMqttProxyClient client("Agent");
client.Connect();
client.SubscribeTopic(topic, callback);
client.PublishToTopic(topic, payload, "DeviceEvent");

Message Flow: North and South

Messages flow in two directions, and the proxy uses a WorkerThread with two queues to handle them:

  • Northbound (device → cloud): A client publishes via IPC → the proxy enqueues the message → the worker thread pops it, wraps it in the Common Message Format, and publishes to AWS IoT Core.
  • Southbound (cloud → device): A message arrives from AWS IoT Core → the proxy enqueues it → the worker thread pops it, looks up subscribers in the registry, and fans it out over IPC.

// include/mqtt_proxy/core/worker_thread.h
enum class QueueType { kNorthBound, kSouthBound };

The worker thread continuously drains both queues, calling the appropriate handler for each direction.
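A stripped-down, single-threaded sketch of that drain-and-dispatch loop is below. The real WorkerThread runs on its own thread with locking and blocking waits; the type and member names here are illustrative:

```cpp
#include <functional>
#include <queue>
#include <string>

enum class QueueType { kNorthBound, kSouthBound };

// Simplified stand-in for the proxy's WorkerThread: two queues, one
// handler per direction. Omits threading and synchronization.
struct MiniWorker {
  std::queue<std::string> north, south;
  std::function<void(const std::string &)> north_handler, south_handler;

  void Push(const std::string &msg, QueueType q) {
    (q == QueueType::kNorthBound ? north : south).push(msg);
  }

  // Drain both queues, routing each message to its direction's handler.
  void DrainOnce() {
    while (!north.empty()) { north_handler(north.front()); north.pop(); }
    while (!south.empty()) { south_handler(south.front()); south.pop(); }
  }
};
```

The two-queue design keeps a burst of cloud traffic from starving local publishes, and vice versa, since each direction is drained independently.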

Northbound: Publishing to the Cloud

When a client calls PublishToTopic, the message travels through IPC to the proxy server. The IPC handler stamps it with a UUID, timestamp, and the hub's managed_thing_id, then pushes it onto the northbound queue:

// src/mqtt_proxy/server/ipc_svc.cpp
strncpy(req_msg->message_id, uuid.c_str(), ...);
strncpy(req_msg->timestamp, timestamp.c_str(), ...);
worker->PushMessage(*req_msg, QueueType::kNorthBound);

The NorthBoundHandler in main.cpp picks it up, wraps the payload in the Common Message Format header, and calls mqtt_connector->PublishToTopic().
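The envelope-building step can be sketched roughly as follows. The chapter doesn't show the full Common Message Format, so the JSON field names here are guesses; only the stamped values (message ID, timestamp, managed thing ID) come from the text above:

```cpp
#include <string>

// Illustrative envelope builder: wraps a raw payload with the identifying
// fields the IPC handler stamped onto the message. Field names are
// assumptions, not the actual Common Message Format schema.
std::string WrapCmfEnvelope(const std::string &message_id,
                            const std::string &timestamp,
                            const std::string &managed_thing_id,
                            const std::string &payload) {
  return std::string("{\"messageId\":\"") + message_id +
         "\",\"timestamp\":\"" + timestamp +
         "\",\"managedThingId\":\"" + managed_thing_id +
         "\",\"payload\":" + payload + "}";
}
```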

Southbound: Receiving from the Cloud

When a message arrives from AWS IoT Core, the MqttConnector fires the IncomingMqttMsgCallback, which pushes the message onto the southbound queue. The SouthBoundHandler then fans it out:

// src/mqtt_proxy/server/main.cpp (SouthBoundHandler)
auto subscribers = registry->GetSubscribers(
    msg->topic, msg->message_type);
for (const auto &subscriber : subscribers) {
  IotmiIpcSvc_PublishEvent(event_id, buffer, ...);
}

Each subscriber has a dedicated IPC publish socket. The proxy looks up the right socket index and pushes the message to exactly the clients that care about it.

The Sequence in Action

Here's the full round-trip when the Agent publishes a message and later receives a cloud response:

sequenceDiagram
    participant Agent
    participant ProxyClient as MqttProxy Client
    participant ProxyServer as MqttProxy Server
    participant Cloud as AWS IoT Core

    Agent->>ProxyClient: PublishToTopic(topic, payload)
    ProxyClient->>ProxyServer: IPC request (northbound)
    ProxyServer->>Cloud: MQTT PUBLISH
    Cloud->>ProxyServer: MQTT message (southbound)
    ProxyServer->>Agent: IPC publish (fan-out)

IPC Wiring: Dedicated Sockets per Client

One detail worth noting: the proxy doesn't use a single broadcast socket for southbound messages. It maintains dedicated IPC publish URLs for each known client type:

// include/mqtt_proxy/common/ipc_common.h
#define MQTTPROXY_AGENT_PUB_URL \
  "ipc:///tmp/iotshd_mqttproxy_agent_ipc_pub"
#define MQTTPROXY_OTA_PUB_URL \
  "ipc:///tmp/iotshd_mqttproxy_ota_ipc_pub"
#define MQTTPROXY_HUB_ONBOARDING_PUB_URL \
  "ipc:///tmp/iotshd_mqttproxy_hub_onboarding_ipc_pub"

This means the Agent only receives messages it subscribed to — it never sees OTA traffic, and vice versa. The GetPubUrlIdx() function maps a client name to its socket index.
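Conceptually, that mapping is just a name-to-index lookup. A minimal sketch, assuming index values and client-name spellings that the chapter doesn't specify:

```cpp
#include <string>

// Sketch of GetPubUrlIdx(): maps a registered client name to the index of
// its dedicated IPC publish socket. The concrete names and indices are
// assumptions; the real table lives in the proxy's IPC common definitions.
int GetPubUrlIdxSketch(const std::string &client_name) {
  if (client_name == "Agent")         return 0;  // MQTTPROXY_AGENT_PUB_URL
  if (client_name == "OtaManager")    return 1;  // MQTTPROXY_OTA_PUB_URL
  if (client_name == "HubOnboarding") return 2;  // MQTTPROXY_HUB_ONBOARDING_PUB_URL
  return -1;  // unknown client: no socket, message is dropped or logged
}
```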

The Connectors Module: aws-crt-cpp Wrapper

The actual MQTT connection is handled by the Connectors module at commonUtils/IoTSmartHomeDevice-Connectors/MqttClientCpp/. This is a thin wrapper around the aws-crt-cpp MQTT5 client library.

📁 include/DeviceMqttClient.hpp

The IotshDeviceMqttClient class provides:

bool connect();
bool disconnect();
bool subscribeTopic(String topic);
bool publishToTopic(String topic, String message);
bool reconnect();  // clean reconnect + re-subscribe

The proxy's MqttConnector (in include/mqtt_proxy/server/mqtt_connector.h) wraps this further, adding status-code returns and the callback plumbing that feeds incoming messages into the southbound queue.

Key connection parameters come from the hub's config file:

| Config Key | Purpose |
| --- | --- |
| endpoint | AWS IoT Core endpoint URL |
| client_id | MQTT client identifier |
| managed_thing_id | The hub's identity in IoT Managed Integrations |
| cert_path / key_path | TLS credentials (file path or secure storage) |
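Put together, a hub config fragment might look like this. Every value below is a made-up placeholder, and the exact file format and key spellings are assumptions based on the table above:

```json
{
  "endpoint": "xxxxxxxx-ats.iot.us-east-1.amazonaws.com",
  "client_id": "iotshd-hub-0001",
  "managed_thing_id": "mt-abc123",
  "cert_path": "/etc/iotshd/certs/device.pem.crt",
  "key_path": "/etc/iotshd/certs/private.pem.key"
}
```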

MQTT Topics

The SDK defines two root topics for all cloud communication:

// include/mqtt/iotmi_mqtt_topics.h
#define IOTMI_MQTT_NORTH_BOUND_TOPIC \
  "$aws/rules/north..."
#define IOTMI_MQTT_SOUTH_BOUND_ROOT_TOPIC \
  "south..."

Northbound messages go through an AWS IoT Rules Engine topic. Southbound messages arrive on a dedicated root topic that the proxy subscribes to on behalf of its clients.

The MqttProxyMsg: A Flat, Serializable Envelope

Every message flowing through the proxy — in either direction — is wrapped in a MqttProxyMsg struct. This is a fixed-size, flat structure designed for efficient IPC serialization:

// include/mqtt_proxy/common/mqtt_proxy_msg.h
struct MqttProxyMsg {
  char client_name[33];
  char topic[129];
  char message_type[129];
  char message_id[37];
  char managed_thing_id[129];
  // ... plus payload pointer
};

The CastMqttProxyMsgToBuffer() and CastBufferToMqttProxyMsg() functions handle zero-copy serialization for IPC transport.
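Because the struct is flat (fixed-size character arrays, no heap-owned members apart from the separately handled payload pointer), "serialization" can be a plain byte-level reinterpretation rather than field-by-field copying. A two-field sketch of the idea, not the real MqttProxyMsg:

```cpp
#include <cstdint>
#include <cstring>

// Miniature stand-in for MqttProxyMsg; the real struct has more fields.
struct FlatMsg {
  char client_name[33];
  char topic[129];
};

// Cast the flat struct to a raw byte view for IPC transport. No copying:
// the buffer aliases the struct's own storage.
const uint8_t *CastMsgToBuffer(const FlatMsg &msg) {
  return reinterpret_cast<const uint8_t *>(&msg);
}

// Reinterpret a received byte buffer as the flat struct.
const FlatMsg *CastBufferToMsg(const uint8_t *buffer) {
  return reinterpret_cast<const FlatMsg *>(buffer);
}
```

The trade-off is rigidity: any change to the struct layout changes the wire format, so both sides of the IPC channel must be built from the same header.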

Startup Sequence

Looking at src/mqtt_proxy/server/main.cpp, the proxy boots in this order:

  1. Parse config — read endpoint, certs, client ID from the config file
  2. Wait for provisioning — block until the device is provisioned
  3. Create ClientRegistry — empty registry, ready for clients
  4. Create MqttConnector — set up the aws-crt-cpp MQTT5 client
  5. Create WorkerThread — wire up north/south handlers
  6. Start IPC server — begin accepting client requests
  7. Connect to MQTT — establish the cloud connection with exponential backoff retry

The retry logic is worth noting — it uses exponential backoff starting at 1 second, capping at 5 minutes:

// src/mqtt_proxy/server/main.cpp
const int INITIAL_BACKOFF_MS = 1000;
const int MAX_BACKOFF_MS = 5 * 60 * 1000;
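The step function implied by those constants can be sketched as a doubling delay with a cap, so the retry schedule runs 1s, 2s, 4s, and so on up to 5 minutes (the helper name is illustrative):

```cpp
#include <algorithm>

const int INITIAL_BACKOFF_MS = 1000;          // first retry after 1 second
const int MAX_BACKOFF_MS = 5 * 60 * 1000;     // never wait more than 5 minutes

// Compute the next retry delay: double the current one, capped at the max.
int NextBackoffMs(int current_ms) {
  return std::min(current_ms * 2, MAX_BACKOFF_MS);
}
```

On a successful connect, the delay would reset to INITIAL_BACKOFF_MS so a later disconnect starts the schedule over.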

Connection Status Notifications

The proxy also supports a $SYS/MqttProxy system topic that clients can subscribe to for connection status updates. This lets services know when the cloud connection drops or reconnects — without those subscriptions ever hitting the real MQTT broker.

Recap

| Concept | What It Does | Key File |
| --- | --- | --- |
| MqttProxyContext | Wires together all proxy components | core/context.h |
| ClientRegistry | Tracks client subscriptions | core/client_registry.h |
| WorkerThread | Queues messages north and south | core/worker_thread.h |
| MqttConnector | Manages the single MQTT connection | server/mqtt_connector.h |
| IotshdMqttProxyClient | Client-side API for services | client/api.h |
| MqttProxyMsg | Flat message envelope for IPC | common/mqtt_proxy_msg.h |
| IotshDeviceMqttClient | aws-crt-cpp MQTT5 wrapper | Connectors/MqttClientCpp/ |

What's Next?

Now that we understand how the hub communicates with the cloud, we're ready to see it all come together. In Chapter 6: Hub Onboarding, we'll walk through the process that takes a fresh hub from unprovisioned to fully registered with AWS IoT Core — the moment the MQTT Proxy's connection first lights up and the hub joins the IoT Managed Integrations service.