Files
Operation-Blue-Laminate-v2/worker/worker.py
bob 8b0eb0db78 Cut metered-proxy bandwidth: re-sweep floor + wire-size logging
JobQueue now skips bands swept within MinResweepHours (config, default 6h) instead of re-scraping the whole catalogue continuously — the dominant cost on the metered residential proxy. Roughly linear savings with no data loss (full pagination retained); 0 disables it. Worker logs the real compressed transferSize per job (what the proxy bills) rather than the ~6.5x-larger decompressed length, so spend is visible.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 15:27:37 -05:00

484 lines
21 KiB
Python

"""
cs.money scrape worker (pull model).
Holds ONE warm nodriver session (the thing that beats Cloudflare), then loops:
poll the .NET C2 for a job, scrape that skin+wear's sell-orders via in-page fetch
from the cleared session, and post the results back. The C2 owns job selection
(stalest skin+wear first) and persistence; this worker just fetches and forwards.
cd worker
.venv\\Scripts\\Activate.ps1
pip install -r requirements.txt
python worker.py
Env knobs:
C2_URL C2 base URL (default http://localhost:5080)
WORKER_TOKEN shared secret, must match the C2's WorkerToken (default dev-worker-token)
MARKET_URL market page to warm the session on (default the buy market)
SOLVE_SECONDS seconds to clear Cloudflare on startup (default 30)
DELAY / JITTER base + random seconds between page fetches (default 2.0 / 1.5)
IDLE_SECONDS sleep when the C2 has no work (default 10)
BROWSER_PATH path to Chrome/Edge if auto-detect fails
Proxy (pick one; IPRoyal takes priority when its creds are set):
IPROYAL_USERNAME IPRoyal residential account username
IPROYAL_PASSWORD IPRoyal residential account password
IPROYAL_COUNTRY ISO country for the exit (default us; blank = any)
IPROYAL_LIFETIME_MIN sticky-IP hold in minutes (default 60)
PROXY host:port for an auth-free proxy (fallback; omit to use your own IP)
Each worker process mints its own random IPRoyal sticky session at startup, so N
workers get N distinct residential exit IPs with no coordination — scale with
`docker compose up --scale worker=N`. On a Cloudflare challenge the worker rotates
to a fresh session (new IP) and re-warms. Chromium can't carry proxy credentials on
--proxy-server, so we run a tiny in-process forwarder (LocalForwardingProxy below)
that injects the IPRoyal auth and chains to the gateway; Chrome talks only to an
auth-free 127.0.0.1 endpoint, keeping us at zero CDP (a CDP auth handler is a
Cloudflare tell).
"""
import asyncio
import base64
import json
import os
import random
import re
import urllib.error
import urllib.parse
import urllib.request
import uuid
import nodriver as uc
# --- runtime configuration (every knob is read from the environment) --------------
C2_URL = os.getenv("C2_URL", "http://localhost:5080").rstrip("/")
TOKEN = os.getenv("WORKER_TOKEN", "dev-worker-token")
MARKET_URL = os.getenv("MARKET_URL", "https://cs.money/market/buy/")
SOLVE_SECONDS = int(os.getenv("SOLVE_SECONDS", "30"))
DELAY = float(os.getenv("DELAY", "2.0"))
JITTER = float(os.getenv("JITTER", "1.5"))
IDLE_SECONDS = int(os.getenv("IDLE_SECONDS", "10"))
PROXY = os.getenv("PROXY")
BROWSER_PATH = os.getenv("BROWSER_PATH")

# IPRoyal residential gateway: a single fixed host/port. Country, sticky-session id
# and lifetime travel as underscore-separated params appended to the password (see
# _iproyal_password). Mirrors the .NET IpRoyalProxyProvider scheme.
IPROYAL_HOST = os.getenv("IPROYAL_HOST", "geo.iproyal.com")
IPROYAL_PORT = int(os.getenv("IPROYAL_PORT", "12321"))
IPROYAL_USERNAME = os.getenv("IPROYAL_USERNAME")
IPROYAL_PASSWORD = os.getenv("IPROYAL_PASSWORD")
IPROYAL_COUNTRY = os.getenv("IPROYAL_COUNTRY", "us").strip().lower()
IPROYAL_LIFETIME_MIN = int(os.getenv("IPROYAL_LIFETIME_MIN", "60"))

# The residential proxy is metered per GB. Cloudflare gates on JS, not images, and
# the sell-orders data is pure JSON, so images are blocked by default to cut
# page-render bandwidth. LOAD_IMAGES=1 turns them back on (e.g. when debugging the
# visible page).
LOAD_IMAGES = os.getenv("LOAD_IMAGES") == "1"

# cs.money is an Astro SSR app: the free-text market search is filtered server-side
# and the matching listings are embedded in the page as a __page-params JSON blob.
# The /2.0/market/sell-orders API rejects a `search` param (HTTP 400), so the PAGE is
# fetched for a search and the embedded items are read from it (same item shape as
# the API).
#
# One page holds at most 60 items and offset is ignored, so pagination is a FORWARD
# CURSOR on float: cs.money honors `order=asc&sort=float` + `minFloat`, and float is
# full-precision and effectively unique per item. Take the 60 lowest-float items
# at/above `lo`, move `lo` up to the highest float returned, and repeat until a page
# comes back under the cap. (The earlier minPrice/maxPrice bisection silently
# truncated cheap skins: >60 listings can share a sub-$0.02 reference band that no
# price window can split, whereas floats almost never tie, so the cursor keeps
# making progress.)
PAGE = (
    "https://cs.money/market/buy/?search={search}"
    + "&order=asc&sort=float&minFloat={lo:.12f}&maxFloat=1"
)
PAGE_CAP = 60  # items per SSR page

# Captures the body of the <script id="__page-params"> element; DOTALL so the JSON
# may span multiple lines.
PAGE_PARAMS_RE = re.compile(
    r'<script\b[^>]*id="__page-params"[^>]*>(.*?)</script>',
    flags=re.DOTALL,
)
# --- IPRoyal residential proxy ----------------------------------------------------
def _new_session_id() -> str:
"""Short, opaque, URL-safe token. IPRoyal pins one residential exit IP per
distinct session value, so a fresh id == a fresh IP."""
return uuid.uuid4().hex[:10]
def _iproyal_password(session_id: str) -> str:
    """Build the IPRoyal password with targeting/session knobs appended.

    The result has the shape "<pass>_country-us_session-<id>_lifetime-60m"; the
    country segment is left out when IPROYAL_COUNTRY is blank.

    Args:
        session_id: sticky-session token to embed.

    Returns:
        IPROYAL_PASSWORD followed by the underscore-separated parameters.
    """
    suffixes = []
    if IPROYAL_COUNTRY:
        suffixes.append(f"_country-{IPROYAL_COUNTRY}")
    suffixes.append(f"_session-{session_id}_lifetime-{IPROYAL_LIFETIME_MIN}m")
    return IPROYAL_PASSWORD + "".join(suffixes)
class LocalForwardingProxy:
"""In-process HTTP proxy on 127.0.0.1 that chains every connection to the IPRoyal
gateway, injecting the Proxy-Authorization header itself. Chromium ignores creds in
--proxy-server and the in-browser ways to answer the gateway's 407 (a CDP auth
handler, or a disabled MV2 extension) are Cloudflare tells — so we terminate the
browser->proxy hop locally and add auth here, leaving Chrome to talk to an auth-free
endpoint at zero CDP. HTTPS (all cs.money serves) flows through the CONNECT tunnel,
so this proxy only relays ciphertext and never sees plaintext. Ported from the .NET
LocalForwardingProxy. The active session token can be swapped live (set_password) to
move to a fresh exit IP without restarting the browser. (New tunnels pick up the new
IP; any still-open keep-alive tunnel stays on the old one until it closes.)"""
def __init__(self, host: str, port: int, username: str, password: str):
self._host = host
self._port = port
self._username = username
self._password = password
self._server: asyncio.AbstractServer | None = None
self.endpoint = ""
def set_password(self, password: str) -> None:
self._password = password
def _auth_header(self) -> str:
token = base64.b64encode(f"{self._username}:{self._password}".encode()).decode()
return f"Proxy-Authorization: Basic {token}\r\n"
async def start(self) -> "LocalForwardingProxy":
self._server = await asyncio.start_server(self._handle, "127.0.0.1", 0)
port = self._server.sockets[0].getsockname()[1]
self.endpoint = f"127.0.0.1:{port}"
return self
async def stop(self) -> None:
if self._server is not None:
self._server.close()
try:
await self._server.wait_closed()
except Exception:
pass
@staticmethod
async def _read_header(reader: asyncio.StreamReader) -> str | None:
"""Read up to the end of the HTTP header block (CRLFCRLF). None on EOF/overflow."""
try:
data = await reader.readuntil(b"\r\n\r\n")
except (asyncio.IncompleteReadError, asyncio.LimitOverrunError):
return None
return data.decode("latin-1")
async def _handle(self, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter) -> None:
up_writer: asyncio.StreamWriter | None = None
try:
header = await self._read_header(client_reader)
if not header:
return
parts = header.split("\r\n", 1)[0].split(" ")
if len(parts) < 2:
return
method, target = parts[0], parts[1]
up_reader, up_writer = await asyncio.open_connection(self._host, self._port)
if method.upper() == "CONNECT":
# HTTPS: open an authenticated tunnel upstream, then relay raw bytes.
up_writer.write(
f"CONNECT {target} HTTP/1.1\r\nHost: {target}\r\n{self._auth_header()}\r\n".encode())
await up_writer.drain()
up_header = await self._read_header(up_reader)
status = up_header.split(" ", 2) if up_header else []
if len(status) < 2 or status[1] != "200":
line = (up_header or "no response").split("\r\n", 1)[0]
print(f" proxy: upstream refused CONNECT {target}: {line}")
client_writer.write(b"HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n")
await client_writer.drain()
return
client_writer.write(b"HTTP/1.1 200 Connection established\r\n\r\n")
await client_writer.drain()
else:
# Plain HTTP: re-inject the request upstream with auth, then relay.
idx = header.index("\r\n") + 2
up_writer.write((header[:idx] + self._auth_header() + header[idx:]).encode())
await up_writer.drain()
await self._relay(client_reader, client_writer, up_reader, up_writer)
except Exception:
pass # one bad tunnel must never take down the listener
finally:
for w in (client_writer, up_writer):
if w is not None:
try:
w.close()
except Exception:
pass
@staticmethod
async def _relay(
client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter,
up_reader: asyncio.StreamReader, up_writer: asyncio.StreamWriter) -> None:
# Pipe both directions, but tear the whole tunnel down as soon as EITHER side
# closes (mirrors the .NET WhenAny). Waiting for both — as a plain gather does —
# leaks a task holding two sockets on every half-closed connection, which piles
# up fast across a long multi-worker run. Closing both writers when the first
# pipe finishes unblocks the other's pending read so both tasks settle.
async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
try:
while data := await reader.read(65536):
writer.write(data)
await writer.drain()
except Exception:
pass
a = asyncio.create_task(pipe(client_reader, up_writer))
b = asyncio.create_task(pipe(up_reader, client_writer))
try:
await asyncio.wait({a, b}, return_when=asyncio.FIRST_COMPLETED)
finally:
for w in (client_writer, up_writer):
try:
w.close()
except Exception:
pass
await asyncio.gather(a, b, return_exceptions=True)
def looks_like_challenge(body: str) -> bool:
    """Report whether a response body looks like a Cloudflare challenge, not data.

    A body counts as a challenge when it is empty/None/whitespace-only, starts with
    "<" after leading whitespace, or contains one of the known challenge markers.
    """
    stripped = (body or "").lstrip()
    if not stripped or stripped.startswith("<"):
        return True
    # `stripped` is non-empty here, so `body` is a real string and safe to search.
    return any(marker in body for marker in ("Just a moment", "challenge-platform"))
# --- C2 HTTP (stdlib, run off the event loop) -------------------------------------
def _get_job_sync():
    """Poll the C2 for the next job (blocking; meant to run off the event loop).

    Returns:
        The decoded JSON job, or None when the C2 answers 204, returns an empty
        body, or the request fails in any way. Failures are logged, never raised,
        so a flaky C2 cannot take the worker loop down.
    """
    import http.client  # local import: only needed for the exception type below

    req = urllib.request.Request(f"{C2_URL}/jobs/next", headers={"X-Worker-Token": TOKEN})
    try:
        with urllib.request.urlopen(req, timeout=15) as r:
            if r.status == 204:
                return None
            return json.loads(r.read() or b"null")
    except urllib.error.HTTPError as e:
        print(f" C2 /jobs/next -> HTTP {e.code}")
        return None
    except urllib.error.URLError as e:
        print(f" C2 unreachable: {e}")
        return None
    except (OSError, http.client.HTTPException, ValueError) as e:
        # Not wrapped in URLError: read-phase timeouts / connection resets (OSError),
        # truncated or malformed responses (HTTPException), non-JSON body (ValueError).
        print(f" C2 /jobs/next failed: {e!r}")
        return None
def _post_result_sync(job_id: str, payload: dict):
    """POST a job's scrape result to the C2 (blocking; meant to run off the event loop).

    Args:
        job_id: id of the job the payload belongs to.
        payload: JSON-serializable result body.

    Returns:
        The decoded JSON reply, or None when the reply body is empty or the request
        fails in any way. Failures are logged, never raised.
    """
    import http.client  # local import: only needed for the exception type below

    data = json.dumps(payload).encode()
    req = urllib.request.Request(
        f"{C2_URL}/jobs/{job_id}/result", data=data, method="POST",
        headers={"X-Worker-Token": TOKEN, "Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=60) as r:
            return json.loads(r.read() or b"null")
    except urllib.error.HTTPError as e:
        print(f" C2 result -> HTTP {e.code}: {e.read()[:200]!r}")
        return None
    except urllib.error.URLError as e:
        print(f" C2 unreachable posting result: {e}")
        return None
    except (OSError, http.client.HTTPException, ValueError) as e:
        # Not wrapped in URLError: read-phase timeouts / connection resets (OSError),
        # truncated or malformed responses (HTTPException), non-JSON reply (ValueError).
        print(f" C2 result failed: {e!r}")
        return None
async def get_job():
    """Fetch the next job from the C2 without blocking the event loop.

    Runs the blocking _get_job_sync in a worker thread and returns its result.
    """
    job = await asyncio.to_thread(_get_job_sync)
    return job
async def post_result(job_id, payload):
    """Post a job result to the C2 without blocking the event loop.

    Runs the blocking _post_result_sync in a worker thread and returns its result.
    """
    reply = await asyncio.to_thread(_post_result_sync, job_id, payload)
    return reply
# --- scraping ---------------------------------------------------------------------
async def fetch_json(page, url: str) -> tuple[str, str, int]:
    """Fetch `url` in-page and also read back the Resource Timing transferSize — the
    actual COMPRESSED bytes on the wire (what the metered proxy bills), not len(body)
    which is the decompressed size.

    Args:
        page: browser tab exposing `evaluate(expr, await_promise=True)`.
        url: absolute URL to fetch from inside the page.

    Returns:
        (status, body, wire_bytes). status is the HTTP status as a string, or "-1"
        when the in-page result could not be read. body is the response text, or the
        raw evaluate() string when that string was not the expected JSON envelope.
        wire_bytes is -1 if the timing entry wasn't available or wasn't numeric.
        Same-origin (cs.money), so the size fields are exposed.
    """
    # json.dumps emits a string literal that is valid JavaScript for any Python str;
    # repr() emits Python syntax, which only coincides with JS for simple inputs.
    js_url = json.dumps(url)
    expr = (
        f"fetch({js_url}, {{credentials:'include', headers:{{'accept':'application/json'}}}})"
        ".then(async r => {"
        " const body = await r.text();"
        f" const e = performance.getEntriesByName({js_url}).slice(-1)[0];"
        " return JSON.stringify({status: r.status, body: body,"
        " wire: e ? e.transferSize : -1, dec: e ? e.decodedBodySize : -1});"
        "})"
    )
    raw = await page.evaluate(expr, await_promise=True)
    if not isinstance(raw, str):
        return ("-1", "", -1)
    try:
        obj = json.loads(raw)
    except ValueError:
        return ("-1", raw, -1)
    if not isinstance(obj, dict):
        return ("-1", raw, -1)
    body = obj.get("body")
    if not isinstance(body, str):
        body = ""
    # Parse the size on its own so a missing/odd value cannot discard status and body.
    try:
        wire = int(obj.get("wire", -1))
    except (TypeError, ValueError):
        wire = -1
    return (str(obj.get("status", "-1")), body, wire)
async def _click(page, text, timeout=3):
    """Best-effort click on the element that best matches `text`.

    Args:
        page: browser tab exposing `find(text, best_match=..., timeout=...)`.
        text: visible text to look for.
        timeout: value forwarded to `page.find` as its timeout.

    Returns:
        True when an element was found and clicked; False when nothing was found
        or when either the lookup or the click raised.
    """
    try:
        target = await page.find(text, best_match=True, timeout=timeout)
        if not target:
            return False
        await target.click()
    except Exception:
        return False
    return True
async def dismiss_consent(page):
    """Privacy-preserving. The banner only offers 'Accept all' / 'Manage cookies';
    the Reject-all control lives inside the Manage window. So: Manage -> Reject all ->
    Confirm. (The data path reads SSR __page-params regardless, but this keeps the
    session honest and unblocks any future interaction.)

    Returns:
        A comma-separated summary of the steps that were actually clicked
        (e.g. "reject-all, confirm:Confirm"), or None when none were.
    """
    # NOTE(review): the nesting below (Reject/Confirm attempted only after the Manage
    # window was opened) is reconstructed from the docstring's described flow —
    # confirm against the original file's indentation.
    steps = []
    # Try the full button label first, then the shorter one.
    if await _click(page, "Manage cookies") or await _click(page, "Manage"):
        await page.sleep(1)  # pause before looking for the Manage window's controls
        if await _click(page, "Reject all"):
            steps.append("reject-all")
        # Try the confirm-button labels in order; stop at the first one that clicks.
        for c in ("Confirm my choice", "Confirm", "Save"):
            if await _click(page, c):
                steps.append(f"confirm:{c}")
                break
    return ", ".join(steps) if steps else None
async def warm(page):
    """Open the market page and wait out Cloudflare so the session holds cf_clearance.

    Navigates to MARKET_URL, sleeps SOLVE_SECONDS, then tries to dismiss the cookie
    consent banner and logs what happened.
    """
    print(f"Warming session at {MARKET_URL} (clear Cloudflare; {SOLVE_SECONDS}s)...")
    await page.get(MARKET_URL)
    await page.sleep(SOLVE_SECONDS)
    outcome = await dismiss_consent(page)
    if outcome:
        consent_status = "dismissed via " + outcome
    else:
        consent_status = "left up"
    print(f"Consent: {consent_status}")
def extract_items(html: str) -> list:
    """Pull inventory.items out of the page's __page-params JSON blob.

    Args:
        html: page source; None or empty is treated as "no blob".

    Returns:
        The `inventory.items` list, or [] when the blob is missing, is not valid
        JSON, or does not have the expected object -> object -> list shape. Never
        raises on malformed input.
    """
    match = PAGE_PARAMS_RE.search(html or "")
    if match is None:
        return []
    try:
        params = json.loads(match.group(1))
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return []
    # Walk the structure defensively: a null/missing/oddly-typed level means "no items".
    if not isinstance(params, dict):
        return []
    inventory = params.get("inventory")
    if not isinstance(inventory, dict):
        return []
    items = inventory.get("items")
    if not isinstance(items, list):
        return []
    return items
async def scrape_job(page, job) -> tuple[list, int, str, int]:
    """Scrape ALL listings for one skin+wear via a forward float cursor.

    A search page returns at most 60 items and ignores offset, but cs.money sorts by
    float (order=asc&sort=float) and filters by minFloat. So we walk the float axis:
    grab the 60 lowest-float items at/above `lo`, advance `lo` to the highest float on
    the page, and repeat until a page is under the cap. The boundary item is re-fetched
    (minFloat is inclusive) and dropped by the id dedup.

    Args:
        page: warmed browser tab used for the in-page fetches.
        job: mapping with "search" (required) and optional "maxPages" (default 40).

    Returns:
        (items, fetches, reason, wire_bytes) where wire_bytes is the metered
        (compressed) cost and reason is one of:
          "completed"       — a page came back under the cap;
          "challenged"      — a Cloudflare challenge page was served;
          "fetch-failed"    — a fetch returned a non-2xx / unreadable status;
          "stuck-float-tie" — the cursor could not advance;
          "fetch-cap"       — the per-job fetch limit was reached.
    """
    search = urllib.parse.quote_plus(job["search"])
    max_fetches = job.get("maxPages", 40)  # safety cap on page fetches per job
    seen: dict = {}
    fetches = 0
    wire = 0
    lo = 0.0
    reason = "completed"
    while fetches < max_fetches:
        status, body, wbytes = await fetch_json(page, PAGE.format(search=search, lo=lo))
        fetches += 1
        if wbytes > 0:
            wire += wbytes
        body = body or ""
        if "Just a moment" in body or "challenge-platform" in body:
            return list(seen.values()), fetches, "challenged", wire
        if not str(status).startswith("2"):
            # A failed fetch carries no items; without this check it would look like
            # a short last page and the sweep would be reported as "completed".
            # NOTE(review): confirm the C2 treats a non-"completed" stoppedReason as
            # a partial sweep.
            print(f" fetch failed (HTTP {status}) at minFloat={lo:.12f}")
            reason = "fetch-failed"
            break
        items = extract_items(body)
        floats = []
        for it in items:
            if not isinstance(it, dict):
                continue  # ignore malformed entries rather than crash the sweep
            if it.get("id") is not None:
                seen[it["id"]] = it
            fl = (it.get("asset") or {}).get("float")
            if isinstance(fl, (int, float)):
                floats.append(fl)
        if len(items) < PAGE_CAP:
            break  # last page — fewer than the cap means we've seen everything
        # Advance the cursor past the highest float on this page. Items at exactly that
        # float are re-fetched next round (minFloat is inclusive) and deduped by id.
        nxt = max(floats) if floats else None
        if nxt is None or nxt <= lo:
            # Cursor can't advance: >60 listings share a single float value, or the
            # items carry no float. Bail loudly rather than spin — a flagged gap beats
            # a silent one (this is the failure the price-window version hid).
            reason = "stuck-float-tie"
            break
        lo = nxt
        await page.sleep(DELAY + random.uniform(0, JITTER))
    else:
        # Loop ended because the fetch budget ran out, not because of a break.
        reason = "fetch-cap"
    return list(seen.values()), fetches, reason, wire
async def main():
    """Run the worker: start the proxy/browser, warm the session, then loop forever
    pulling jobs from the C2, scraping them, and posting results back.

    The forwarder and the browser are both created inside the try block so that a
    failure anywhere during startup (or later) still tears down whatever was started.
    """
    # IPRoyal (auth'd, per-worker sticky IP) takes priority; else a plain auth-free
    # PROXY; else this host's own IP. The forwarder injects IPRoyal auth so Chrome
    # only ever sees an auth-free 127.0.0.1 endpoint.
    forwarder = None
    browser = None
    session_id = None
    try:
        if IPROYAL_USERNAME and IPROYAL_PASSWORD:
            session_id = _new_session_id()
            forwarder = await LocalForwardingProxy(
                IPROYAL_HOST, IPROYAL_PORT, IPROYAL_USERNAME, _iproyal_password(session_id)).start()
            proxy = forwarder.endpoint
            proxy_label = f"iproyal[{IPROYAL_COUNTRY or 'any'}] session {session_id} via {forwarder.endpoint}"
        else:
            proxy = PROXY
            proxy_label = PROXY or "own IP"
        args = [f"--proxy-server={proxy}"] if proxy else []
        if not LOAD_IMAGES:
            # Disable image loading at the engine level — the dominant bandwidth cost on
            # an image-heavy market, and unneeded for CF clearance or the JSON API.
            args.append("--blink-settings=imagesEnabled=false")
        if os.environ.get("CHROME_NO_SANDBOX") == "1":
            # Required when running Chromium as root in a container.
            args += ["--no-sandbox", "--disable-dev-shm-usage"]
        print(f"Starting worker (C2={C2_URL}, proxy={proxy_label}, images={'on' if LOAD_IMAGES else 'off'})...")
        browser = await uc.start(headless=False, browser_executable_path=BROWSER_PATH, browser_args=args)
        page = await browser.get("about:blank")
        await warm(page)
        total_wire = 0  # metered (compressed) bytes this worker has pulled, lifetime
        while True:
            job = await get_job()
            if not job:
                await asyncio.sleep(IDLE_SECONDS)
                continue
            print(f"Job {job['jobId'][:8]} — search {job['search']!r}")
            items, pages, reason, wire = await scrape_job(page, job)
            total_wire += wire
            if reason == "challenged":
                # The exit IP is likely flagged. On IPRoyal, rotate to a fresh sticky
                # session (new IP) before re-warming; otherwise just re-solve in place.
                if forwarder is not None:
                    session_id = _new_session_id()
                    forwarder.set_password(_iproyal_password(session_id))
                    print(f" challenged; rotating exit IP -> session {session_id}, re-warming...")
                else:
                    print(" re-challenged; re-warming session...")
                await warm(page)
            result = await post_result(job["jobId"], {
                "items": items, "pages": pages, "stoppedReason": reason})
            # Only a non-empty JSON object is a usable reply; anything else is a failure.
            if result and isinstance(result, dict):
                summary = (f"matched {result.get('matched')}, new {result.get('inserted')}, "
                           f"upd {result.get('updated')}, removed {result.get('removed')}")
            else:
                summary = "post failed"
            wire_kb = wire / 1024
            print(f" scraped {len(items)} items ({pages}p, {reason}, {wire_kb:.0f}KB wire) "
                  f"-> {summary} [lifetime {total_wire / 1_048_576:.1f}MB]")
            await page.sleep(DELAY + random.uniform(0, JITTER))
    finally:
        # Tear down in reverse start order; either may be None if startup failed early.
        if browser is not None:
            browser.stop()
        if forwarder is not None:
            await forwarder.stop()
# Script entry point: run main() to completion on the event loop returned by
# nodriver's uc.loop() helper. main() loops forever, so this only returns on error
# or interruption.
if __name__ == "__main__":
    uc.loop().run_until_complete(main())