NNG Messaging

Machineuse uses NNG (Nanomsg Next Generation) for high-performance inter-node communication.

Why NNG?

  • No External Broker: Direct peer-to-peer communication
  • High Performance: Zero-copy message passing
  • Multiple Patterns: REQ/REP, PUB/SUB, PUSH/PULL
  • Cross-Platform: Consistent behavior across systems

Communication Patterns

REQ/REP (Request/Reply)

Used for command-response interactions between the control plane and workers.

Control Plane                    Worker
     │                             │
     │──── CreateInstance ────────►│
     │                             │
     │◄─── InstanceCreated ────────│
     │                             │

Use Cases:

  • Instance creation/deletion
  • Configuration updates
  • Health checks

Example:

# Control Plane (Requester)
from machineuse.core.messaging import MessagingClient

client = MessagingClient("control-plane")
await client.connect_request_socket("tcp://worker-1:5555")

response = await client.send_request(
    endpoint="/instances/create",
    data={"image": "ubuntu:22.04"}
)

# Worker (Responder)
from machineuse.core.messaging import MessagingServer

server = MessagingServer("worker-1")
await server.bind_reply_socket("tcp://0.0.0.0:5555")

@server.handler("/instances/create")
async def handle_create(request):
    instance = await create_instance(request.data)
    return {"instance_id": instance.id}

await server.run()
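The `@server.handler(...)` decorator above routes each request to a coroutine by its endpoint. A minimal standard-library sketch of how such routing could work is shown here. `HandlerRegistry`, `dispatch`, and `demo` are hypothetical names used for illustration and are not part of `machineuse.core.messaging`.

```python
import asyncio


class HandlerRegistry:
    """Hypothetical endpoint-to-coroutine router (illustration only)."""

    def __init__(self):
        self._handlers = {}

    def handler(self, endpoint):
        # Decorator factory: store the coroutine under its endpoint.
        def register(func):
            self._handlers[endpoint] = func
            return func
        return register

    async def dispatch(self, message):
        # Look up the handler by the message's "endpoint" field.
        func = self._handlers.get(message["endpoint"])
        if func is None:
            return {"error": f"unknown endpoint: {message['endpoint']}"}
        return await func(message["data"])


registry = HandlerRegistry()


@registry.handler("/instances/create")
async def handle_create(data):
    # Stand-in for real instance creation.
    return {"instance_id": "inst-1", "image": data["image"]}


async def demo():
    return await registry.dispatch(
        {"endpoint": "/instances/create", "data": {"image": "ubuntu:22.04"}}
    )
```

Calling `asyncio.run(demo())` runs the registered coroutine with the request data; an unregistered endpoint yields an error dictionary instead of raising.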

PUB/SUB (Publish/Subscribe)

Used for event broadcasting from the control plane to all workers.

Control Plane
     │──── Event: NodeConfigUpdated ────►┌─────────┐
     │                                   │Worker 1 │
     │──── Event: NodeConfigUpdated ────►├─────────┤
     │                                   │Worker 2 │
     │──── Event: NodeConfigUpdated ────►├─────────┤
     │                                   │Worker N │
     └───────────────────────────────────┴─────────┘

Use Cases:

  • Configuration broadcasts
  • Cluster-wide announcements
  • Status updates

Example:

# Control Plane (Publisher)
publisher = MessagingClient("control-plane")
await publisher.bind_pub_socket("tcp://0.0.0.0:5556")

await publisher.publish("config.update", {
    "setting": "max_instances",
    "value": 100
})

# Worker (Subscriber)
subscriber = MessagingClient("worker-1")
await subscriber.connect_sub_socket("tcp://control-plane:5556")
await subscriber.subscribe("config.")

async for message in subscriber.receive():
    print(f"Received: {message.topic} - {message.data}")
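The `subscribe("config.")` call above works by prefix: an NNG SUB socket delivers a message when its leading bytes match a subscribed prefix. The sketch below models that rule on topic strings, assuming the topic is the leading part of each published message. `matches` is a hypothetical helper, not a Machineuse function.

```python
def matches(topic: str, subscriptions: list[str]) -> bool:
    """Return True if any subscribed prefix matches the start of the topic."""
    return any(topic.startswith(prefix) for prefix in subscriptions)


subscriptions = ["config."]
```

Under this rule `"config.update"` is delivered and `"status.update"` is not. The trailing dot in the prefix matters: without it, an unrelated topic such as `"configuration"` would also match.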

PUSH/PULL (Task Distribution)

Used for distributing tasks across workers with load balancing.

Control Plane (PUSH)
     │──── Task ────►┌─────────┐
     │               │Worker 1 │ (PULL)
     │──── Task ────►├─────────┤
     │               │Worker 2 │ (PULL)
     │──── Task ────►├─────────┤
     │               │Worker N │ (PULL)
     └───────────────┴─────────┘

Use Cases:

  • Batch operations
  • Scheduled maintenance tasks
  • Background processing
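To make the load-balancing idea concrete, here is a simplified model that assigns tasks to workers in strict round-robin order. This is an assumption made for illustration: a real PUSH socket may also take peer readiness into account, so actual assignment can differ. `distribute` is a hypothetical helper.

```python
from itertools import cycle


def distribute(tasks, workers):
    """Assign tasks to workers in round-robin order (simplified model)."""
    assignments = {worker: [] for worker in workers}
    # cycle() repeats the worker list; zip() stops when the tasks run out.
    for task, worker in zip(tasks, cycle(workers)):
        assignments[worker].append(task)
    return assignments
```

With five tasks and two workers, the first worker receives tasks 1, 3, and 5 and the second receives tasks 2 and 4.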

Message Protocol

Message Structure

{
  "id": "msg_abc123",
  "type": "request",
  "endpoint": "/instances/create",
  "timestamp": "2026-03-19T10:30:00Z",
  "sender": "control-plane-1",
  "data": {
    "image": "ubuntu:22.04",
    "config": {}
  }
}
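A minimal sketch of building and serializing an envelope with the fields shown above. The helper names (`make_message`, `encode`, `decode`), the six-character ID suffix, and the choice of UTF-8 JSON on the wire are assumptions for illustration; the source does not specify how Machineuse generates IDs or encodes messages.

```python
import json
import uuid
from datetime import datetime, timezone


def make_message(msg_type, endpoint, sender, data):
    """Build an envelope with the documented fields (hypothetical helper)."""
    return {
        # Assumed ID scheme: "msg_" plus six random hex characters.
        "id": f"msg_{uuid.uuid4().hex[:6]}",
        "type": msg_type,
        "endpoint": endpoint,
        # UTC timestamp in the same ISO 8601 "Z" form as the sample.
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "sender": sender,
        "data": data,
    }


def encode(message):
    # Sockets carry bytes, so the envelope is serialized before sending.
    return json.dumps(message).encode("utf-8")


def decode(payload):
    return json.loads(payload.decode("utf-8"))
```

Decoding an encoded envelope returns an equal dictionary, which makes the pair easy to check in a unit test.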

Message Types

Type        Description
request     Command requiring response
response    Reply to request
event       Broadcast notification
task        Queued work item
heartbeat   Health check signal
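One way to guard against unknown values in the `type` field is to model the five types as an enumeration. This is a sketch; `MessageType` and `parse_type` are hypothetical names, not part of the Machineuse API.

```python
from enum import Enum


class MessageType(str, Enum):
    REQUEST = "request"
    RESPONSE = "response"
    EVENT = "event"
    TASK = "task"
    HEARTBEAT = "heartbeat"


def parse_type(message):
    """Return the message's type, raising ValueError for unknown values."""
    return MessageType(message["type"])
```

Looking up a value that is not in the table raises `ValueError`, so malformed messages can be rejected before any handler runs.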

Heartbeat System

Workers send periodic heartbeats to the control plane:

Worker ──── Heartbeat ────► Control Plane
       │                         │
       │                    Check timeout
       │                         │
       │◄─── Acknowledged ───────│

Heartbeat Payload

{
  "node_id": "worker-1",
  "timestamp": "2026-03-19T10:30:00Z",
  "status": "online",
  "instances": 15,
  "resources": {
    "cpu_percent": 45.0,
    "memory_percent": 62.0,
    "disk_percent": 30.0
  }
}

Timeout Handling

  • Default heartbeat interval: 10 seconds
  • Timeout threshold: 30 seconds (3 missed heartbeats)
  • On timeout: Mark node offline, trigger migration
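The timeout rule above can be sketched as a small tracker that records the last heartbeat per node and reports nodes whose last heartbeat is older than the threshold. `HeartbeatMonitor` is a hypothetical class; marking a node offline and triggering migration are left to the caller.

```python
import time


class HeartbeatMonitor:
    """Hypothetical tracker for the heartbeat timeout rule."""

    def __init__(self, timeout_seconds=30.0):
        self.timeout_seconds = timeout_seconds
        self._last_seen = {}

    def record(self, node_id, now=None):
        # Store the arrival time; "now" can be injected for testing.
        self._last_seen[node_id] = time.monotonic() if now is None else now

    def timed_out(self, now=None):
        # Nodes whose last heartbeat is older than the timeout threshold.
        now = time.monotonic() if now is None else now
        return sorted(
            node_id
            for node_id, seen in self._last_seen.items()
            if now - seen > self.timeout_seconds
        )
```

A monotonic clock is used so that wall-clock adjustments on the control plane cannot cause false timeouts.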

Error Handling

Retry Logic

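import asyncio


async def send_with_retry(client, endpoint, data, max_retries=3):
    """Send a request, retrying on timeout with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await client.send_request(endpoint, data)
        # asyncio.TimeoutError is a separate class before Python 3.11.
        except (TimeoutError, asyncio.TimeoutError):
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise  # Out of retries: surface the timeout to the caller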
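The delay arithmetic in the retry function above can be checked in isolation. The sketch below lists the sleeps that occur between attempts; `backoff_delays` is a hypothetical helper written for this illustration.

```python
def backoff_delays(max_retries=3):
    """Seconds slept between attempts: 2 ** attempt, with no sleep after the last."""
    return [2 ** attempt for attempt in range(max_retries - 1)]
```

With the default `max_retries=3`, the function sleeps 1 second after the first timeout and 2 seconds after the second, then re-raises on the third, for a total added wait of 3 seconds.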

Circuit Breaker

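import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout  # Seconds before a trial call is allowed
        self.state = "closed"
        self.last_failure = None

    def reset(self):
        # A successful call closes the circuit and clears the failure count.
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        # Open the circuit at the threshold, or at once if a half-open trial fails.
        self.failures += 1
        self.last_failure = time.monotonic()
        if self.state == "half-open" or self.failures >= self.threshold:
            self.state = "open"

    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.monotonic() - self.last_failure > self.reset_timeout:
                self.state = "half-open"  # Let one trial call through
            else:
                raise CircuitOpenError()

        try:
            result = await func(*args, **kwargs)
            self.reset()
            return result
        except Exception:
            self.record_failure()
            raise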

Configuration

Control Plane

{
  "messaging": {
    "bind_address": "tcp://0.0.0.0:5555",
    "pub_address": "tcp://0.0.0.0:5556",
    "heartbeat_interval_seconds": 10,
    "heartbeat_timeout_seconds": 30,
    "max_message_size_mb": 64
  }
}
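The two heartbeat settings are related: the timeout should span at least one full interval. A sketch of a startup check under that assumption follows; `missed_heartbeats_allowed` and the trimmed `CONTROL_PLANE_CONFIG` string are hypothetical and shown only for illustration.

```python
import json

# Trimmed copy of the control plane settings, for illustration only.
CONTROL_PLANE_CONFIG = """
{
  "messaging": {
    "heartbeat_interval_seconds": 10,
    "heartbeat_timeout_seconds": 30
  }
}
"""


def missed_heartbeats_allowed(raw_config):
    """Return how many heartbeat intervals fit in the timeout window."""
    messaging = json.loads(raw_config)["messaging"]
    interval = messaging["heartbeat_interval_seconds"]
    timeout = messaging["heartbeat_timeout_seconds"]
    if interval <= 0 or timeout < interval:
        raise ValueError("timeout must cover at least one positive interval")
    return timeout // interval
```

For the values shown, the result is 3, which matches the three missed heartbeats described under Timeout Handling.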

Worker Node

{
  "messaging": {
    "control_plane_address": "tcp://control-plane:5555",
    "sub_address": "tcp://control-plane:5556",
    "heartbeat_interval_seconds": 10,
    "reconnect_interval_seconds": 5
  }
}

TLS Security

Enable TLS for secure communication:

{
  "messaging": {
    "tls": {
      "enabled": true,
      "cert_file": "/etc/machineuse/server.crt",
      "key_file": "/etc/machineuse/server.key",
      "ca_file": "/etc/machineuse/ca.crt",
      "verify_peer": true
    }
  }
}
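A misconfigured path is a common cause of TLS startup failures, so it can help to check the configured files before binding any socket. The sketch below is a hypothetical pre-flight check, not part of Machineuse; it only confirms that the files exist and does not validate certificate contents.

```python
from pathlib import Path


def missing_tls_files(config):
    """List configured TLS file paths that do not exist on disk."""
    tls = config.get("messaging", {}).get("tls", {})
    if not tls.get("enabled", False):
        return []  # Nothing to check when TLS is disabled
    keys = ("cert_file", "key_file", "ca_file")
    return [tls[key] for key in keys if key in tls and not Path(tls[key]).is_file()]
```

An empty list means every configured file was found, or that TLS is disabled.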

Monitoring

Metrics

  • Messages sent/received per second
  • Average message latency
  • Error rate
  • Connection count
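As an illustration of how the first three metrics relate to each other, here is a sketch of in-process counters. `MessagingMetrics` is a hypothetical class; the source does not describe how Machineuse collects or exports these values.

```python
class MessagingMetrics:
    """Hypothetical in-process counters for messaging metrics."""

    def __init__(self):
        self.sent = 0
        self.received = 0
        self.errors = 0
        self._latency_total_ms = 0.0
        self._latency_samples = 0

    def record_sent(self):
        self.sent += 1

    def record_response(self, latency_ms):
        self.received += 1
        self._latency_total_ms += latency_ms
        self._latency_samples += 1

    def record_error(self):
        self.errors += 1

    @property
    def average_latency_ms(self):
        if self._latency_samples == 0:
            return 0.0
        return self._latency_total_ms / self._latency_samples

    @property
    def error_rate(self):
        # Errors as a fraction of all requests sent.
        return self.errors / self.sent if self.sent else 0.0
```

Per-second rates can be derived by sampling `sent` and `received` at a fixed interval and taking the difference between samples.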

Logging

import logging

logging.getLogger("machineuse.messaging").setLevel(logging.DEBUG)

Log format:

2026-03-19 10:30:00 DEBUG messaging: Sent request msg_abc123 to worker-1
2026-03-19 10:30:01 DEBUG messaging: Received response for msg_abc123 (latency: 15ms)
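When investigating latency, the response lines can be parsed directly. The sketch below assumes log lines follow exactly the format shown above; `LATENCY_RE` and `parse_latency` are hypothetical names.

```python
import re

# Matches the "Received response" line format shown in the sample log.
LATENCY_RE = re.compile(
    r"Received response for (?P<msg_id>\S+) \(latency: (?P<ms>\d+)ms\)"
)


def parse_latency(line):
    """Return (message id, latency in ms) for a response line, else None."""
    match = LATENCY_RE.search(line)
    if match is None:
        return None
    return match.group("msg_id"), int(match.group("ms"))
```

Lines that do not report a latency, such as the "Sent request" line, return `None` and can be skipped.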

Troubleshooting

Connection Issues

# Test connectivity
nc -zv control-plane 5555

# Check NNG socket status
machineuse-cli debug sockets
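Where `nc` is not installed, the same reachability test can be done from Python with the standard `socket` module. `can_connect` is a hypothetical helper. Like `nc -z`, it only confirms that the TCP port accepts connections, not that an NNG socket is listening behind it.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, `can_connect("control-plane", 5555)` mirrors the `nc -zv control-plane 5555` check above.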

Message Timeouts

  • Check network latency
  • Verify message size limits
  • Review worker load

High Latency

  • Monitor queue depths
  • Check worker processing time
  • Consider scaling workers