Message Routing
How RevenProx routes messages to subscribers.
Topic Router
Data Structure
TopicRouter
├── local_subscriptions: HashMap<TopicUUID, HashSet<ConnectionId>>
├── remote_subscriptions: BloomFilter
├── peer_routing: HashMap<TopicUUID, Set<PeerId>>
├── mutex: RWLock
└── nng_publisher: Publisher
Subscription Index
Maps topics to subscribers for O(1) lookup:
Topic A (UUID: abc123...)
├── Connection 1
├── Connection 5
└── Connection 12
Topic B (UUID: def456...)
├── Connection 3
└── Connection 8
Local Routing
Message Broadcast
broadcast(topic, message):
1. Lock routing table (read)
2. Get subscriber set for topic
3. For each subscriber:
a. Get connection from pool
b. Enqueue message
c. Signal writer
4. Unlock routing table
Complexity
| Subscribers | Broadcast Time |
|---|---|
| 1 | O(1) |
| 10 | O(10) |
| 100 | O(100) |
| 1000 | O(1000) |
Linear in subscriber count, but each enqueue is O(1).
Distributed Routing
Subscription Propagation
When a client subscribes:
1. Local: Add to local subscription map
2. Local: Update Bloom filter
3. Remote: Publish subscription event via NNG
4. Peers: Receive and update their Bloom filters
Message Forwarding
On message for topic T:
1. Deliver to local subscribers
2. Check Bloom filter for remote interest
3. If possibly remote:
a. Query peer routing table
b. Forward to interested peers
4. Peers deliver to their local subscribers
Bloom Filter Optimization
Bloom filters provide fast "maybe/no" answers:
check_remote_interest(topic):
If bloom_filter.may_contain(topic):
// Might have remote subscribers
// Check peer routing for certainty
return query_peer_routing(topic)
Else:
// Definitely no remote subscribers
return empty_set
False Positive Handling: - Bloom filter says "maybe" but no actual subscribers - Message forwarded unnecessarily - Peer discards (no local subscribers) - Overhead but no correctness issue
NNG Communication
Publisher-Subscriber Pattern
┌─────────────┐ ┌─────────────┐
│ Proxy A │ │ Proxy B │
│ │ │ │
│ ┌─────────┐ │ NNG │ ┌─────────┐ │
│ │ Pub │─┼────────►│ │ Sub │ │
│ └─────────┘ │ │ └─────────┘ │
│ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Sub │◄┼─────────┼─│ Pub │ │
│ └─────────┘ │ │ └─────────┘ │
└─────────────┘ └─────────────┘
Message Types
| Type | Purpose |
|---|---|
SUB_ADD |
New subscription |
SUB_REMOVE |
Subscription removed |
EVENT |
Message to broadcast |
HEARTBEAT |
Peer alive signal |
SYNC_REQ |
Request state sync |
SYNC_RESP |
State sync response |
Message Format
┌────────────────────────────────┐
│ Type (1 byte) │
│ Flags (1 byte) │
│ Source Peer ID (16 bytes) │
│ Topic UUID (16 bytes) │
│ Sequence Number (8 bytes) │
│ Payload Length (4 bytes) │
│ Payload (variable) │
│ Checksum (4 bytes) │
└────────────────────────────────┘
Gossip Protocol
State Synchronization
Periodic gossip ensures consistency:
Every gossip_interval_sec:
1. Select random peers
2. Exchange Bloom filter digests
3. Identify differences
4. Exchange missing subscriptions
Anti-Entropy
Merkle trees detect inconsistencies:
Root
/ \
A B
/ \ / \
1 2 3 4
Compare roots:
- Same → Consistent
- Different → Traverse to find differences
Conflict Resolution
Vector clocks determine ordering:
Event A: {proxy1: 5, proxy2: 3}
Event B: {proxy1: 4, proxy2: 4}
Neither dominates → Concurrent events
Resolution: Merge (subscriptions are additive)
Routing Tables
Local Subscriptions
const LocalSubscriptions = struct {
topics: std.AutoHashMap([16]u8, std.AutoHashMap(u64, void)),
pub fn subscribe(self: *LocalSubscriptions, topic: [16]u8, conn: u64) void {
var subs = self.topics.getOrPut(topic) catch return;
if (!subs.found_existing) {
subs.value_ptr.* = std.AutoHashMap(u64, void).init(allocator);
}
subs.value_ptr.put(conn, {}) catch return;
}
pub fn getSubscribers(self: *LocalSubscriptions, topic: [16]u8) ?[]const u64 {
if (self.topics.get(topic)) |subs| {
return subs.keys();
}
return null;
}
};
Peer Routing
const PeerRouting = struct {
peers: std.AutoHashMap([16]u8, std.AutoHashMap([16]u8, void)),
pub fn getInterestedPeers(self: *PeerRouting, topic: [16]u8) ?[]const [16]u8 {
if (self.peers.get(topic)) |peer_set| {
return peer_set.keys();
}
return null;
}
};
Message Deduplication
Event IDs
Each event has a unique ID:
Dedup Window
Recent event IDs are tracked:
Duplicate Detection
on_message_received(event):
If dedup_cache.contains(event.id):
// Duplicate, discard
return
dedup_cache.add(event.id)
process_event(event)
Fanout Optimization
Batching
Group messages for efficiency:
Batch Buffer:
├── messages: [Message; BATCH_SIZE]
├── count: usize
├── last_flush: Timestamp
If count >= BATCH_SIZE or now - last_flush > BATCH_TIMEOUT:
flush_batch()
Subscriber Grouping
For large fanout, group by network locality:
Topic with 10,000 subscribers:
├── Local: 3,000 subscribers → Direct delivery
├── Peer A: 4,000 subscribers → Single forward
└── Peer B: 3,000 subscribers → Single forward
Metrics
Routing Metrics
| Metric | Description |
|---|---|
messages_routed |
Total messages routed |
messages_forwarded |
Messages sent to peers |
fanout_avg |
Average subscribers per message |
routing_latency_ms |
Time to route message |
Subscription Metrics
| Metric | Description |
|---|---|
subscriptions_local |
Local subscription count |
subscriptions_remote |
Remote (Bloom filter) count |
topics_active |
Topics with subscribers |
Performance Tuning
High Fanout Topics
For topics with many subscribers:
[http]
thread_pool_size = 32 # More workers
max_message_queue_size = 100 # Smaller queues
backpressure_policy = "drop_oldest"
Low Latency
For time-sensitive messages:
Memory Efficiency
For many topics with few subscribers: