
Message Routing

How RevenProx routes messages to subscribers.

Topic Router

Data Structure

TopicRouter
├── local_subscriptions: HashMap<TopicUUID, HashSet<ConnectionId>>
├── remote_subscriptions: BloomFilter
├── peer_routing: HashMap<TopicUUID, HashSet<PeerId>>
├── mutex: RWLock
└── nng_publisher: Publisher

Subscription Index

Maps each topic to its set of subscribing connections. Finding a topic's subscribers is therefore an average-case O(1) hash lookup:

Topic A (UUID: abc123...)
├── Connection 1
├── Connection 5
└── Connection 12

Topic B (UUID: def456...)
├── Connection 3
└── Connection 8

Local Routing

Message Broadcast

broadcast(topic, message):
    1. Lock routing table (read)
    2. Get subscriber set for topic
    3. For each subscriber:
        a. Get connection from pool
        b. Enqueue message
        c. Signal writer
    4. Unlock routing table
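The broadcast steps above can be sketched in Zig as follows. This is a minimal sketch, not RevenProx's implementation. `Router`, `Connection`, its fixed-size queue, and the use of a `std.Thread.ResetEvent` as the "signal writer" step are all hypothetical. The routing table is read under `std.Thread.RwLock.lockShared`, so concurrent broadcasts do not block each other. Each connection therefore needs its own mutex around its queue.

```zig
const std = @import("std");

/// Hypothetical connection with a fixed-capacity outbound queue of message IDs.
const Connection = struct {
    mutex: std.Thread.Mutex = .{}, // several broadcasts may target one connection at once
    queue: [64]u64 = undefined,
    len: usize = 0,
    /// Stand-in for "signal writer": a real proxy would wake its writer thread here.
    wake: std.Thread.ResetEvent = .{},

    fn enqueue(self: *Connection, msg: u64) bool {
        self.mutex.lock();
        defer self.mutex.unlock();
        if (self.len == self.queue.len) return false; // full: backpressure policy decides
        self.queue[self.len] = msg;
        self.len += 1;
        return true;
    }
};

const Router = struct {
    lock: std.Thread.RwLock = .{},
    // Topic UUID -> subscribed connection IDs (the subscription index).
    subscriptions: std.AutoArrayHashMap([16]u8, std.AutoArrayHashMap(u64, void)),
    // Connection ID -> connection (the connection pool).
    pool: std.AutoArrayHashMap(u64, *Connection),

    /// Returns the number of subscribers the message was enqueued for.
    fn broadcast(self: *Router, topic: [16]u8, msg: u64) usize {
        self.lock.lockShared(); // 1. read lock on the routing table
        defer self.lock.unlockShared(); // 4. released on every return path
        const subs = self.subscriptions.getPtr(topic) orelse return 0; // 2. subscriber set
        var delivered: usize = 0;
        for (subs.keys()) |conn_id| { // 3. each subscriber
            const conn = self.pool.get(conn_id) orelse continue; // a. skip closed connections
            if (conn.enqueue(msg)) { // b. enqueue
                delivered += 1;
                conn.wake.set(); // c. signal writer
            }
        }
        return delivered;
    }
};
```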

Complexity

Subscribers Enqueue operations per broadcast
1 1
10 10
100 100
1000 1000

Broadcast time is O(n) in the subscriber count n. There is one enqueue per subscriber, and each enqueue is O(1).

Distributed Routing

Subscription Propagation

When a client subscribes:

1. Local: Add to local subscription map
2. Local: Update Bloom filter
3. Remote: Publish subscription event via NNG
4. Peers: Receive and update their Bloom filters
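Steps 1, 3, and 4 can be sketched in Zig as below. `Node`, `SubEvent`, and `Outbox` are hypothetical names; `Outbox` stands in for the NNG publisher. The sketch assumes each event carries the source peer ID and topic UUID, as the header fields in the Message Format section suggest. It records the sender in the receiving peer's routing table and omits the Bloom filter update of step 2.

```zig
const std = @import("std");

const PeerId = [16]u8;
const TopicId = [16]u8;

/// Hypothetical subscription event as it would travel over NNG (SUB_ADD / SUB_REMOVE).
const SubEvent = struct {
    kind: enum { add, remove },
    source: PeerId,
    topic: TopicId,
};

/// Hypothetical stand-in for the NNG publisher: collects outgoing events.
const Outbox = struct {
    events: [16]SubEvent = undefined,
    len: usize = 0,

    fn publish(self: *Outbox, ev: SubEvent) void {
        std.debug.assert(self.len < self.events.len);
        self.events[self.len] = ev;
        self.len += 1;
    }
};

const Node = struct {
    id: PeerId,
    allocator: std.mem.Allocator,
    // Topic -> local connection IDs.
    local: std.AutoArrayHashMap(TopicId, std.AutoArrayHashMap(u64, void)),
    // Topic -> peers known to have subscribers.
    peer_routing: std.AutoArrayHashMap(TopicId, std.AutoArrayHashMap(PeerId, void)),

    fn init(allocator: std.mem.Allocator, id: PeerId) Node {
        return .{
            .id = id,
            .allocator = allocator,
            .local = std.AutoArrayHashMap(TopicId, std.AutoArrayHashMap(u64, void)).init(allocator),
            .peer_routing = std.AutoArrayHashMap(TopicId, std.AutoArrayHashMap(PeerId, void)).init(allocator),
        };
    }

    fn deinit(self: *Node) void {
        for (self.local.values()) |*s| s.deinit();
        self.local.deinit();
        for (self.peer_routing.values()) |*s| s.deinit();
        self.peer_routing.deinit();
    }

    /// Step 1: record the local subscription. Step 3: announce it to peers.
    fn subscribe(self: *Node, topic: TopicId, conn: u64, out: *Outbox) !void {
        const entry = try self.local.getOrPut(topic);
        if (!entry.found_existing) entry.value_ptr.* = std.AutoArrayHashMap(u64, void).init(self.allocator);
        try entry.value_ptr.put(conn, {});
        out.publish(.{ .kind = .add, .source = self.id, .topic = topic });
    }

    /// Step 4: a peer applies an incoming event to its peer routing table.
    fn onEvent(self: *Node, ev: SubEvent) !void {
        switch (ev.kind) {
            .add => {
                const entry = try self.peer_routing.getOrPut(ev.topic);
                if (!entry.found_existing) entry.value_ptr.* = std.AutoArrayHashMap(PeerId, void).init(self.allocator);
                try entry.value_ptr.put(ev.source, {});
            },
            .remove => {
                if (self.peer_routing.getPtr(ev.topic)) |peers| _ = peers.swapRemove(ev.source);
            },
        }
    }
};
```

A plain Bloom filter cannot delete entries. Handling SUB_REMOVE on the Bloom filter side therefore needs either a periodic rebuild or a counting Bloom filter.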

Message Forwarding

On message for topic T:
    1. Deliver to local subscribers
    2. Check Bloom filter for remote interest
    3. If possibly remote:
        a. Query peer routing table
        b. Forward to interested peers
    4. Peers deliver to their local subscribers

Bloom Filter Optimization

Bloom filters provide fast "maybe/no" answers:

check_remote_interest(topic):
    If bloom_filter.may_contain(topic):
        // Might have remote subscribers
        // Check peer routing for certainty
        return query_peer_routing(topic)
    Else:
        // Definitely no remote subscribers
        return empty_set

False Positive Handling:
- The Bloom filter says "maybe", but no peer has actual subscribers
- The message is forwarded unnecessarily
- The peer discards it (no local subscribers)
- The cost is extra overhead, not a correctness issue
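The `check_remote_interest` flow can be sketched in Zig as below. The sketch assumes a plain bit-array Bloom filter whose `k` probe positions come from `std.hash.Wyhash` with seeds `0..k-1`. The filter size, the hash choice, and the `BloomFilter`/`checkRemoteInterest` names are illustrative, not RevenProx's. A Bloom filter never gives false negatives, so a "no" skips the routing table entirely. A false positive costs one routing-table lookup that returns no peers.

```zig
const std = @import("std");

const TopicId = [16]u8;
const PeerId = [16]u8;

/// Minimal Bloom filter with `bits` bits and `k` seeded hash probes.
fn BloomFilter(comptime bits: usize, comptime k: usize) type {
    return struct {
        const Self = @This();
        words: [(bits + 63) / 64]u64 = [_]u64{0} ** ((bits + 63) / 64),

        fn index(topic: TopicId, seed: u64) usize {
            return @intCast(std.hash.Wyhash.hash(seed, &topic) % bits);
        }

        fn add(self: *Self, topic: TopicId) void {
            for (0..k) |i| {
                const b = index(topic, i);
                self.words[b / 64] |= @as(u64, 1) << @as(u6, @intCast(b % 64));
            }
        }

        /// false = definitely absent; true = possibly present.
        fn mayContain(self: *const Self, topic: TopicId) bool {
            for (0..k) |i| {
                const b = index(topic, i);
                if ((self.words[b / 64] & (@as(u64, 1) << @as(u6, @intCast(b % 64)))) == 0) return false;
            }
            return true;
        }
    };
}

/// Returns interested peers; empty when the filter rules the topic out or on a false positive.
fn checkRemoteInterest(
    bloom: anytype,
    routing: *const std.AutoArrayHashMap(TopicId, std.AutoArrayHashMap(PeerId, void)),
    topic: TopicId,
) []const PeerId {
    if (!bloom.mayContain(topic)) return &[_]PeerId{}; // definitely no remote subscribers
    // "Maybe": confirm against the exact peer routing table.
    const peers = routing.getPtr(topic) orelse return &[_]PeerId{}; // false positive
    return peers.keys();
}
```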

NNG Communication

Publisher-Subscriber Pattern

┌─────────────┐         ┌─────────────┐
│   Proxy A   │         │   Proxy B   │
│             │         │             │
│ ┌─────────┐ │   NNG   │ ┌─────────┐ │
│ │   Pub   │─┼────────►│ │   Sub   │ │
│ └─────────┘ │         │ └─────────┘ │
│             │         │             │
│ ┌─────────┐ │         │ ┌─────────┐ │
│ │   Sub   │◄┼─────────┼─│   Pub   │ │
│ └─────────┘ │         │ └─────────┘ │
└─────────────┘         └─────────────┘

Message Types

Type Purpose
SUB_ADD New subscription
SUB_REMOVE Subscription removed
EVENT Message to broadcast
HEARTBEAT Peer alive signal
SYNC_REQ Request state sync
SYNC_RESP State sync response

Message Format

┌────────────────────────────────┐
│ Type (1 byte)                  │
│ Flags (1 byte)                 │
│ Source Peer ID (16 bytes)      │
│ Topic UUID (16 bytes)          │
│ Sequence Number (8 bytes)      │
│ Payload Length (4 bytes)       │
│ Payload (variable)             │
│ Checksum (4 bytes)             │
└────────────────────────────────┘
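The fixed header is 1 + 1 + 16 + 16 + 8 + 4 = 46 bytes, and the trailing checksum adds 4 more, for 50 bytes of overhead per message. The following Zig sketch encodes and decodes that layout under several assumptions the section does not state:

- Multi-byte fields are big-endian.
- The checksum is CRC-32 (`std.hash.Crc32`) over everything before it.
- The numeric `MsgType` codes are hypothetical.

`Frame`, `encode`, and `decode` are illustrative names.

```zig
const std = @import("std");

/// Numeric codes are hypothetical; the section only names the types.
const MsgType = enum(u8) { sub_add = 1, sub_remove, event, heartbeat, sync_req, sync_resp };

const header_len = 1 + 1 + 16 + 16 + 8 + 4; // 46 bytes before the payload

const Frame = struct {
    msg_type: MsgType,
    flags: u8,
    source: [16]u8,
    topic: [16]u8,
    seq: u64,
    payload: []const u8,
};

/// Writes `value` as `width` big-endian bytes.
fn putBe(buf: []u8, value: u64, width: usize) void {
    for (0..width) |i| buf[i] = @truncate(value >> @as(u6, @intCast(8 * (width - 1 - i))));
}

fn getBe(buf: []const u8, width: usize) u64 {
    var v: u64 = 0;
    for (buf[0..width]) |b| v = (v << 8) | b;
    return v;
}

fn encode(f: Frame, out: []u8) ![]u8 {
    if (f.payload.len > std.math.maxInt(u32)) return error.PayloadTooLarge;
    const total = header_len + f.payload.len + 4;
    if (out.len < total) return error.BufferTooSmall;
    out[0] = @intFromEnum(f.msg_type);
    out[1] = f.flags;
    @memcpy(out[2..18], &f.source);
    @memcpy(out[18..34], &f.topic);
    putBe(out[34..42], f.seq, 8);
    putBe(out[42..46], f.payload.len, 4);
    @memcpy(out[46..][0..f.payload.len], f.payload);
    putBe(out[total - 4 .. total], std.hash.Crc32.hash(out[0 .. total - 4]), 4); // checksum last
    return out[0..total];
}

/// The returned payload slice points into `buf`.
fn decode(buf: []const u8) !Frame {
    if (buf.len < header_len + 4) return error.Truncated;
    const len: usize = @intCast(getBe(buf[42..46], 4));
    const total = header_len + len + 4;
    if (buf.len < total) return error.Truncated;
    if (std.hash.Crc32.hash(buf[0 .. total - 4]) != getBe(buf[total - 4 .. total], 4)) return error.BadChecksum;
    if (buf[0] < @intFromEnum(MsgType.sub_add) or buf[0] > @intFromEnum(MsgType.sync_resp)) return error.UnknownType;
    return .{
        .msg_type = @enumFromInt(buf[0]),
        .flags = buf[1],
        .source = buf[2..18].*,
        .topic = buf[18..34].*,
        .seq = getBe(buf[34..42], 8),
        .payload = buf[46 .. 46 + len],
    };
}
```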

Gossip Protocol

State Synchronization

Periodic gossip keeps peers' subscription state eventually consistent:

Every gossip_interval_sec:
    1. Select random peers
    2. Exchange Bloom filter digests
    3. Identify differences
    4. Exchange missing subscriptions
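One exchange with a single peer (steps 2 to 4) might look like the Zig sketch below. It departs from the text in three labeled ways:

- It substitutes a simple order-independent digest (XOR of per-topic `std.hash.Wyhash` hashes) for the Bloom filter digest.
- It leaves out random peer selection.
- It only pushes topics in one direction, whereas a real exchange would run both ways.

`digest` and `gossipRound` are hypothetical names.

```zig
const std = @import("std");

const TopicId = [16]u8;
const TopicSet = std.AutoArrayHashMap(TopicId, void);

/// Order-independent digest: XOR of per-topic hashes (an assumption, not RevenProx's format).
fn digest(set: *const TopicSet) u64 {
    var d: u64 = 0;
    for (set.keys()) |t| d ^= std.hash.Wyhash.hash(0, &t);
    return d;
}

/// Compares digests; if they differ, copies the topics `remote` is missing.
/// Returns how many subscriptions were sent.
fn gossipRound(local: *const TopicSet, remote: *TopicSet) !usize {
    if (digest(local) == digest(remote)) return 0; // digests agree: nothing to exchange
    var sent: usize = 0;
    for (local.keys()) |t| {
        if (!remote.contains(t)) {
            try remote.put(t, {});
            sent += 1;
        }
    }
    return sent;
}
```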

Anti-Entropy

Merkle trees detect inconsistencies:

        Root
       /    \
     A        B
    / \      / \
   1   2    3   4

Compare roots:
- Same → Consistent
- Different → Traverse to find differences
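For the four-leaf tree in the diagram, the root-then-subtree comparison can be sketched in Zig as follows. The sketch assumes each leaf is the SHA-256 hash (`std.crypto.hash.sha2.Sha256`) of one bucket of subscriptions. The bucketing scheme and the `Tree`/`diffLeaves` names are assumptions. Only subtrees whose hashes differ are descended into.

```zig
const std = @import("std");

const Sha256 = std.crypto.hash.sha2.Sha256;
const Hash = [Sha256.digest_length]u8;

fn leafHash(data: []const u8) Hash {
    var out: Hash = undefined;
    Sha256.hash(data, &out, .{});
    return out;
}

fn nodeHash(left: Hash, right: Hash) Hash {
    var h = Sha256.init(.{});
    h.update(&left);
    h.update(&right);
    return h.finalResult();
}

/// Leaves 1..4, inner nodes A and B, and the root, as in the diagram.
const Tree = struct {
    leaves: [4]Hash,
    inner: [2]Hash, // A = hash(1, 2), B = hash(3, 4)
    root: Hash,

    fn build(buckets: [4][]const u8) Tree {
        var t: Tree = undefined;
        for (buckets, 0..) |b, i| t.leaves[i] = leafHash(b);
        t.inner[0] = nodeHash(t.leaves[0], t.leaves[1]);
        t.inner[1] = nodeHash(t.leaves[2], t.leaves[3]);
        t.root = nodeHash(t.inner[0], t.inner[1]);
        return t;
    }
};

/// Marks differing leaves in `diff` and returns how many differ.
fn diffLeaves(a: *const Tree, b: *const Tree, diff: *[4]bool) usize {
    diff.* = .{ false, false, false, false };
    if (std.mem.eql(u8, &a.root, &b.root)) return 0; // same root: consistent
    var n: usize = 0;
    for (0..2) |side| {
        if (std.mem.eql(u8, &a.inner[side], &b.inner[side])) continue; // subtree matches: skip
        const first = side * 2;
        for (first..first + 2) |leaf| {
            if (!std.mem.eql(u8, &a.leaves[leaf], &b.leaves[leaf])) {
                diff[leaf] = true;
                n += 1;
            }
        }
    }
    return n;
}
```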

Conflict Resolution

Vector clocks determine ordering:

Event A: {proxy1: 5, proxy2: 3}
Event B: {proxy1: 4, proxy2: 4}

Neither dominates → Concurrent events
Resolution: Merge (subscriptions are additive)
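The following Zig sketch shows the standard vector-clock comparison and merge, assuming clocks are stored as slices indexed by proxy (index 0 for proxy1, 1 for proxy2). The `compare` and `merge` names are illustrative. One clock precedes another only if it is less than or equal in every component and strictly less in at least one. Otherwise the events are concurrent, as in the example above, where `{5, 3}` and `{4, 4}` compare as `.concurrent`. Merging takes the component-wise maximum.

```zig
const std = @import("std");

const Order = enum { equal, before, after, concurrent };

/// Compares two vector clocks of equal length.
fn compare(a: []const u64, b: []const u64) Order {
    std.debug.assert(a.len == b.len);
    var a_less = false; // some a[i] < b[i]
    var b_less = false; // some b[i] < a[i]
    for (a, b) |x, y| {
        if (x < y) a_less = true;
        if (y < x) b_less = true;
    }
    if (a_less and b_less) return .concurrent; // neither dominates
    if (a_less) return .before;
    if (b_less) return .after;
    return .equal;
}

/// Component-wise maximum: the clock that follows both inputs.
fn merge(a: []const u64, b: []const u64, out: []u64) void {
    for (a, b, out) |x, y, *o| o.* = @max(x, y);
}
```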

Routing Tables

Local Subscriptions

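const std = @import("std");

const LocalSubscriptions = struct {
    allocator: std.mem.Allocator,
    // Topic UUID -> set of connection IDs. Array hash maps are used so keys() returns a slice.
    topics: std.AutoArrayHashMap([16]u8, std.AutoArrayHashMap(u64, void)),

    pub fn init(allocator: std.mem.Allocator) LocalSubscriptions {
        return .{ .allocator = allocator, .topics = std.AutoArrayHashMap([16]u8, std.AutoArrayHashMap(u64, void)).init(allocator) };
    }

    pub fn deinit(self: *LocalSubscriptions) void {
        for (self.topics.values()) |*subs| subs.deinit();
        self.topics.deinit();
    }

    pub fn subscribe(self: *LocalSubscriptions, topic: [16]u8, conn: u64) !void {
        const entry = try self.topics.getOrPut(topic);
        if (!entry.found_existing) {
            // First subscriber for this topic: create its connection set.
            entry.value_ptr.* = std.AutoArrayHashMap(u64, void).init(self.allocator);
        }
        try entry.value_ptr.put(conn, {}); // errors propagate instead of being silently dropped
    }

    // Returns the subscribed connection IDs, or null if the topic has none.
    // The slice is invalidated by the next subscribe() on the same topic.
    pub fn getSubscribers(self: *LocalSubscriptions, topic: [16]u8) ?[]const u64 {
        if (self.topics.getPtr(topic)) |subs| {
            return subs.keys();
        }
        return null;
    }
};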

Peer Routing

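const std = @import("std");

const PeerRouting = struct {
    // Topic UUID -> set of peer IDs with subscribers for that topic.
    // Array hash maps are used so keys() returns a slice.
    peers: std.AutoArrayHashMap([16]u8, std.AutoArrayHashMap([16]u8, void)),

    pub fn getInterestedPeers(self: *PeerRouting, topic: [16]u8) ?[]const [16]u8 {
        if (self.peers.getPtr(topic)) |peer_set| {
            return peer_set.keys();
        }
        return null;
    }
};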

Message Deduplication

Event IDs

Each event has a unique ID:

{timestamp}-{sequence}-{proxy_id}
Example: 1703894400000-001-abc123
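The following Zig sketch reproduces the example format with `std.fmt.bufPrint`. The three-digit zero-padded sequence is inferred from the example alone, and `formatEventId` is a hypothetical name. Sequences above 999 simply print wider. Uniqueness assumes each proxy's sequence counter does not repeat within a single millisecond.

```zig
const std = @import("std");

/// Formats {timestamp}-{sequence}-{proxy_id}; zero-padding to 3 digits is an assumption.
fn formatEventId(buf: []u8, timestamp_ms: u64, seq: u32, proxy_id: []const u8) ![]u8 {
    return std.fmt.bufPrint(buf, "{d}-{d:0>3}-{s}", .{ timestamp_ms, seq, proxy_id });
}
```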

Dedup Window

Recent event IDs are tracked:

Dedup Cache (per topic):
├── Max entries: 1000
├── TTL: 60 seconds
└── Eviction: LRU

Duplicate Detection

on_message_received(event):
    If dedup_cache.contains(event.id):
        // Duplicate, discard
        return

    dedup_cache.add(event.id)
    process_event(event)
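The following Zig sketch combines the dedup cache settings and the check-then-add logic, under these labeled assumptions:

- Event IDs are stored as 64-bit `std.hash.Wyhash` hashes rather than copied strings, which trades a tiny collision risk for simplicity.
- The TTL counts from the most recent sighting.
- LRU eviction is a linear scan.
- Time is passed in explicitly.

`DedupCache` and `checkAndAdd` are hypothetical names.

```zig
const std = @import("std");

const DedupCache = struct {
    max_entries: usize = 1000,
    ttl_ms: i64 = 60_000,
    last_seen: std.AutoArrayHashMap(u64, i64), // ID hash -> last access time (ms)

    fn init(allocator: std.mem.Allocator) DedupCache {
        return .{ .last_seen = std.AutoArrayHashMap(u64, i64).init(allocator) };
    }

    fn deinit(self: *DedupCache) void {
        self.last_seen.deinit();
    }

    /// true: duplicate within the TTL, discard. false: new (or expired), process it.
    fn checkAndAdd(self: *DedupCache, id: []const u8, now_ms: i64) !bool {
        const key = std.hash.Wyhash.hash(0, id);
        if (self.last_seen.getPtr(key)) |ts| {
            const duplicate = now_ms - ts.* <= self.ttl_ms;
            ts.* = now_ms; // refresh recency for LRU and restart the TTL window
            return duplicate; // expired entries count as new events
        }
        if (self.last_seen.count() >= self.max_entries) self.evictLru();
        try self.last_seen.put(key, now_ms);
        return false;
    }

    /// Removes the least recently seen entry (O(n) scan).
    fn evictLru(self: *DedupCache) void {
        const keys = self.last_seen.keys();
        const stamps = self.last_seen.values();
        if (keys.len == 0) return;
        var oldest: usize = 0;
        for (stamps, 0..) |ts, i| {
            if (ts < stamps[oldest]) oldest = i;
        }
        _ = self.last_seen.swapRemove(keys[oldest]);
    }
};
```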

Fanout Optimization

Batching

Group messages for efficiency:

Batch Buffer:
├── messages: [Message; BATCH_SIZE]
├── count: usize
└── last_flush: Timestamp

If count >= BATCH_SIZE or now - last_flush > BATCH_TIMEOUT:
    flush_batch()
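The following Zig sketch implements the batch buffer with `BATCH_SIZE` and `BATCH_TIMEOUT` as comptime parameters. The concrete values, the millisecond timestamps, and the `BatchBuffer` name are assumptions. `add` returns the flushed batch, which stays valid until the next call.

```zig
const std = @import("std");

fn BatchBuffer(comptime T: type, comptime batch_size: usize, comptime timeout_ms: i64) type {
    return struct {
        const Self = @This();
        messages: [batch_size]T = undefined,
        count: usize = 0,
        last_flush_ms: i64 = 0,

        /// Appends a message, then flushes if the batch is full or the timeout elapsed.
        /// Returns the flushed batch (valid until the next add) or null.
        fn add(self: *Self, msg: T, now_ms: i64) ?[]const T {
            self.messages[self.count] = msg;
            self.count += 1;
            if (self.count >= batch_size or now_ms - self.last_flush_ms > timeout_ms) {
                return self.flush(now_ms);
            }
            return null;
        }

        fn flush(self: *Self, now_ms: i64) []const T {
            const batch = self.messages[0..self.count];
            self.count = 0;
            self.last_flush_ms = now_ms;
            return batch;
        }
    };
}
```

The timeout is only checked inside `add`. A partial batch would therefore also need a timer to flush it when no further messages arrive.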

Subscriber Grouping

For large fanout, group by network locality:

Topic with 10,000 subscribers:
├── Local: 3,000 subscribers → Direct delivery
├── Peer A: 4,000 subscribers → Single forward
└── Peer B: 3,000 subscribers → Single forward
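The following Zig sketch counts the work that grouping implies. Each local subscriber gets one direct delivery, and each distinct peer gets one forward, regardless of how many subscribers it hosts. Representing a subscriber's location as a `Location` union with a `u16` peer index is an assumption, as are the `Plan` and `planFanout` names.

```zig
const std = @import("std");

/// Where a subscriber lives: on this proxy, or behind a peer (hypothetical peer index).
const Location = union(enum) { local, peer: u16 };

const Plan = struct {
    direct: usize = 0, // local subscribers delivered directly
    forwards: usize = 0, // one forward per distinct peer
};

fn planFanout(allocator: std.mem.Allocator, subs: []const Location) !Plan {
    var peers = std.AutoArrayHashMap(u16, void).init(allocator);
    defer peers.deinit();
    var plan = Plan{};
    for (subs) |loc| {
        switch (loc) {
            .local => plan.direct += 1,
            .peer => |id| try peers.put(id, {}), // duplicates collapse into one entry
        }
    }
    plan.forwards = peers.count();
    return plan;
}
```

For the example above, this yields 3,000 direct deliveries and 2 forwards instead of 10,000 individual sends.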

Metrics

Routing Metrics

Metric Description
messages_routed Total messages routed
messages_forwarded Messages sent to peers
fanout_avg Average subscribers per message
routing_latency_ms Time to route a message (ms)

Subscription Metrics

Metric Description
subscriptions_local Local subscription count
subscriptions_remote Remote (Bloom filter) count
topics_active Topics with subscribers

Performance Tuning

High Fanout Topics

For topics with many subscribers:

[http]
thread_pool_size = 32        # More workers
max_message_queue_size = 100 # Smaller queues
backpressure_policy = "drop_oldest"

Low Latency

For time-sensitive messages:

[nng]
message_batch_size = 10
message_batch_timeout_ms = 5

Memory Efficiency

For many topics with few subscribers:

[distributed_state]
bloom_filter_capacity = 100000
bloom_filter_fpr = 0.01
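The standard Bloom filter sizing formulas give a rough idea of the memory these settings imply. They assume `bloom_filter_capacity` is the expected number of entries n and `bloom_filter_fpr` is the target false-positive rate p; the exact sizing RevenProx applies is not stated here. The formulas give the bit count m and the number of hash functions k:

$$
m = -\frac{n \ln p}{(\ln 2)^2}, \qquad k = \frac{m}{n}\ln 2
$$

$$
n = 10^5,\; p = 0.01:\quad m = \frac{10^5 \times 4.60517}{0.480453} \approx 958{,}506 \text{ bits} \approx 117\ \text{KiB}, \qquad k \approx 9.585 \times 0.693 \approx 6.64 \to 7
$$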

Next Steps