Connection Management
How RevenProx manages client connections.
Connection Lifecycle
┌─────────────┐
│ Accept │ ← New TCP connection
└──────┬──────┘
│
▼
┌─────────────┐
│ Parse │ ← Extract headers, path
└──────┬──────┘
│
▼
┌─────────────┐
│ Authenticate│ ← Verify JWT token
└──────┬──────┘
│
▼
┌─────────────┐
│ Register │ ← Add to connection pool
└──────┬──────┘
│
▼
┌─────────────┐
│ Subscribe │ ← Register for topic
└──────┬──────┘
│
▼
┌─────────────┐
│ Active │ ← Streaming events
└──────┬──────┘
│ (timeout/disconnect/error)
▼
┌─────────────┐
│ Cleanup │ ← Remove from pool
└─────────────┘
Connection States
| State | Description | Transitions To |
|---|---|---|
connecting |
Initial TCP handshake | active, closed |
active |
Receiving events | idle, closing, closed |
idle |
No recent activity | active, closing |
closing |
Graceful shutdown | closed |
closed |
Connection terminated | - |
Connection Pool
Data Structure
ConnectionPool
├── connections: HashMap<ConnectionId, Connection>
├── topic_subscriptions: HashMap<TopicUUID, Set<ConnectionId>>
├── mutex: Mutex
├── next_id: AtomicU64
└── config: PoolConfig
Operations
Add Connection:
1. Acquire mutex
2. Generate unique ID
3. Insert into connections map
4. Release mutex
5. Return connection ID
Remove Connection:
1. Acquire mutex
2. Remove from connections map
3. Remove from all topic subscriptions
4. Release mutex
5. Close underlying socket
Subscribe to Topic:
1. Acquire mutex
2. Add connection ID to topic's subscriber set
3. Update Bloom filter
4. Release mutex
5. Notify peers (async)
Concurrency
The pool uses a mutex for most operations, but optimizes for the common case:
- Fast path: Read-only operations use read lock
- Slow path: Mutations use write lock
- Atomic counters for statistics
Message Queues
Each connection has a bounded message queue:
RingBuffer
├── items: [Message; CAPACITY]
├── head: usize (next read position)
├── tail: usize (next write position)
└── count: AtomicUsize
Backpressure Handling
When queue is full:
| Policy | Behavior |
|---|---|
drop_newest |
Reject new message |
drop_oldest |
Remove oldest, add new |
block |
Wait for space (not recommended) |
Queue Sizing
Memory per connection = QUEUE_SIZE × AVG_MESSAGE_SIZE
Example:
1000 messages × 200 bytes = 200 KB per connection
100,000 connections = 20 GB
Keepalive Mechanism
Purpose
Keepalives serve to: 1. Detect dead connections 2. Prevent proxy/firewall timeouts 3. Maintain NAT mappings
Implementation
Every keepalive_interval_sec:
For each active connection:
If last_activity > keepalive_interval:
Send SSE comment ": keepalive\n\n"
Update last_activity
Wire Format
The colon prefix makes it an SSE comment, ignored by clients but keeps connection alive.
Timeout Handling
Connection Timeout
Connections with no activity beyond connection_timeout_sec are closed:
Cleanup Thread:
Every cleanup_interval:
For each connection:
If now - last_activity > connection_timeout:
Close connection
Graceful Shutdown
On SIGTERM/SIGINT: 1. Stop accepting new connections 2. Send close frames to all clients 3. Wait for pending messages to drain 4. Close all connections 5. Exit
Thread Pool
Worker Threads
Worker Thread Loop:
While running:
Wait on work queue (with timeout)
If work item received:
Handle connection request
Else:
Check for shutdown signal
Work Distribution
Accept Thread Worker Pool
│ │
│ ┌─────────────────┐ │
├─►│ Work Queue │◄─────────┤
│ │ (bounded MPMC) │ │
│ └─────────────────┘ │
│ │
│ ┌──────┴──────┐
│ │ Workers │
│ ├─────────────┤
│ │ W1 W2 W3 W4 │
│ └─────────────┘
Sizing Guidelines
| Workload | Recommended Threads |
|---|---|
| I/O bound | 2 × CPU cores |
| CPU bound | CPU cores |
| Mixed | 1.5 × CPU cores |
Connection Limits
Per-Server Limits
System Limits
# File descriptors
ulimit -n 1000000
# TCP settings
sysctl net.core.somaxconn=65535
sysctl net.ipv4.tcp_max_syn_backlog=65535
Limit Enforcement
On new connection:
If active_count >= max_connections:
Send 503 Service Unavailable
Close connection
Increment rejected_count
Else:
Accept connection
Increment active_count
Metrics
Connection Metrics
| Metric | Description |
|---|---|
connections_active |
Current connection count |
connections_total |
Lifetime connection count |
connections_rejected |
Rejected due to limits |
connection_duration_avg |
Average connection lifetime |
Queue Metrics
| Metric | Description |
|---|---|
queue_depth_avg |
Average messages in queue |
queue_depth_max |
Maximum queue depth |
messages_dropped |
Dropped due to backpressure |
Memory Management
Connection Memory
Pre-allocated where possible: - Connection struct: Fixed size - Message queue: Fixed capacity - Metadata buffers: Bounded
Cleanup
On connection close:
1. Drain message queue
2. Free metadata strings
3. Remove from pool
4. Close socket FD
5. Update counters
Error Handling
Connection Errors
| Error | Action |
|---|---|
| Read timeout | Close connection |
| Write error | Close connection |
| Protocol error | Send error, close |
| Auth failure | Send 401, close |
Recovery
Clients should implement reconnection: