Connection Management

How RevenProx manages client connections.

Connection Lifecycle

┌─────────────┐
│   Accept    │ ← New TCP connection
└──────┬──────┘
       ▼
┌─────────────┐
│   Parse     │ ← Extract headers, path
└──────┬──────┘
       ▼
┌─────────────┐
│ Authenticate│ ← Verify JWT token
└──────┬──────┘
       ▼
┌─────────────┐
│  Register   │ ← Add to connection pool
└──────┬──────┘
       ▼
┌─────────────┐
│  Subscribe  │ ← Register for topic
└──────┬──────┘
       ▼
┌─────────────┐
│   Active    │ ← Streaming events
└──────┬──────┘
       │ (timeout/disconnect/error)
       ▼
┌─────────────┐
│   Cleanup   │ ← Remove from pool
└─────────────┘

Connection States

| State      | Description           | Transitions To        |
|------------|-----------------------|-----------------------|
| connecting | Initial TCP handshake | active, closed        |
| active     | Receiving events      | idle, closing, closed |
| idle       | No recent activity    | active, closing       |
| closing    | Graceful shutdown     | closed                |
| closed     | Connection terminated | -                     |
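
The transition table can be encoded directly as a check on state pairs. The following is a minimal sketch, not RevenProx's actual code; the `ConnState` enum and the `can_transition_to` method are hypothetical names chosen for illustration.

```rust
/// Connection states from the table above (names are illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnState {
    Connecting,
    Active,
    Idle,
    Closing,
    Closed,
}

impl ConnState {
    /// Returns true if the table lists `next` as a valid successor of `self`.
    pub fn can_transition_to(self, next: ConnState) -> bool {
        use ConnState::*;
        matches!(
            (self, next),
            (Connecting, Active)
                | (Connecting, Closed)
                | (Active, Idle)
                | (Active, Closing)
                | (Active, Closed)
                | (Idle, Active)
                | (Idle, Closing)
                | (Closing, Closed)
        )
    }
}
```

Because `closed` has no outgoing transitions, every pair starting from `Closed` is rejected.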

Connection Pool

Data Structure

ConnectionPool
├── connections: HashMap<ConnectionId, Connection>
├── topic_subscriptions: HashMap<TopicUUID, Set<ConnectionId>>
├── mutex: Mutex
├── next_id: AtomicU64
└── config: PoolConfig

Operations

Add Connection:

1. Acquire mutex
2. Generate unique ID
3. Insert into connections map
4. Release mutex
5. Return connection ID

Remove Connection:

1. Acquire mutex
2. Remove from connections map
3. Remove from all topic subscriptions
4. Release mutex
5. Close underlying socket

Subscribe to Topic:

1. Acquire mutex
2. Add connection ID to topic's subscriber set
3. Update Bloom filter
4. Release mutex
5. Notify peers (async)
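
The three operations above can be sketched as follows. This is an illustration under stated assumptions rather than the real implementation: `Connection` is a placeholder without a socket, `TopicId` is a `String` standing in for `TopicUUID`, the ID is taken from the atomic counter before the lock is acquired, and the Bloom filter update and peer notification are omitted.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

pub type ConnectionId = u64;
pub type TopicId = String; // stand-in for TopicUUID (assumption)

/// Placeholder for the real connection; the socket is omitted.
pub struct Connection {
    pub peer: String,
}

#[derive(Default)]
struct PoolInner {
    connections: HashMap<ConnectionId, Connection>,
    topic_subscriptions: HashMap<TopicId, HashSet<ConnectionId>>,
}

pub struct ConnectionPool {
    inner: Mutex<PoolInner>,
    next_id: AtomicU64,
}

impl ConnectionPool {
    pub fn new() -> Self {
        Self { inner: Mutex::new(PoolInner::default()), next_id: AtomicU64::new(1) }
    }

    /// Add Connection: generate an ID, insert under the lock, return the ID.
    pub fn add(&self, conn: Connection) -> ConnectionId {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let mut inner = self.inner.lock().unwrap();
        inner.connections.insert(id, conn);
        id
    }

    /// Remove Connection: the removed value is returned so the caller can
    /// close the socket after the lock has been released.
    pub fn remove(&self, id: ConnectionId) -> Option<Connection> {
        let mut inner = self.inner.lock().unwrap();
        let conn = inner.connections.remove(&id);
        // Drop the ID from every topic; pruning empty sets is an added choice.
        inner.topic_subscriptions.retain(|_, subs| {
            subs.remove(&id);
            !subs.is_empty()
        });
        conn
    }

    /// Subscribe to Topic: returns false for an unknown connection ID.
    pub fn subscribe(&self, id: ConnectionId, topic: &str) -> bool {
        let mut inner = self.inner.lock().unwrap();
        if !inner.connections.contains_key(&id) {
            return false;
        }
        inner.topic_subscriptions.entry(topic.to_string()).or_default().insert(id);
        true
    }

    pub fn subscriber_count(&self, topic: &str) -> usize {
        let inner = self.inner.lock().unwrap();
        inner.topic_subscriptions.get(topic).map_or(0, |s| s.len())
    }
}
```

Returning the removed `Connection` from `remove` mirrors the ordering in the list: the mutex is released before the underlying socket is closed.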

Concurrency

The pool serializes access with a lock, but optimizes for the common read-heavy case:

  • Fast path: Read-only operations take a shared read lock
  • Slow path: Mutations take an exclusive write lock
  • Statistics are kept in atomic counters
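
One way to get the read/write split described above is `std::sync::RwLock` plus an atomic counter. The sketch below assumes a simplified map of ID to peer address; `SharedPool`, `peer_of`, and `lookup_count` are hypothetical names, and the source does not say which lock type RevenProx uses.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

pub struct SharedPool {
    // id -> peer address; a placeholder for the real connection value.
    connections: RwLock<HashMap<u64, String>>,
    lookups: AtomicU64,
}

impl SharedPool {
    pub fn new() -> Self {
        Self { connections: RwLock::new(HashMap::new()), lookups: AtomicU64::new(0) }
    }

    /// Fast path: any number of readers may hold the read lock at once.
    pub fn peer_of(&self, id: u64) -> Option<String> {
        // The statistic is updated without taking the lock at all.
        self.lookups.fetch_add(1, Ordering::Relaxed);
        let guard = self.connections.read().unwrap();
        guard.get(&id).cloned()
    }

    /// Slow path: a mutation takes the exclusive write lock.
    pub fn insert(&self, id: u64, peer: &str) {
        let mut guard = self.connections.write().unwrap();
        guard.insert(id, peer.to_string());
    }

    pub fn lookup_count(&self) -> u64 {
        self.lookups.load(Ordering::Relaxed)
    }
}
```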

Message Queues

Each connection has a bounded message queue:

RingBuffer
├── items: [Message; CAPACITY]
├── head: usize (next read position)
├── tail: usize (next write position)
└── count: AtomicUsize

Backpressure Handling

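When the queue is full, the configured policy decides what happens to the incoming message:

| Policy      | Behavior                                |
|-------------|-----------------------------------------|
| drop_newest | Reject the new message                  |
| drop_oldest | Remove the oldest message, add the new  |
| block       | Wait for space (not recommended)        |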
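
The two non-blocking policies can be sketched on top of a bounded queue. For brevity this example uses `std::collections::VecDeque` instead of the fixed-array ring buffer shown above, and it leaves out the `block` policy; `BoundedQueue` and `OverflowPolicy` are hypothetical names.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    DropNewest,
    DropOldest,
}

pub struct BoundedQueue<T> {
    items: VecDeque<T>,
    capacity: usize,
    policy: OverflowPolicy,
    dropped: u64,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize, policy: OverflowPolicy) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { items: VecDeque::with_capacity(capacity), capacity, policy, dropped: 0 }
    }

    /// Returns true if `msg` was stored, false if it was rejected.
    pub fn push(&mut self, msg: T) -> bool {
        if self.items.len() == self.capacity {
            // Either way one message is lost, so count it.
            self.dropped += 1;
            match self.policy {
                OverflowPolicy::DropNewest => return false,
                OverflowPolicy::DropOldest => {
                    self.items.pop_front();
                }
            }
        }
        self.items.push_back(msg);
        true
    }

    pub fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    /// Number of messages lost to backpressure so far.
    pub fn dropped(&self) -> u64 {
        self.dropped
    }
}
```

The `dropped` counter corresponds to what a `messages_dropped` style metric would report for a single queue.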

Queue Sizing

Memory per connection = QUEUE_SIZE × AVG_MESSAGE_SIZE

Example:
1000 messages × 200 bytes = 200 KB per connection
100,000 connections × 200 KB = 20 GB
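
The same arithmetic as a small helper, useful for checking a planned configuration. The function name is hypothetical, and the result is a worst case that assumes every queue is full and uses decimal units (1 KB = 1000 bytes), as the example does.

```rust
/// Worst-case queue memory in bytes, assuming every queue is full.
pub fn queue_memory_bytes(queue_size: u64, avg_message_bytes: u64, connections: u64) -> u64 {
    queue_size * avg_message_bytes * connections
}
```

For the figures above, `queue_memory_bytes(1000, 200, 100_000)` evaluates to 20,000,000,000 bytes, that is 20 GB.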

Keepalive Mechanism

Purpose

Keepalives serve to:

1. Detect dead connections
2. Prevent proxy/firewall timeouts
3. Maintain NAT mappings

Implementation

Every keepalive_interval_sec:
    For each active connection:
        If now - last_activity > keepalive_interval_sec:
            Send SSE comment ": keepalive\n\n"
            Update last_activity
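
A single pass of this loop might look like the sketch below. It assumes each connection exposes a writable stream and a `last_activity` timestamp; `Conn` and `keepalive_sweep` are hypothetical names, and the idle check compares the time elapsed since `last_activity` against the interval.

```rust
use std::io::{self, Write};
use std::time::{Duration, Instant};

/// An SSE comment line followed by the blank line that ends the event.
pub const KEEPALIVE_FRAME: &[u8] = b": keepalive\n\n";

pub struct Conn<W: Write> {
    pub stream: W,
    pub last_activity: Instant,
}

/// Sends a keepalive to every connection that has been quiet for longer
/// than `interval` and returns how many were sent. The sweep stops at the
/// first write error; a real server would close that connection and continue.
pub fn keepalive_sweep<W: Write>(
    conns: &mut [Conn<W>],
    now: Instant,
    interval: Duration,
) -> io::Result<usize> {
    let mut sent = 0;
    for conn in conns.iter_mut() {
        if now.saturating_duration_since(conn.last_activity) > interval {
            conn.stream.write_all(KEEPALIVE_FRAME)?;
            conn.stream.flush()?;
            conn.last_activity = now;
            sent += 1;
        }
    }
    Ok(sent)
}
```

Passing `now` in as a parameter, rather than calling `Instant::now()` inside the loop, keeps the sweep easy to test with synthetic timestamps.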

Wire Format

: keepalive

The leading colon marks the line as an SSE comment: clients ignore it, but the traffic keeps the connection alive.

Timeout Handling

Connection Timeout

Connections with no activity beyond connection_timeout_sec are closed:

Cleanup Thread:
    Every cleanup_interval:
        For each connection:
            If now - last_activity > connection_timeout:
                Close connection
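
The cleanup pass can be sketched as a function over a map of last-activity timestamps. This is an illustration only: `reap_idle` is a hypothetical name, and the map of ID to `Instant` stands in for whatever per-connection state the pool actually keeps.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Removes every entry idle for longer than `timeout` and returns the
/// removed IDs (sorted) so the caller can close the matching sockets.
pub fn reap_idle(
    last_activity: &mut HashMap<u64, Instant>,
    now: Instant,
    timeout: Duration,
) -> Vec<u64> {
    let mut expired: Vec<u64> = last_activity
        .iter()
        .filter(|(_, seen)| now.saturating_duration_since(**seen) > timeout)
        .map(|(id, _)| *id)
        .collect();
    expired.sort_unstable();
    for id in &expired {
        last_activity.remove(id);
    }
    expired
}
```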

Graceful Shutdown

On SIGTERM/SIGINT:

1. Stop accepting new connections
2. Send close frames to all clients
3. Wait for pending messages to drain
4. Close all connections
5. Exit

Thread Pool

Worker Threads

Worker Thread Loop:
    While running:
        Wait on work queue (with timeout)
        If work item received:
            Handle connection request
        Else:
            Check for shutdown signal
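
The loop above can be approximated with the standard library alone. The sketch below is an assumption-laden illustration: it uses a bounded `std::sync::mpsc::sync_channel` whose receiver is shared behind a mutex as a stand-in for a true MPMC queue, a 50 ms wait timeout chosen arbitrarily, and hypothetical names (`WorkerPool`, `Job`, `worker_loop`).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

pub type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct WorkerPool {
    sender: SyncSender<Job>,
    running: Arc<AtomicBool>,
    handles: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    pub fn new(workers: usize, queue_capacity: usize) -> Self {
        let (sender, receiver) = sync_channel::<Job>(queue_capacity);
        let receiver = Arc::new(Mutex::new(receiver));
        let running = Arc::new(AtomicBool::new(true));
        let handles = (0..workers)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                let running = Arc::clone(&running);
                thread::spawn(move || worker_loop(receiver, running))
            })
            .collect();
        Self { sender, running, handles }
    }

    /// Queues a job; blocks while the bounded queue is full.
    pub fn submit<F: FnOnce() + Send + 'static>(&self, job: F) -> bool {
        self.sender.send(Box::new(job)).is_ok()
    }

    /// Signals shutdown, lets workers finish queued jobs, then joins them.
    pub fn shutdown(self) {
        let WorkerPool { sender, running, handles } = self;
        running.store(false, Ordering::SeqCst);
        drop(sender);
        for handle in handles {
            let _ = handle.join();
        }
    }
}

fn worker_loop(receiver: Arc<Mutex<Receiver<Job>>>, running: Arc<AtomicBool>) {
    loop {
        // The lock is held only while waiting for an item, not while running it.
        let next = receiver.lock().unwrap().recv_timeout(Duration::from_millis(50));
        match next {
            Ok(job) => job(),
            // No work arrived in time: check for the shutdown signal.
            Err(RecvTimeoutError::Timeout) => {
                if !running.load(Ordering::SeqCst) {
                    break;
                }
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}
```

Because the shutdown flag is consulted only after a timed-out wait, jobs already in the queue are still handled before the workers exit.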

Work Distribution

Accept Thread                    Worker Pool
     │                               │
     │  ┌─────────────────┐          │
     ├─►│  Work Queue     │◄─────────┤
     │  │ (bounded MPMC)  │          │
     │  └─────────────────┘          │
     │                               │
     │                        ┌──────┴──────┐
     │                        │   Workers   │
     │                        ├─────────────┤
     │                        │ W1 W2 W3 W4 │
     │                        └─────────────┘

Sizing Guidelines

| Workload  | Recommended Threads |
|-----------|---------------------|
| I/O bound | 2 × CPU cores       |
| CPU bound | CPU cores           |
| Mixed     | 1.5 × CPU cores     |
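
These guidelines translate into a small helper. The names `Workload`, `recommended_threads`, and `detected_cores` are hypothetical, and rounding the 1.5× case up to a whole thread is an assumption the table does not specify.

```rust
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Workload {
    IoBound,
    CpuBound,
    Mixed,
}

/// Thread count suggested by the sizing table for a given core count.
pub fn recommended_threads(workload: Workload, cores: usize) -> usize {
    let cores = cores.max(1);
    match workload {
        Workload::IoBound => cores * 2,
        Workload::CpuBound => cores,
        // 1.5 × cores, rounded up to a whole thread.
        Workload::Mixed => (cores * 3 + 1) / 2,
    }
}

/// Core count reported by the OS, falling back to 1 if it is unavailable.
pub fn detected_cores() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

For example, `recommended_threads(Workload::Mixed, 8)` yields 12.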

Connection Limits

Per-Server Limits

[http]
max_connections = 100000

System Limits

# File descriptors
ulimit -n 1000000

# TCP settings
sysctl net.core.somaxconn=65535
sysctl net.ipv4.tcp_max_syn_backlog=65535

Limit Enforcement

On new connection:
    If active_count >= max_connections:
        Send 503 Service Unavailable
        Close connection
        Increment rejected_count
    Else:
        Accept connection
        Increment active_count
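
The check-then-increment above has to be atomic when several threads accept connections at once. A minimal sketch using `AtomicUsize::fetch_update` follows; `ConnectionLimiter` and its methods are hypothetical names, and sending the 503 response is left to the caller.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

pub struct ConnectionLimiter {
    max_connections: usize,
    active: AtomicUsize,
    rejected: AtomicU64,
}

impl ConnectionLimiter {
    pub fn new(max_connections: usize) -> Self {
        Self { max_connections, active: AtomicUsize::new(0), rejected: AtomicU64::new(0) }
    }

    /// Tries to reserve a slot. On `false` the caller should send
    /// 503 Service Unavailable and close the connection.
    pub fn try_acquire(&self) -> bool {
        // Compare and increment in one atomic step so two threads cannot
        // both take the last slot.
        let reserved = self.active.fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
            if n < self.max_connections { Some(n + 1) } else { None }
        });
        if reserved.is_err() {
            self.rejected.fetch_add(1, Ordering::Relaxed);
        }
        reserved.is_ok()
    }

    /// Frees a slot; call exactly once per successful `try_acquire`.
    pub fn release(&self) {
        self.active.fetch_sub(1, Ordering::AcqRel);
    }

    pub fn active(&self) -> usize {
        self.active.load(Ordering::Acquire)
    }

    pub fn rejected(&self) -> u64 {
        self.rejected.load(Ordering::Relaxed)
    }
}
```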

Metrics

Connection Metrics

| Metric                  | Description                 |
|-------------------------|-----------------------------|
| connections_active      | Current connection count    |
| connections_total       | Lifetime connection count   |
| connections_rejected    | Rejected due to limits      |
| connection_duration_avg | Average connection lifetime |

Queue Metrics

| Metric           | Description                 |
|------------------|-----------------------------|
| queue_depth_avg  | Average messages in queue   |
| queue_depth_max  | Maximum queue depth         |
| messages_dropped | Dropped due to backpressure |

Memory Management

Connection Memory

Pre-allocated where possible:

  • Connection struct: Fixed size
  • Message queue: Fixed capacity
  • Metadata buffers: Bounded

Cleanup

On connection close:
    1. Drain message queue
    2. Free metadata strings
    3. Remove from pool
    4. Close socket FD
    5. Update counters

Error Handling

Connection Errors

| Error          | Action            |
|----------------|-------------------|
| Read timeout   | Close connection  |
| Write error    | Close connection  |
| Protocol error | Send error, close |
| Auth failure   | Send 401, close   |

Recovery

Clients should implement reconnection:

On disconnect:
    Wait backoff period
    Reconnect with Last-Event-ID
    Resume from last known position
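
On the client side, the backoff period and the resume header might be computed as in the sketch below. The source does not prescribe a backoff strategy, so capped exponential backoff is an assumption here, as are the function names; production clients usually add random jitter as well.

```rust
use std::time::Duration;

/// Capped exponential backoff: base × 2^attempt, never more than `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Request headers for a reconnect; `Last-Event-ID` is included only when
/// the client has already received an event carrying an ID.
pub fn reconnect_headers(last_event_id: Option<&str>) -> Vec<(String, String)> {
    let mut headers = vec![("Accept".to_string(), "text/event-stream".to_string())];
    if let Some(id) = last_event_id {
        headers.push(("Last-Event-ID".to_string(), id.to_string()));
    }
    headers
}
```

With a 1 second base and a 30 second cap, attempts 0 through 4 wait 1, 2, 4, 8, and 16 seconds, and every later attempt waits 30 seconds.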
