"""The shared worker runtime — everything that's identical across market workers. `Worker` is a template-method base: it owns the proxy/browser bring-up, the poll -> scrape -> post loop, Cloudflare-driven IP rotation, result logging, and graceful shutdown. A market worker subclasses it and fills in only what differs — how to dismiss the consent banner, how to scrape one job, and how to describe a job in the log. The two ~300-line workers used to copy this whole loop verbatim. """ import asyncio import json import logging import random import signal from abc import ABC, abstractmethod from dataclasses import dataclass import nodriver as uc from .c2 import C2Client from .config import Settings from .proxy import LocalForwardingProxy, iproyal_password, new_session_id @dataclass class ScrapeResult: """What a single job scrape yields. `wire_bytes` is the metered (compressed) cost.""" items: list pages: int reason: str wire_bytes: int = 0 def looks_like_challenge(body: str) -> bool: """True for an actual Cloudflare interstitial (or an empty body). Keyed on CF markers, NOT a leading '<' — a real market page IS html, so a startswith('<') check would flag every good page fetch as a challenge.""" b = body or "" return not b.strip() or "Just a moment" in b or "challenge-platform" in b async def page_fetch(page, url: str, accept: str = "application/json") -> tuple[int, str, int]: """Fetch in-page from the warm (Cloudflare-cleared) session and read back the Resource Timing transferSize — the actual compressed bytes the metered proxy bills (or -1 when cross-origin timing isn't exposed). Returns (status, body, wire_bytes). Use accept='text/html' for an SSR page payload, the default JSON for an API.""" expr = ( f"fetch({url!r}, {{credentials:'include', headers:{{'accept': {accept!r}}}}})" f".then(async r => {{" f" const body = await r.text();" f" const e = performance.getEntriesByName({url!r}).slice(-1)[0];" f" return JSON.stringify({{status: r.status, body: body, wire: e ? e.transferSize : -1}});" f"}}).catch(e => JSON.stringify({{status: -1, body: String(e), wire: -1}}))" ) raw = await page.evaluate(expr, await_promise=True) if not isinstance(raw, str): return (-1, "", -1) try: obj = json.loads(raw) return (int(obj.get("status", -1)), obj.get("body", ""), int(obj.get("wire", -1))) except (json.JSONDecodeError, ValueError, TypeError): return (-1, raw, -1) async def click(page, text: str, timeout: int = 3) -> bool: """Best-match click on visible text; swallow the not-found/timeout case.""" try: el = await page.find(text, best_match=True, timeout=timeout) if el: await el.click() return True except Exception: pass return False class Worker(ABC): # Per-market constants, set by the subclass. name: str = "worker" jobs_path: str = "/jobs" default_market_url: str = "" def __init__(self, settings: Settings): self.settings = settings self.market_url = settings.market_url or self.default_market_url self.c2 = C2Client(settings.c2_url, settings.token, self.jobs_path) self.log = logging.getLogger(self.name) self._forwarder: LocalForwardingProxy | None = None self._session_id: str | None = None self._stop = asyncio.Event() # --- hooks a market worker overrides ------------------------------------------ @abstractmethod async def scrape_job(self, page, job) -> ScrapeResult: """Scrape ALL listings for one job and return them.""" @abstractmethod def describe_job(self, job) -> str: """One-line job description for the log (e.g. the search term or slug).""" async def dismiss_consent(self, page) -> str | None: """Dismiss the cookie banner privacy-first; return a note, or None if absent. Default: nothing to do. Markets with a banner override this.""" return None # --- shared machinery --------------------------------------------------------- def _iproyal_password(self, session_id: str) -> str: s = self.settings return iproyal_password(s.iproyal_password, s.iproyal_country, s.iproyal_lifetime_min, session_id) async def _pace(self, page) -> None: await page.sleep(self.settings.delay + random.uniform(0, self.settings.jitter)) async def warm(self, page) -> None: """Open the market and clear Cloudflare so the session holds cf_clearance.""" s = self.settings self.log.info("warming session at %s (clear Cloudflare; %ds)", self.market_url, s.solve_seconds) await page.get(self.market_url) await page.sleep(s.solve_seconds) note = await self.dismiss_consent(page) self.log.info("consent: %s", note or "left up") async def _setup_proxy(self) -> tuple[str | None, str]: """IPRoyal (auth'd, per-worker sticky IP) takes priority; else a plain auth-free PROXY; else this host's own IP. Returns (proxy_endpoint, human_label).""" s = self.settings if s.use_iproyal: self._session_id = new_session_id() self._forwarder = await LocalForwardingProxy( s.iproyal_host, s.iproyal_port, s.iproyal_username, self._iproyal_password(self._session_id)).start() label = f"iproyal[{s.iproyal_country or 'any'}] session {self._session_id} via {self._forwarder.endpoint}" return self._forwarder.endpoint, label return s.proxy, (s.proxy or "own IP") def _browser_args(self, proxy: str | None) -> list[str]: s = self.settings args = [f"--proxy-server={proxy}"] if proxy else [] if not s.load_images: # Disable image loading at the engine level — the dominant bandwidth cost on # an image-heavy market, and unneeded for CF clearance or the JSON API. args.append("--blink-settings=imagesEnabled=false") if s.chrome_no_sandbox: # Required when running Chromium as root in a container. args += ["--no-sandbox", "--disable-dev-shm-usage"] return args async def _on_challenge(self, page) -> None: """The exit IP is likely flagged. On IPRoyal, rotate to a fresh sticky session (new IP) before re-warming; otherwise just re-solve in place.""" if self._forwarder is not None: self._session_id = new_session_id() self._forwarder.set_password(self._iproyal_password(self._session_id)) self.log.warning("challenged; rotating exit IP -> session %s, re-warming", self._session_id) else: self.log.warning("challenged; re-warming session") await self.warm(page) def _log_result(self, res: ScrapeResult, posted: dict | None, total_wire: int) -> None: if posted: summary = (f"matched {posted.get('matched')}, new {posted.get('inserted')}, " f"upd {posted.get('updated')}, removed {posted.get('removed')}") else: summary = "post failed" self.log.info("scraped %d items (%dp, %s, %.0fKB wire) -> %s [lifetime %.1fMB]", len(res.items), res.pages, res.reason, res.wire_bytes / 1024, summary, total_wire / 1_048_576) def _install_signal_handlers(self) -> None: """Stop the loop on SIGINT/SIGTERM so `docker stop` shuts down cleanly. Not supported on Windows (ProactorEventLoop) — there Ctrl-C still raises KeyboardInterrupt, which the run loop's finally handles just as well.""" try: loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, self._stop.set) except (NotImplementedError, AttributeError): pass async def _idle(self) -> None: """Sleep when the C2 has no work, but wake immediately on shutdown.""" try: await asyncio.wait_for(self._stop.wait(), timeout=self.settings.idle_seconds) except asyncio.TimeoutError: pass async def run(self) -> None: self._install_signal_handlers() s = self.settings proxy, proxy_label = await self._setup_proxy() self.log.info("starting (C2=%s, proxy=%s, images=%s)", s.c2_url, proxy_label, "on" if s.load_images else "off") browser = await uc.start( headless=False, browser_executable_path=s.browser_path, browser_args=self._browser_args(proxy)) try: page = await browser.get("about:blank") await self.warm(page) total_wire = 0 # metered (compressed) bytes pulled, lifetime while not self._stop.is_set(): job = await self.c2.get_job() if not job: await self._idle() continue self.log.info("job %s — %s", job["jobId"][:8], self.describe_job(job)) res = await self.scrape_job(page, job) total_wire += res.wire_bytes if res.reason == "challenged": await self._on_challenge(page) posted = await self.c2.post_result(job["jobId"], { "items": res.items, "pages": res.pages, "stoppedReason": res.reason}) self._log_result(res, posted, total_wire) await self._pace(page) finally: self.log.info("shutting down") browser.stop() if self._forwarder is not None: await self._forwarder.stop() def run(worker_cls: type[Worker]) -> None: """Boot a worker from the environment: parse config, set up logging, run the loop on nodriver's event loop. The thin market scripts call this and nothing else.""" from . import log as log_setup settings = Settings.from_env() log_setup.configure(settings.log_level, settings.log_json) uc.loop().run_until_complete(worker_cls(settings).run())