Fix worker proxy relay leak and enable noVNC under --scale

_relay waited for both pipe directions (gather), leaking a task holding two sockets on every half-closed tunnel — visible as a flood of pending-task lines under load. Tear the tunnel down when either side closes (FIRST_COMPLETED + close both writers), matching the .NET LocalForwardingProxy's WhenAny. Also move the worker's noVNC to an ephemeral host port so replicas don't collide under 'docker compose up --scale worker=N'.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
bob
2026-05-31 15:12:51 -05:00
parent dc7c3f99ae
commit 94177f9a8c
2 changed files with 21 additions and 5 deletions

View File

@@ -45,5 +45,8 @@ services:
depends_on: depends_on:
- c2 - c2
ports: ports:
- "6080:6080" # noVNC: http://localhost:6080/vnc.html # Ephemeral host port so replicas don't collide under --scale. Find a worker's
# noVNC with `docker compose port worker 6080` (or `docker ps`), then open
# http://localhost:<mapped>/vnc.html to watch / solve a challenge.
- "6080"
restart: unless-stopped restart: unless-stopped

View File

@@ -209,6 +209,11 @@ class LocalForwardingProxy:
async def _relay( async def _relay(
client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter,
up_reader: asyncio.StreamReader, up_writer: asyncio.StreamWriter) -> None: up_reader: asyncio.StreamReader, up_writer: asyncio.StreamWriter) -> None:
# Pipe both directions, but tear the whole tunnel down as soon as EITHER side
# closes (mirrors the .NET WhenAny). Waiting for both — as a plain gather does —
# leaks a task holding two sockets on every half-closed connection, which piles
# up fast across a long multi-worker run. Closing both writers when the first
# pipe finishes unblocks the other's pending read so both tasks settle.
async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
try: try:
while data := await reader.read(65536): while data := await reader.read(65536):
@@ -216,10 +221,18 @@ class LocalForwardingProxy:
await writer.drain() await writer.drain()
except Exception: except Exception:
pass pass
await asyncio.gather(
pipe(client_reader, up_writer), a = asyncio.create_task(pipe(client_reader, up_writer))
pipe(up_reader, client_writer), b = asyncio.create_task(pipe(up_reader, client_writer))
) try:
await asyncio.wait({a, b}, return_when=asyncio.FIRST_COMPLETED)
finally:
for w in (client_writer, up_writer):
try:
w.close()
except Exception:
pass
await asyncio.gather(a, b, return_exceptions=True)
def looks_like_challenge(body: str) -> bool: def looks_like_challenge(body: str) -> bool: