NNG Messaging¶
Machineuse uses NNG (Nanomsg Next Generation) for high-performance inter-node communication.
Why NNG?¶
- No External Broker: Direct peer-to-peer communication
- High Performance: Zero-copy message passing
- Multiple Patterns: REQ/REP, PUB/SUB, PUSH/PULL
- Cross-Platform: Consistent behavior across systems
Communication Patterns¶
REQ/REP (Request/Reply)¶
Used for command-response interactions between control plane and workers.
Use Cases:
- Instance creation/deletion
- Configuration updates
- Health checks
Example:
# Control Plane (Requester)
from machineuse.core.messaging import MessagingClient

client = MessagingClient("control-plane")
await client.connect_request_socket("tcp://worker-1:5555")
response = await client.send_request(
    endpoint="/instances/create",
    data={"image": "ubuntu:22.04"}
)

# Worker (Responder)
from machineuse.core.messaging import MessagingServer

server = MessagingServer("worker-1")
await server.bind_reply_socket("tcp://0.0.0.0:5555")

@server.handler("/instances/create")
async def handle_create(request):
    instance = await create_instance(request.data)
    return {"instance_id": instance.id}

await server.run()
PUB/SUB (Publish/Subscribe)¶
Used for event broadcasting from control plane to all workers.
Control Plane
│
│──── Event: NodeConfigUpdated ────►┌─────────┐
│ │Worker 1 │
│──── Event: NodeConfigUpdated ────►├─────────┤
│ │Worker 2 │
│──── Event: NodeConfigUpdated ────►├─────────┤
│ │Worker N │
└───────────────────────────────────┴─────────┘
Use Cases:
- Configuration broadcasts
- Cluster-wide announcements
- Status updates
Example:
# Control Plane (Publisher)
publisher = MessagingClient("control-plane")
await publisher.bind_pub_socket("tcp://0.0.0.0:5556")
await publisher.publish("config.update", {
    "setting": "max_instances",
    "value": 100
})

# Worker (Subscriber)
subscriber = MessagingClient("worker-1")
await subscriber.connect_sub_socket("tcp://control-plane:5556")
await subscriber.subscribe("config.")  # prefix match: receives "config.update"
async for message in subscriber.receive():
    print(f"Received: {message.topic} - {message.data}")
PUSH/PULL (Task Distribution)¶
Used for distributing tasks across workers with load balancing.
Control Plane (PUSH)
│
│──── Task ────►┌─────────┐
│ │Worker 1 │ (PULL)
│──── Task ────►├─────────┤
│ │Worker 2 │ (PULL)
│──── Task ────►├─────────┤
│ │Worker N │ (PULL)
└───────────────┴─────────┘
Use Cases:
- Batch operations
- Scheduled maintenance tasks
- Background processing
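This section has no PUSH/PULL example. As a rough illustration of the load-balancing idea only, the sketch below simulates round-robin task assignment in plain Python. It does not use NNG or the machineuse API; `distribute_round_robin` and the worker names are hypothetical, and real PUSH sockets only deliver to peers that are currently connected and ready to receive.

```python
from collections import defaultdict
from itertools import cycle


def distribute_round_robin(tasks, workers):
    """Assign each task to the next worker in turn (simulation only)."""
    if not workers:
        raise ValueError("at least one worker is required")
    assignments = defaultdict(list)
    next_worker = cycle(workers)
    for task in tasks:
        assignments[next(next_worker)].append(task)
    return dict(assignments)


# Five tasks spread over two hypothetical workers.
tasks = [f"task-{i}" for i in range(5)]
assignments = distribute_round_robin(tasks, ["worker-1", "worker-2"])
```

With five tasks and two workers, the first worker receives three tasks and the second receives two.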
Message Protocol¶
Message Structure¶
{
  "id": "msg_abc123",
  "type": "request",
  "endpoint": "/instances/create",
  "timestamp": "2026-03-19T10:30:00Z",
  "sender": "control-plane-1",
  "data": {
    "image": "ubuntu:22.04",
    "config": {}
  }
}
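A minimal sketch of building such an envelope in Python, using only the standard library. The helper name `build_message`, the ID scheme (`msg_` plus six hex characters), and JSON as the wire encoding are assumptions made for illustration; the source does not specify how IDs are generated or how messages are serialized.

```python
import json
import uuid
from datetime import datetime, timezone


def build_message(msg_type, endpoint, sender, data):
    """Return a dict with the envelope fields shown above."""
    return {
        "id": f"msg_{uuid.uuid4().hex[:6]}",  # assumed ID scheme
        "type": msg_type,
        "endpoint": endpoint,
        # UTC timestamp in the same ISO 8601 form as the example
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "sender": sender,
        "data": data,
    }


message = build_message(
    "request",
    "/instances/create",
    "control-plane-1",
    {"image": "ubuntu:22.04", "config": {}},
)
wire_bytes = json.dumps(message).encode("utf-8")  # assumed JSON encoding
```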
Message Types¶
| Type | Description |
|---|---|
| request | Command requiring a response |
| response | Reply to a request |
| event | Broadcast notification |
| task | Queued work item |
| heartbeat | Health check signal |
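One way to keep these type strings consistent in code is an enumeration. The sketch below is illustrative: `MessageType` and `expects_reply` are hypothetical names, not part of the machineuse API.

```python
from enum import Enum


class MessageType(str, Enum):
    """The five message types listed in the table."""

    REQUEST = "request"
    RESPONSE = "response"
    EVENT = "event"
    TASK = "task"
    HEARTBEAT = "heartbeat"


def expects_reply(message):
    """Return True only for messages whose type requires a response."""
    msg_type = MessageType(message["type"])  # raises ValueError if unknown
    return msg_type is MessageType.REQUEST
```

Converting the raw string through `MessageType(...)` rejects unknown types early, before any handler runs.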
Heartbeat System¶
Workers send periodic heartbeats to the control plane, reporting their status, instance count, and resource usage.
Heartbeat Payload¶
{
  "node_id": "worker-1",
  "timestamp": "2026-03-19T10:30:00Z",
  "status": "online",
  "instances": 15,
  "resources": {
    "cpu_percent": 45.0,
    "memory_percent": 62.0,
    "disk_percent": 30.0
  }
}
Timeout Handling¶
- Default heartbeat interval: 10 seconds
- Timeout threshold: 30 seconds (3 missed heartbeats)
- On timeout: Mark node offline, trigger migration
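The timeout rule above can be sketched as a small tracker that records when each node was last heard from. `HeartbeatMonitor` is a hypothetical class written for this example, not the machineuse implementation; the injectable `clock` exists only so the behavior can be shown without waiting 30 seconds.

```python
import time


class HeartbeatMonitor:
    """Track the last heartbeat per node and report nodes that timed out."""

    def __init__(self, timeout_seconds=30.0, clock=time.monotonic):
        self.timeout_seconds = timeout_seconds
        self.clock = clock
        self.last_seen = {}

    def record(self, node_id):
        """Note that a heartbeat from node_id arrived now."""
        self.last_seen[node_id] = self.clock()

    def timed_out_nodes(self):
        """Return the sorted IDs of nodes silent for longer than the timeout."""
        now = self.clock()
        return sorted(
            node_id
            for node_id, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )


# Demonstration with a fake clock (seconds since start).
fake_now = [0.0]
monitor = HeartbeatMonitor(timeout_seconds=30.0, clock=lambda: fake_now[0])
monitor.record("worker-1")   # heard at t=0
fake_now[0] = 25.0
monitor.record("worker-2")   # heard at t=25
fake_now[0] = 31.0
offline = monitor.timed_out_nodes()  # only worker-1 has been silent > 30 s
```

A caller would then mark each returned node offline and trigger migration, as described in the list above.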
Error Handling¶
Retry Logic¶
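import asyncio

async def send_with_retry(client, endpoint, data, max_retries=3):
    """Send a request, retrying on timeout with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return await client.send_request(endpoint, data)
        except TimeoutError:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1 s, 2 s, ...
            else:
                raise  # Out of retries: propagate the last timeout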
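To make the backoff schedule visible, the sketch below repeats the retry function with one change, an injectable `sleep` parameter (an assumption added so the example runs instantly), and exercises it against `FlakyClient`, a hypothetical stand-in that times out a fixed number of times.

```python
import asyncio


async def send_with_retry(client, endpoint, data, max_retries=3, sleep=asyncio.sleep):
    for attempt in range(max_retries):
        try:
            return await client.send_request(endpoint, data)
        except TimeoutError:
            if attempt < max_retries - 1:
                await sleep(2 ** attempt)  # exponential backoff
            else:
                raise


class FlakyClient:
    """Hypothetical client that raises TimeoutError `failures` times."""

    def __init__(self, failures):
        self.failures = failures

    async def send_request(self, endpoint, data):
        if self.failures > 0:
            self.failures -= 1
            raise TimeoutError("simulated timeout")
        return {"ok": True}


async def demo():
    delays = []

    async def fake_sleep(seconds):
        delays.append(seconds)  # record the delay instead of waiting

    result = await send_with_retry(
        FlakyClient(failures=2), "/health", {}, sleep=fake_sleep
    )
    return result, delays


result, delays = asyncio.run(demo())
```

With two simulated timeouts and `max_retries=3`, the third attempt succeeds after recorded delays of 1 and 2 seconds. A client that fails three times in a row would exhaust the retries and raise `TimeoutError`.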
Circuit Breaker¶
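import time

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout  # seconds before a trial call is allowed
        self.state = "closed"
        self.last_failure = None

    def reset(self):
        # Minimal helper (not shown in the original snippet): a success closes the circuit.
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        # Minimal helper: open at the threshold, or when a half-open trial call fails.
        self.failures += 1
        self.last_failure = time.time()
        if self.state == "half-open" or self.failures >= self.threshold:
            self.state = "open"

    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure > self.reset_timeout:
                self.state = "half-open"  # allow one trial call
            else:
                raise CircuitOpenError()
        try:
            result = await func(*args, **kwargs)
            self.reset()
            return result
        except Exception:
            self.record_failure()
            raise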
Configuration¶
Control Plane¶
{
  "messaging": {
    "bind_address": "tcp://0.0.0.0:5555",
    "pub_address": "tcp://0.0.0.0:5556",
    "heartbeat_interval_seconds": 10,
    "heartbeat_timeout_seconds": 30,
    "max_message_size_mb": 64
  }
}
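A quick sanity check of these settings can catch mistakes before startup. The sketch below is illustrative only: `check_messaging_config` is a hypothetical helper, and its two rules (addresses carry a transport scheme, and the heartbeat timeout exceeds the interval) are assumptions about what is worth validating, not documented machineuse behavior.

```python
import json

CONFIG_TEXT = """
{
  "messaging": {
    "bind_address": "tcp://0.0.0.0:5555",
    "pub_address": "tcp://0.0.0.0:5556",
    "heartbeat_interval_seconds": 10,
    "heartbeat_timeout_seconds": 30,
    "max_message_size_mb": 64
  }
}
"""


def check_messaging_config(config):
    """Return a list of problems found; an empty list means none."""
    messaging = config.get("messaging", {})
    problems = []
    for key in ("bind_address", "pub_address"):
        if "://" not in str(messaging.get(key, "")):
            problems.append(f"{key} is missing a transport scheme")
    interval = messaging.get("heartbeat_interval_seconds", 0)
    timeout = messaging.get("heartbeat_timeout_seconds", 0)
    if interval <= 0:
        problems.append("heartbeat_interval_seconds must be positive")
    elif timeout <= interval:
        # Otherwise a healthy node would time out between two heartbeats.
        problems.append("heartbeat_timeout_seconds must exceed the interval")
    return problems


problems = check_messaging_config(json.loads(CONFIG_TEXT))
```

For the configuration shown above, the check finds no problems.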
Worker Node¶
{
  "messaging": {
    "control_plane_address": "tcp://control-plane:5555",
    "sub_address": "tcp://control-plane:5556",
    "heartbeat_interval_seconds": 10,
    "reconnect_interval_seconds": 5
  }
}
TLS Security¶
Enable TLS for secure communication:
{
  "messaging": {
    "tls": {
      "enabled": true,
      "cert_file": "/etc/machineuse/server.crt",
      "key_file": "/etc/machineuse/server.key",
      "ca_file": "/etc/machineuse/ca.crt",
      "verify_peer": true
    }
  }
}
Monitoring¶
Metrics¶
- Messages sent/received per second
- Average message latency
- Error rate
- Connection count
Logging¶
Log format:
2026-03-19 10:30:00 DEBUG messaging: Sent request msg_abc123 to worker-1
2026-03-19 10:30:01 DEBUG messaging: Received response for msg_abc123 (latency: 15ms)
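Lines in this format can be parsed with a regular expression, for example to pull latencies out of a log file. The sketch below is based only on the two sample lines above; `parse_log_line` is a hypothetical helper, and other log messages may not follow the same shape.

```python
import re

# date, time, level, component, then free-form message text
LOG_LINE = re.compile(
    r"^(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+) (?P<component>\w+): (?P<text>.*)$"
)
LATENCY = re.compile(r"\(latency: (?P<ms>\d+)ms\)")


def parse_log_line(line):
    """Split one log line into its fields; return None if it does not match."""
    match = LOG_LINE.match(line.strip())
    if match is None:
        return None
    record = match.groupdict()
    latency = LATENCY.search(record["text"])
    record["latency_ms"] = int(latency.group("ms")) if latency else None
    return record


sample = (
    "2026-03-19 10:30:01 DEBUG messaging: "
    "Received response for msg_abc123 (latency: 15ms)"
)
record = parse_log_line(sample)
```

For the sample line, `record["level"]` is `"DEBUG"` and `record["latency_ms"]` is `15`; lines without a latency suffix get `None` for that field.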
Troubleshooting¶
Connection Issues¶
# Test connectivity
nc -zv control-plane 5555
# Check NNG socket status
machineuse-cli debug sockets
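Where `nc` is not available, the same reachability check can be approximated in Python with the standard library. `can_connect` is a hypothetical helper; like `nc -zv`, it only confirms that the TCP port accepts connections and says nothing about the NNG protocol handshake.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts, and DNS failures
        return False
```

For example, `can_connect("control-plane", 5555)` mirrors the `nc` command above.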
Message Timeouts¶
- Check network latency
- Verify message size limits
- Review worker load
High Latency¶
- Monitor queue depths
- Check worker processing time
- Consider scaling workers