From 8b0eb0db78ea12bca858762e1ab5970fbd3a83e5 Mon Sep 17 00:00:00 2001 From: bob Date: Sun, 31 May 2026 15:27:37 -0500 Subject: [PATCH] Cut metered-proxy bandwidth: re-sweep floor + wire-size logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JobQueue now skips bands swept within MinResweepHours (config, default 6h) instead of re-scraping the whole catalogue continuously — the dominant cost on the metered residential proxy. Roughly linear savings with no data loss (full pagination retained); 0 disables it. Worker logs the real compressed transferSize per job (what the proxy bills) rather than the ~6.5x-larger decompressed length, so spend is visible. Co-Authored-By: Claude Opus 4.8 --- BlueLaminate/BlueLaminate.C2/JobQueue.cs | 25 ++++++++++- BlueLaminate/BlueLaminate.C2/Program.cs | 7 ++- BlueLaminate/BlueLaminate.C2/appsettings.json | 3 +- docker-compose.yml | 3 ++ worker/worker.py | 43 +++++++++++++------ 5 files changed, 65 insertions(+), 16 deletions(-) diff --git a/BlueLaminate/BlueLaminate.C2/JobQueue.cs b/BlueLaminate/BlueLaminate.C2/JobQueue.cs index bd8a358..7406662 100644 --- a/BlueLaminate/BlueLaminate.C2/JobQueue.cs +++ b/BlueLaminate/BlueLaminate.C2/JobQueue.cs @@ -12,6 +12,14 @@ namespace BlueLaminate.C2; /// in memory so two workers can't get the same one, and builds a free-text search. On /// completion the ingest stamps ListingsSweptAt, so the band drops to the back — /// the sweep loops the whole catalogue continuously and resumes cleanly after restarts. +/// +/// A MinResweepHours floor keeps a band from being re-handed-out until +/// its data is at least that stale. Without it the queue re-scrapes the whole catalogue +/// as fast as the workers run, which on a metered residential proxy is the dominant cost; +/// the floor trades a little price-freshness for a roughly linear bandwidth cut (a 6h +/// floor vs. continuous ≈ 6× less, if a full pass takes ~1h). 
When every band is fresher +/// than the floor the queue hands out nothing (workers idle) until one ages past it. +/// /// public sealed class JobQueue { @@ -20,10 +28,20 @@ public sealed class JobQueue private static readonly TimeSpan LeaseTtl = TimeSpan.FromMinutes(15); private const int CandidateBatch = 100; + private readonly TimeSpan _minResweepInterval; private readonly SemaphoreSlim _gate = new(1, 1); private readonly ConcurrentDictionary _leases = new(); // conditionId -> leasedAt private readonly ConcurrentDictionary _inFlight = new(); // jobId -> mapping + /// + /// How stale a band's ListingsSweptAt must be before it's eligible again. + /// TimeSpan.Zero disables the floor (continuous re-sweep). + /// + public JobQueue(TimeSpan minResweepInterval) + { + _minResweepInterval = minResweepInterval; + } + public async Task ClaimNextAsync(SkinTrackerDbContext db, int maxPages, CancellationToken ct) { await _gate.WaitAsync(ct); @@ -39,8 +57,13 @@ public sealed class JobQueue } } - // Stalest bands first (never-swept null sorts before any timestamp). + // Only consider bands that are never-swept or stale past the re-sweep floor, + // then stalest first (never-swept null sorts before any timestamp). With the + // floor in place a fully-fresh catalogue yields no candidates, so workers idle + // instead of needlessly re-pulling ~1MB pages on the metered proxy. 
+ var freshCutoff = DateTimeOffset.UtcNow - _minResweepInterval; var candidates = await db.SkinConditions + .Where(c => c.ListingsSweptAt == null || c.ListingsSweptAt <= freshCutoff) .OrderBy(c => c.ListingsSweptAt.HasValue) .ThenBy(c => c.ListingsSweptAt) .Select(c => new Candidate( diff --git a/BlueLaminate/BlueLaminate.C2/Program.cs b/BlueLaminate/BlueLaminate.C2/Program.cs index 0e67b29..4eb4772 100644 --- a/BlueLaminate/BlueLaminate.C2/Program.cs +++ b/BlueLaminate/BlueLaminate.C2/Program.cs @@ -14,7 +14,12 @@ var builder = WebApplication.CreateBuilder(new WebApplicationOptions ContentRootPath = AppContext.BaseDirectory, }); builder.Services.AddBlueLaminateCore(builder.Configuration); -builder.Services.AddSingleton(); + +// Re-sweep floor: don't re-hand-out a band whose listings were swept less than this +// many hours ago. The dominant cost on the metered residential proxy is re-scraping +// already-fresh bands, so this caps how often any band is re-pulled. 0 = continuous. +var minResweepHours = builder.Configuration.GetValue("MinResweepHours", 6.0); +builder.Services.AddSingleton(new JobQueue(TimeSpan.FromHours(minResweepHours))); var app = builder.Build(); diff --git a/BlueLaminate/BlueLaminate.C2/appsettings.json b/BlueLaminate/BlueLaminate.C2/appsettings.json index ab0d67e..6038867 100644 --- a/BlueLaminate/BlueLaminate.C2/appsettings.json +++ b/BlueLaminate/BlueLaminate.C2/appsettings.json @@ -12,5 +12,6 @@ "SkinTracker": "Host=localhost;Port=5432;Database=skintracker;Username=postgres" }, "WorkerToken": "dev-worker-token", - "MaxPagesPerJob": 60 + "MaxPagesPerJob": 60, + "MinResweepHours": 6 } diff --git a/docker-compose.yml b/docker-compose.yml index 1b1c07e..84ac174 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,9 @@ services: ConnectionStrings__SkinTracker: ${SKINTRACKER_CONN:-Host=host.docker.internal;Port=5432;Database=skintracker;Username=postgres} WorkerToken: ${WORKER_TOKEN:-dev-worker-token} MaxPagesPerJob: 
${MAX_PAGES_PER_JOB:-60} + # Re-sweep floor (hours): skip bands swept more recently than this. The big lever + # for metered-proxy bandwidth — fewer redundant re-pulls. 0 = continuous re-sweep. + MinResweepHours: ${MIN_RESWEEP_HOURS:-6} ports: - "5080:5080" extra_hosts: diff --git a/worker/worker.py b/worker/worker.py index d6e6a5a..8a46441 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -283,19 +283,28 @@ async def post_result(job_id, payload): # --- scraping --------------------------------------------------------------------- -async def fetch_json(page, url: str) -> tuple[str, str]: +async def fetch_json(page, url: str) -> tuple[str, str, int]: + """Fetch in-page and also read back the Resource Timing transferSize — the actual + COMPRESSED bytes on the wire (what the metered proxy bills), not len(body) which is + the decompressed size. Returns (status, body, wire_bytes); wire_bytes is -1 if the + timing entry wasn't available. Same-origin (cs.money), so the size fields are exposed.""" expr = ( f"fetch({url!r}, {{credentials:'include', headers:{{'accept':'application/json'}}}})" - f".then(async r => JSON.stringify({{status: r.status, body: await r.text()}}))" + f".then(async r => {{" + f" const body = await r.text();" + f" const e = performance.getEntriesByName({url!r}).slice(-1)[0];" + f" return JSON.stringify({{status: r.status, body: body," + f" wire: e ? e.transferSize : -1, dec: e ? 
e.decodedBodySize : -1}});" + f"}})" ) raw = await page.evaluate(expr, await_promise=True) if not isinstance(raw, str): - return ("-1", "") + return ("-1", "", -1) try: obj = json.loads(raw) - return (str(obj.get("status", "-1")), obj.get("body", "")) - except json.JSONDecodeError: - return ("-1", raw) + return (str(obj.get("status", "-1")), obj.get("body", ""), int(obj.get("wire", -1))) + except (json.JSONDecodeError, ValueError, TypeError): + return ("-1", raw, -1) async def _click(page, text, timeout=3): @@ -346,28 +355,32 @@ def extract_items(html: str) -> list: return [] -async def scrape_job(page, job) -> tuple[list, int, str]: +async def scrape_job(page, job) -> tuple[list, int, str, int]: """Scrape ALL listings for one skin+wear via a forward float cursor. A search page returns at most 60 items and ignores offset, but cs.money sorts by float (order=asc&sort=float) and filters by minFloat. So we walk the float axis: grab the 60 lowest-float items at/above `lo`, advance `lo` to the highest float on the page, and repeat until a page is under the cap. The boundary item is re-fetched - (minFloat is inclusive) and dropped by the id dedup. Returns (items, fetches, reason). + (minFloat is inclusive) and dropped by the id dedup. Returns + (items, fetches, reason, wire_bytes) where wire_bytes is the metered (compressed) cost. 
""" search = urllib.parse.quote_plus(job["search"]) max_fetches = job.get("maxPages", 40) # safety cap on page fetches per job seen: dict = {} fetches = 0 + wire = 0 lo = 0.0 reason = "completed" while fetches < max_fetches: - status, body = await fetch_json(page, PAGE.format(search=search, lo=lo)) + status, body, wbytes = await fetch_json(page, PAGE.format(search=search, lo=lo)) fetches += 1 + if wbytes > 0: + wire += wbytes if "Just a moment" in body or "challenge-platform" in body: - return list(seen.values()), fetches, "challenged" + return list(seen.values()), fetches, "challenged", wire items = extract_items(body) floats = [] @@ -396,7 +409,7 @@ async def scrape_job(page, job) -> tuple[list, int, str]: else: reason = "fetch-cap" - return list(seen.values()), fetches, reason + return list(seen.values()), fetches, reason, wire async def main(): @@ -429,6 +442,7 @@ async def main(): page = await browser.get("about:blank") await warm(page) + total_wire = 0 # metered (compressed) bytes this worker has pulled, lifetime while True: job = await get_job() if not job: @@ -436,7 +450,8 @@ async def main(): continue print(f"Job {job['jobId'][:8]} — search {job['search']!r}") - items, pages, reason = await scrape_job(page, job) + items, pages, reason, wire = await scrape_job(page, job) + total_wire += wire if reason == "challenged": # The exit IP is likely flagged. On IPRoyal, rotate to a fresh sticky @@ -453,7 +468,9 @@ async def main(): "items": items, "pages": pages, "stoppedReason": reason}) summary = (f"matched {result.get('matched')}, new {result.get('inserted')}, " f"upd {result.get('updated')}, removed {result.get('removed')}") if result else "post failed" - print(f" scraped {len(items)} items ({pages}p, {reason}) -> {summary}") + wire_kb = wire / 1024 + print(f" scraped {len(items)} items ({pages}p, {reason}, {wire_kb:.0f}KB wire) " + f"-> {summary} [lifetime {total_wire / 1_048_576:.1f}MB]") await page.sleep(DELAY + random.uniform(0, JITTER)) finally: