Files
2026-06-01 10:52:06 -05:00

155 lines
6.7 KiB
Python

"""IPRoyal residential proxy plumbing.
The in-process forwarder + the password/session helpers — identical across every market
worker, so they live here. HTTPS market traffic flows through the CONNECT tunnel, so the
forwarder only ever relays ciphertext. Ported from the .NET LocalForwardingProxy /
IpRoyalProxyProvider.
"""
import asyncio
import base64
import logging
import uuid
# Module-level logger, registered under the fixed name "proxy" (not __name__), so
# logging configuration must target that name to see this module's messages.
log = logging.getLogger("proxy")
def new_session_id() -> str:
    """Mint a fresh 10-character lowercase-hex session token.

    The token is the leading slice of a random UUID4's hex form, so it is opaque
    and URL-safe. IPRoyal pins one residential exit IP per distinct session value,
    which means asking for a new token is how a caller asks for a new IP.
    """
    full_hex = uuid.uuid4().hex
    return full_hex[:10]
def iproyal_password(password: str, country: str, lifetime_min: int, session_id: str) -> str:
    """Build the IPRoyal-style password that carries targeting and session options.

    The options are appended to the account password as underscore-separated
    suffixes, e.g. ``<pass>_country-us_session-<id>_lifetime-60m``. The country
    suffix is emitted only when ``country`` is truthy; the session and lifetime
    suffixes are always present.
    """
    suffixes = [f"_country-{country}"] if country else []
    suffixes.append(f"_session-{session_id}_lifetime-{lifetime_min}m")
    return password + "".join(suffixes)
class LocalForwardingProxy:
"""In-process HTTP proxy on 127.0.0.1 that chains every connection to the IPRoyal
gateway, injecting the Proxy-Authorization header itself. Chromium ignores creds in
--proxy-server and the in-browser ways to answer the gateway's 407 (a CDP auth
handler, or a disabled MV2 extension) are Cloudflare tells — so we terminate the
browser->proxy hop locally and add auth here, leaving Chrome to talk to an auth-free
endpoint at zero CDP. HTTPS (all market traffic) flows through the CONNECT tunnel, so
this proxy only relays ciphertext and never sees plaintext. The active session token
can be swapped live (set_password) to move to a fresh exit IP without restarting the
browser. (New tunnels pick up the new IP; any still-open keep-alive tunnel stays on
the old one until it closes.)"""
def __init__(self, host: str, port: int, username: str, password: str):
self._host = host
self._port = port
self._username = username
self._password = password
self._server: asyncio.AbstractServer | None = None
self.endpoint = ""
def set_password(self, password: str) -> None:
self._password = password
def _auth_header(self) -> str:
token = base64.b64encode(f"{self._username}:{self._password}".encode()).decode()
return f"Proxy-Authorization: Basic {token}\r\n"
async def start(self) -> "LocalForwardingProxy":
self._server = await asyncio.start_server(self._handle, "127.0.0.1", 0)
port = self._server.sockets[0].getsockname()[1]
self.endpoint = f"127.0.0.1:{port}"
return self
async def stop(self) -> None:
if self._server is not None:
self._server.close()
try:
await self._server.wait_closed()
except Exception:
pass
@staticmethod
async def _read_header(reader: asyncio.StreamReader) -> str | None:
"""Read up to the end of the HTTP header block (CRLFCRLF). None on EOF/overflow."""
try:
data = await reader.readuntil(b"\r\n\r\n")
except (asyncio.IncompleteReadError, asyncio.LimitOverrunError):
return None
return data.decode("latin-1")
async def _handle(self, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter) -> None:
up_writer: asyncio.StreamWriter | None = None
try:
header = await self._read_header(client_reader)
if not header:
return
parts = header.split("\r\n", 1)[0].split(" ")
if len(parts) < 2:
return
method, target = parts[0], parts[1]
up_reader, up_writer = await asyncio.open_connection(self._host, self._port)
if method.upper() == "CONNECT":
# HTTPS: open an authenticated tunnel upstream, then relay raw bytes.
up_writer.write(
f"CONNECT {target} HTTP/1.1\r\nHost: {target}\r\n{self._auth_header()}\r\n".encode())
await up_writer.drain()
up_header = await self._read_header(up_reader)
status = up_header.split(" ", 2) if up_header else []
if len(status) < 2 or status[1] != "200":
line = (up_header or "no response").split("\r\n", 1)[0]
log.warning("upstream refused CONNECT %s: %s", target, line)
client_writer.write(b"HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n")
await client_writer.drain()
return
client_writer.write(b"HTTP/1.1 200 Connection established\r\n\r\n")
await client_writer.drain()
else:
# Plain HTTP: re-inject the request upstream with auth, then relay.
idx = header.index("\r\n") + 2
up_writer.write((header[:idx] + self._auth_header() + header[idx:]).encode())
await up_writer.drain()
await self._relay(client_reader, client_writer, up_reader, up_writer)
except Exception:
pass # one bad tunnel must never take down the listener
finally:
for w in (client_writer, up_writer):
if w is not None:
try:
w.close()
except Exception:
pass
@staticmethod
async def _relay(
client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter,
up_reader: asyncio.StreamReader, up_writer: asyncio.StreamWriter) -> None:
# Pipe both directions, but tear the whole tunnel down as soon as EITHER side
# closes (mirrors the .NET WhenAny). Waiting for both — as a plain gather does —
# leaks a task holding two sockets on every half-closed connection, which piles
# up fast across a long multi-worker run. Closing both writers when the first pipe
# finishes unblocks the other's pending read so both tasks settle.
async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
try:
while data := await reader.read(65536):
writer.write(data)
await writer.drain()
except Exception:
pass
a = asyncio.create_task(pipe(client_reader, up_writer))
b = asyncio.create_task(pipe(up_reader, client_writer))
try:
await asyncio.wait({a, b}, return_when=asyncio.FIRST_COMPLETED)
finally:
for w in (client_writer, up_writer):
try:
w.close()
except Exception:
pass
await asyncio.gather(a, b, return_exceptions=True)