Add cs.money worker stack with per-worker IPRoyal residential proxy
Brings up the pull-model scraper: the .NET C2 hands skin+wear jobs to Python nodriver workers that scrape cs.money and post results back, plus the supporting Core/EFCore data model, migrations, and docker-compose orchestration. IPRoyal proxying lets workers scale horizontally with a distinct residential exit IP each: every worker process mints its own sticky session at startup, and an in-process forwarding proxy injects the gateway auth so Chromium talks only to an auth-free localhost endpoint (zero CDP). On a Cloudflare challenge a worker rotates to a fresh session/IP and re-warms. Verified end-to-end against live IPRoyal: distinct US residential exits per worker and IP rotation on demand. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
453
worker/worker.py
Normal file
453
worker/worker.py
Normal file
@@ -0,0 +1,453 @@
|
||||
"""
|
||||
cs.money scrape worker (pull model).
|
||||
|
||||
Holds ONE warm nodriver session (the thing that beats Cloudflare), then loops:
|
||||
poll the .NET C2 for a job, scrape that skin+wear's sell-orders via in-page fetch
|
||||
from the cleared session, and post the results back. The C2 owns job selection
|
||||
(stalest skin+wear first) and persistence; this worker just fetches and forwards.
|
||||
|
||||
cd worker
|
||||
.venv\\Scripts\\Activate.ps1
|
||||
pip install -r requirements.txt
|
||||
python worker.py
|
||||
|
||||
Env knobs:
|
||||
C2_URL C2 base URL (default http://localhost:5080)
|
||||
WORKER_TOKEN shared secret, must match the C2's WorkerToken (default dev-worker-token)
|
||||
MARKET_URL market page to warm the session on (default the buy market)
|
||||
SOLVE_SECONDS seconds to clear Cloudflare on startup (default 30)
|
||||
DELAY / JITTER base + random seconds between page fetches (default 2.0 / 1.5)
|
||||
IDLE_SECONDS sleep when the C2 has no work (default 10)
|
||||
BROWSER_PATH path to Chrome/Edge if auto-detect fails
|
||||
|
||||
Proxy (pick one; IPRoyal takes priority when its creds are set):
|
||||
IPROYAL_USERNAME IPRoyal residential account username
|
||||
IPROYAL_PASSWORD IPRoyal residential account password
|
||||
IPROYAL_COUNTRY ISO country for the exit (default us; blank = any)
|
||||
IPROYAL_LIFETIME_MIN sticky-IP hold in minutes (default 60)
|
||||
PROXY host:port for an auth-free proxy (fallback; omit to use your own IP)
|
||||
|
||||
Each worker process mints its own random IPRoyal sticky session at startup, so N
|
||||
workers get N distinct residential exit IPs with no coordination — scale with
|
||||
`docker compose up --scale worker=N`. On a Cloudflare challenge the worker rotates
|
||||
to a fresh session (new IP) and re-warms. Chromium can't carry proxy credentials on
|
||||
--proxy-server, so we run a tiny in-process forwarder (LocalForwardingProxy below)
|
||||
that injects the IPRoyal auth and chains to the gateway; Chrome talks only to an
|
||||
auth-free 127.0.0.1 endpoint, keeping us at zero CDP (a CDP auth handler is a
|
||||
Cloudflare tell).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import uuid
|
||||
|
||||
import nodriver as uc
|
||||
|
||||
C2_URL = os.environ.get("C2_URL", "http://localhost:5080").rstrip("/")
|
||||
TOKEN = os.environ.get("WORKER_TOKEN", "dev-worker-token")
|
||||
MARKET_URL = os.environ.get("MARKET_URL", "https://cs.money/market/buy/")
|
||||
SOLVE_SECONDS = int(os.environ.get("SOLVE_SECONDS", "30"))
|
||||
DELAY = float(os.environ.get("DELAY", "2.0"))
|
||||
JITTER = float(os.environ.get("JITTER", "1.5"))
|
||||
IDLE_SECONDS = int(os.environ.get("IDLE_SECONDS", "10"))
|
||||
PROXY = os.environ.get("PROXY")
|
||||
BROWSER_PATH = os.environ.get("BROWSER_PATH")
|
||||
|
||||
# IPRoyal residential gateway. One fixed host/port; country, sticky-session id and
|
||||
# lifetime are encoded as underscore params appended to the password (see
|
||||
# _iproyal_password). Mirrors the .NET IpRoyalProxyProvider scheme.
|
||||
IPROYAL_HOST = os.environ.get("IPROYAL_HOST", "geo.iproyal.com")
|
||||
IPROYAL_PORT = int(os.environ.get("IPROYAL_PORT", "12321"))
|
||||
IPROYAL_USERNAME = os.environ.get("IPROYAL_USERNAME")
|
||||
IPROYAL_PASSWORD = os.environ.get("IPROYAL_PASSWORD")
|
||||
IPROYAL_COUNTRY = os.environ.get("IPROYAL_COUNTRY", "us").strip().lower()
|
||||
IPROYAL_LIFETIME_MIN = int(os.environ.get("IPROYAL_LIFETIME_MIN", "60"))
|
||||
# Residential proxy is metered per GB. Cloudflare gates on JS, not images, and the
|
||||
# sell-orders API is pure JSON — so block images by default to slash page-render
|
||||
# bandwidth. Set LOAD_IMAGES=1 to re-enable (e.g. for debugging the visible page).
|
||||
LOAD_IMAGES = os.environ.get("LOAD_IMAGES") == "1"
|
||||
|
||||
# cs.money is an Astro SSR app: the free-text market search filters server-side and
|
||||
# the resulting listings are embedded in the page as a __page-params JSON blob. The
|
||||
# /2.0/market/sell-orders API rejects a `search` param (HTTP 400), so we fetch the
|
||||
# PAGE for a search and read the embedded items — same item shape as the API.
|
||||
#
|
||||
# A page returns at most 60 and offset is ignored, so we paginate with a FORWARD
|
||||
# CURSOR on float: cs.money honors `order=asc&sort=float` + `minFloat`, and float is
|
||||
# full-precision and effectively unique per item. We grab the 60 lowest-float items
|
||||
# at/above `lo`, advance `lo` to the highest float returned, and repeat until a page
|
||||
# is under the cap. (The old minPrice/maxPrice bisection silently truncated cheap
|
||||
# skins: >60 listings can share a sub-$0.02 reference band, which no price window can
|
||||
# split — floats almost never tie, so the cursor always makes progress.)
|
||||
PAGE = ("https://cs.money/market/buy/?search={search}"
|
||||
"&order=asc&sort=float&minFloat={lo:.12f}&maxFloat=1")
|
||||
PAGE_CAP = 60 # items per SSR page
|
||||
PAGE_PARAMS_RE = re.compile(
|
||||
r'<script\b[^>]*id="__page-params"[^>]*>(.*?)</script>', re.S)
|
||||
|
||||
|
||||
# --- IPRoyal residential proxy ----------------------------------------------------
|
||||
|
||||
def _new_session_id() -> str:
|
||||
"""Short, opaque, URL-safe token. IPRoyal pins one residential exit IP per
|
||||
distinct session value, so a fresh id == a fresh IP."""
|
||||
return uuid.uuid4().hex[:10]
|
||||
|
||||
|
||||
def _iproyal_password(session_id: str) -> str:
|
||||
"""Bake the targeting/session knobs onto the account password, IPRoyal-style:
|
||||
"<pass>_country-us_session-<id>_lifetime-60m". Country is optional."""
|
||||
pw = IPROYAL_PASSWORD
|
||||
if IPROYAL_COUNTRY:
|
||||
pw += f"_country-{IPROYAL_COUNTRY}"
|
||||
pw += f"_session-{session_id}_lifetime-{IPROYAL_LIFETIME_MIN}m"
|
||||
return pw
|
||||
|
||||
|
||||
class LocalForwardingProxy:
|
||||
"""In-process HTTP proxy on 127.0.0.1 that chains every connection to the IPRoyal
|
||||
gateway, injecting the Proxy-Authorization header itself. Chromium ignores creds in
|
||||
--proxy-server and the in-browser ways to answer the gateway's 407 (a CDP auth
|
||||
handler, or a disabled MV2 extension) are Cloudflare tells — so we terminate the
|
||||
browser->proxy hop locally and add auth here, leaving Chrome to talk to an auth-free
|
||||
endpoint at zero CDP. HTTPS (all cs.money serves) flows through the CONNECT tunnel,
|
||||
so this proxy only relays ciphertext and never sees plaintext. Ported from the .NET
|
||||
LocalForwardingProxy. The active session token can be swapped live (set_password) to
|
||||
move to a fresh exit IP without restarting the browser. (New tunnels pick up the new
|
||||
IP; any still-open keep-alive tunnel stays on the old one until it closes.)"""
|
||||
|
||||
def __init__(self, host: str, port: int, username: str, password: str):
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._username = username
|
||||
self._password = password
|
||||
self._server: asyncio.AbstractServer | None = None
|
||||
self.endpoint = ""
|
||||
|
||||
def set_password(self, password: str) -> None:
|
||||
self._password = password
|
||||
|
||||
def _auth_header(self) -> str:
|
||||
token = base64.b64encode(f"{self._username}:{self._password}".encode()).decode()
|
||||
return f"Proxy-Authorization: Basic {token}\r\n"
|
||||
|
||||
async def start(self) -> "LocalForwardingProxy":
|
||||
self._server = await asyncio.start_server(self._handle, "127.0.0.1", 0)
|
||||
port = self._server.sockets[0].getsockname()[1]
|
||||
self.endpoint = f"127.0.0.1:{port}"
|
||||
return self
|
||||
|
||||
async def stop(self) -> None:
|
||||
if self._server is not None:
|
||||
self._server.close()
|
||||
try:
|
||||
await self._server.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
async def _read_header(reader: asyncio.StreamReader) -> str | None:
|
||||
"""Read up to the end of the HTTP header block (CRLFCRLF). None on EOF/overflow."""
|
||||
try:
|
||||
data = await reader.readuntil(b"\r\n\r\n")
|
||||
except (asyncio.IncompleteReadError, asyncio.LimitOverrunError):
|
||||
return None
|
||||
return data.decode("latin-1")
|
||||
|
||||
async def _handle(self, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter) -> None:
|
||||
up_writer: asyncio.StreamWriter | None = None
|
||||
try:
|
||||
header = await self._read_header(client_reader)
|
||||
if not header:
|
||||
return
|
||||
parts = header.split("\r\n", 1)[0].split(" ")
|
||||
if len(parts) < 2:
|
||||
return
|
||||
method, target = parts[0], parts[1]
|
||||
|
||||
up_reader, up_writer = await asyncio.open_connection(self._host, self._port)
|
||||
if method.upper() == "CONNECT":
|
||||
# HTTPS: open an authenticated tunnel upstream, then relay raw bytes.
|
||||
up_writer.write(
|
||||
f"CONNECT {target} HTTP/1.1\r\nHost: {target}\r\n{self._auth_header()}\r\n".encode())
|
||||
await up_writer.drain()
|
||||
up_header = await self._read_header(up_reader)
|
||||
status = up_header.split(" ", 2) if up_header else []
|
||||
if len(status) < 2 or status[1] != "200":
|
||||
line = (up_header or "no response").split("\r\n", 1)[0]
|
||||
print(f" proxy: upstream refused CONNECT {target}: {line}")
|
||||
client_writer.write(b"HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n")
|
||||
await client_writer.drain()
|
||||
return
|
||||
client_writer.write(b"HTTP/1.1 200 Connection established\r\n\r\n")
|
||||
await client_writer.drain()
|
||||
else:
|
||||
# Plain HTTP: re-inject the request upstream with auth, then relay.
|
||||
idx = header.index("\r\n") + 2
|
||||
up_writer.write((header[:idx] + self._auth_header() + header[idx:]).encode())
|
||||
await up_writer.drain()
|
||||
|
||||
await self._relay(client_reader, client_writer, up_reader, up_writer)
|
||||
except Exception:
|
||||
pass # one bad tunnel must never take down the listener
|
||||
finally:
|
||||
for w in (client_writer, up_writer):
|
||||
if w is not None:
|
||||
try:
|
||||
w.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
async def _relay(
|
||||
client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter,
|
||||
up_reader: asyncio.StreamReader, up_writer: asyncio.StreamWriter) -> None:
|
||||
async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
||||
try:
|
||||
while data := await reader.read(65536):
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
except Exception:
|
||||
pass
|
||||
await asyncio.gather(
|
||||
pipe(client_reader, up_writer),
|
||||
pipe(up_reader, client_writer),
|
||||
)
|
||||
|
||||
|
||||
def looks_like_challenge(body: str) -> bool:
|
||||
s = (body or "").lstrip()
|
||||
return not s or s.startswith("<") or "Just a moment" in body or "challenge-platform" in body
|
||||
|
||||
|
||||
# --- C2 HTTP (stdlib, run off the event loop) -------------------------------------
|
||||
|
||||
def _get_job_sync():
|
||||
req = urllib.request.Request(f"{C2_URL}/jobs/next", headers={"X-Worker-Token": TOKEN})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as r:
|
||||
if r.status == 204:
|
||||
return None
|
||||
return json.loads(r.read() or b"null")
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f" C2 /jobs/next -> HTTP {e.code}")
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f" C2 unreachable: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def _post_result_sync(job_id: str, payload: dict):
|
||||
data = json.dumps(payload).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{C2_URL}/jobs/{job_id}/result", data=data, method="POST",
|
||||
headers={"X-Worker-Token": TOKEN, "Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=60) as r:
|
||||
return json.loads(r.read() or b"null")
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f" C2 result -> HTTP {e.code}: {e.read()[:200]!r}")
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f" C2 unreachable posting result: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def get_job():
|
||||
return await asyncio.to_thread(_get_job_sync)
|
||||
|
||||
|
||||
async def post_result(job_id, payload):
|
||||
return await asyncio.to_thread(_post_result_sync, job_id, payload)
|
||||
|
||||
|
||||
# --- scraping ---------------------------------------------------------------------
|
||||
|
||||
async def fetch_json(page, url: str) -> tuple[str, str]:
|
||||
expr = (
|
||||
f"fetch({url!r}, {{credentials:'include', headers:{{'accept':'application/json'}}}})"
|
||||
f".then(async r => JSON.stringify({{status: r.status, body: await r.text()}}))"
|
||||
)
|
||||
raw = await page.evaluate(expr, await_promise=True)
|
||||
if not isinstance(raw, str):
|
||||
return ("-1", "")
|
||||
try:
|
||||
obj = json.loads(raw)
|
||||
return (str(obj.get("status", "-1")), obj.get("body", ""))
|
||||
except json.JSONDecodeError:
|
||||
return ("-1", raw)
|
||||
|
||||
|
||||
async def _click(page, text, timeout=3):
|
||||
try:
|
||||
el = await page.find(text, best_match=True, timeout=timeout)
|
||||
if el:
|
||||
await el.click()
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
async def dismiss_consent(page):
|
||||
"""Privacy-preserving. The banner only offers 'Accept all' / 'Manage cookies';
|
||||
the Reject-all control lives inside the Manage window. So: Manage -> Reject all ->
|
||||
Confirm. (The data path reads SSR __page-params regardless, but this keeps the
|
||||
session honest and unblocks any future interaction.)"""
|
||||
steps = []
|
||||
if await _click(page, "Manage cookies") or await _click(page, "Manage"):
|
||||
await page.sleep(1)
|
||||
if await _click(page, "Reject all"):
|
||||
steps.append("reject-all")
|
||||
for c in ("Confirm my choice", "Confirm", "Save"):
|
||||
if await _click(page, c):
|
||||
steps.append(f"confirm:{c}")
|
||||
break
|
||||
return ", ".join(steps) if steps else None
|
||||
|
||||
|
||||
async def warm(page):
|
||||
"""Open the market and clear Cloudflare so the session holds cf_clearance."""
|
||||
print(f"Warming session at {MARKET_URL} (clear Cloudflare; {SOLVE_SECONDS}s)...")
|
||||
await page.get(MARKET_URL)
|
||||
await page.sleep(SOLVE_SECONDS)
|
||||
clicked = await dismiss_consent(page)
|
||||
print(f"Consent: {'dismissed via ' + clicked if clicked else 'left up'}")
|
||||
|
||||
|
||||
def extract_items(html: str) -> list:
|
||||
"""Pull inventory.items out of the page's __page-params JSON blob."""
|
||||
m = PAGE_PARAMS_RE.search(html)
|
||||
if not m:
|
||||
return []
|
||||
try:
|
||||
return json.loads(m.group(1)).get("inventory", {}).get("items", []) or []
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
|
||||
async def scrape_job(page, job) -> tuple[list, int, str]:
|
||||
"""Scrape ALL listings for one skin+wear via a forward float cursor.
|
||||
|
||||
A search page returns at most 60 items and ignores offset, but cs.money sorts by
|
||||
float (order=asc&sort=float) and filters by minFloat. So we walk the float axis:
|
||||
grab the 60 lowest-float items at/above `lo`, advance `lo` to the highest float on
|
||||
the page, and repeat until a page is under the cap. The boundary item is re-fetched
|
||||
(minFloat is inclusive) and dropped by the id dedup. Returns (items, fetches, reason).
|
||||
"""
|
||||
search = urllib.parse.quote_plus(job["search"])
|
||||
max_fetches = job.get("maxPages", 40) # safety cap on page fetches per job
|
||||
seen: dict = {}
|
||||
fetches = 0
|
||||
lo = 0.0
|
||||
reason = "completed"
|
||||
|
||||
while fetches < max_fetches:
|
||||
status, body = await fetch_json(page, PAGE.format(search=search, lo=lo))
|
||||
fetches += 1
|
||||
|
||||
if "Just a moment" in body or "challenge-platform" in body:
|
||||
return list(seen.values()), fetches, "challenged"
|
||||
|
||||
items = extract_items(body)
|
||||
floats = []
|
||||
for it in items:
|
||||
if it.get("id") is not None:
|
||||
seen[it["id"]] = it
|
||||
fl = (it.get("asset") or {}).get("float")
|
||||
if isinstance(fl, (int, float)):
|
||||
floats.append(fl)
|
||||
|
||||
if len(items) < PAGE_CAP:
|
||||
break # last page — fewer than the cap means we've seen everything
|
||||
|
||||
# Advance the cursor past the highest float on this page. Items at exactly that
|
||||
# float are re-fetched next round (minFloat is inclusive) and deduped by id.
|
||||
nxt = max(floats) if floats else None
|
||||
if nxt is None or nxt <= lo:
|
||||
# Cursor can't advance: >60 listings share a single float value, or the
|
||||
# items carry no float. Bail loudly rather than spin — a flagged gap beats
|
||||
# a silent one (this is the failure the price-window version hid).
|
||||
reason = "stuck-float-tie"
|
||||
break
|
||||
lo = nxt
|
||||
|
||||
await page.sleep(DELAY + random.uniform(0, JITTER))
|
||||
else:
|
||||
reason = "fetch-cap"
|
||||
|
||||
return list(seen.values()), fetches, reason
|
||||
|
||||
|
||||
async def main():
|
||||
# IPRoyal (auth'd, per-worker sticky IP) takes priority; else a plain auth-free
|
||||
# PROXY; else this host's own IP. The forwarder injects IPRoyal auth so Chrome
|
||||
# only ever sees an auth-free 127.0.0.1 endpoint.
|
||||
forwarder = None
|
||||
session_id = None
|
||||
if IPROYAL_USERNAME and IPROYAL_PASSWORD:
|
||||
session_id = _new_session_id()
|
||||
forwarder = await LocalForwardingProxy(
|
||||
IPROYAL_HOST, IPROYAL_PORT, IPROYAL_USERNAME, _iproyal_password(session_id)).start()
|
||||
proxy = forwarder.endpoint
|
||||
proxy_label = f"iproyal[{IPROYAL_COUNTRY or 'any'}] session {session_id} via {forwarder.endpoint}"
|
||||
else:
|
||||
proxy = PROXY
|
||||
proxy_label = PROXY or "own IP"
|
||||
|
||||
args = [f"--proxy-server={proxy}"] if proxy else []
|
||||
if not LOAD_IMAGES:
|
||||
# Disable image loading at the engine level — the dominant bandwidth cost on
|
||||
# an image-heavy market, and unneeded for CF clearance or the JSON API.
|
||||
args.append("--blink-settings=imagesEnabled=false")
|
||||
if os.environ.get("CHROME_NO_SANDBOX") == "1":
|
||||
# Required when running Chromium as root in a container.
|
||||
args += ["--no-sandbox", "--disable-dev-shm-usage"]
|
||||
print(f"Starting worker (C2={C2_URL}, proxy={proxy_label}, images={'on' if LOAD_IMAGES else 'off'})...")
|
||||
browser = await uc.start(headless=False, browser_executable_path=BROWSER_PATH, browser_args=args)
|
||||
try:
|
||||
page = await browser.get("about:blank")
|
||||
await warm(page)
|
||||
|
||||
while True:
|
||||
job = await get_job()
|
||||
if not job:
|
||||
await asyncio.sleep(IDLE_SECONDS)
|
||||
continue
|
||||
|
||||
print(f"Job {job['jobId'][:8]} — search {job['search']!r}")
|
||||
items, pages, reason = await scrape_job(page, job)
|
||||
|
||||
if reason == "challenged":
|
||||
# The exit IP is likely flagged. On IPRoyal, rotate to a fresh sticky
|
||||
# session (new IP) before re-warming; otherwise just re-solve in place.
|
||||
if forwarder is not None:
|
||||
session_id = _new_session_id()
|
||||
forwarder.set_password(_iproyal_password(session_id))
|
||||
print(f" challenged; rotating exit IP -> session {session_id}, re-warming...")
|
||||
else:
|
||||
print(" re-challenged; re-warming session...")
|
||||
await warm(page)
|
||||
|
||||
result = await post_result(job["jobId"], {
|
||||
"items": items, "pages": pages, "stoppedReason": reason})
|
||||
summary = (f"matched {result.get('matched')}, new {result.get('inserted')}, "
|
||||
f"upd {result.get('updated')}, removed {result.get('removed')}") if result else "post failed"
|
||||
print(f" scraped {len(items)} items ({pages}p, {reason}) -> {summary}")
|
||||
|
||||
await page.sleep(DELAY + random.uniform(0, JITTER))
|
||||
finally:
|
||||
browser.stop()
|
||||
if forwarder is not None:
|
||||
await forwarder.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uc.loop().run_until_complete(main())
|
||||
Reference in New Issue
Block a user