557 lines
22 KiB
C#
557 lines
22 KiB
C#
using BlueLaminate.EFCore.Data;
|
|
using BlueLaminate.EFCore.Entities;
|
|
using BlueLaminate.Scraper.CsFloat;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace BlueLaminate.Cli;
|
|
|
|
/// <summary>Counters describing the outcome of one global listings sweep.</summary>
/// <param name="Pages">Number of API pages fetched.</param>
/// <param name="Seen">Listings returned across all of those pages.</param>
/// <param name="Inserted">Listings that were new and were inserted.</param>
/// <param name="Updated">Already-known listings that were refreshed (price, last-seen, ...).</param>
/// <param name="Removed">Listings flagged Removed (only happens on a complete pass).</param>
/// <param name="Linked">Listings matched to a catalogue skin by def/paint index.</param>
/// <param name="StoppedReason">Human-readable reason the sweep ended.</param>
public sealed record ListingSweepResult(
    int Pages, int Seen, int Inserted, int Updated, int Removed, int Linked,
    string StoppedReason);
|
|
|
|
/// <summary>Counters describing the outcome of one catalogue-driven sweep.</summary>
/// <param name="SkinsCovered">Catalogue skins paged to the end this run.</param>
/// <param name="SkinsSkipped">Catalogue skins not fully paged this run (e.g. request budget ran out).</param>
/// <param name="Pages">Number of API pages fetched across all skins.</param>
/// <param name="Seen">Listings returned across all of those pages.</param>
/// <param name="Inserted">Listings that were new and were inserted.</param>
/// <param name="Updated">Already-known listings that were refreshed.</param>
/// <param name="Removed">Listings flagged Removed for the skins that were fully paged.</param>
/// <param name="StoppedReason">Human-readable reason the sweep ended.</param>
public sealed record CatalogSweepResult(
    int SkinsCovered, int SkinsSkipped,
    int Pages, int Seen, int Inserted, int Updated, int Removed,
    string StoppedReason);
|
|
|
|
/// <summary>
|
|
/// Global incremental sweep of CSFloat active listings into the database. Pages
|
|
/// <c>sort_by=most_recent</c> with no item filter, so it captures every listing —
|
|
/// including items not in our catalogue. Each listing is upserted by its stable
|
|
/// CSFloat id; <see cref="Listing.FirstSeenAt"/>/<see cref="Listing.LastSeenAt"/>
|
|
/// bound the observation window.
|
|
///
|
|
/// Two things keep it safe against the 200-request rate limit and partial runs:
|
|
/// <list type="bullet">
|
|
/// <item><b>Pacing.</b> After each page it inspects the client's rate-limit
|
|
/// headers; when remaining is low it sleeps until the reset epoch rather than
|
|
/// risking a 429.</item>
|
|
/// <item><b>Removed-tracking only on a complete pass.</b> Marking unseen listings
|
|
/// as Removed is only valid when the whole market was covered. A capped or
|
|
/// incremental run that stops early must not do it, or it would falsely "sell"
|
|
/// everything it didn't reach.</item>
|
|
/// </list>
|
|
/// </summary>
|
|
public sealed class ListingSweepService
|
|
{
|
|
/// <summary>Source tag written to the ScrapeRun row of a global sweep.</summary>
public const string Source = "listings";

/// <summary>Source tag written to the ScrapeRun row of a catalogue-driven sweep.</summary>
public const string CatalogSource = "listings-catalog";

// Start pacing while a couple of requests are still left in the bucket, so a
// slightly stale counter cannot tip us into a 429.
private const int RateLimitSafetyMargin = 2;

private readonly SkinTrackerDbContext _db;
private readonly CsFloatListingsClient _client;
private readonly ILogger<ListingSweepService> _logger;

/// <summary>Stores the injected dependencies; does no work of its own.</summary>
public ListingSweepService(
    SkinTrackerDbContext db,
    CsFloatListingsClient client,
    ILogger<ListingSweepService> logger)
    => (_db, _client, _logger) = (db, client, logger);
|
|
|
|
/// <summary>
/// Pages the global <c>most_recent</c> listing feed (no item filter), upserts every
/// listing returned, runs dupe detection over the touched instances and records a
/// <see cref="ScrapeRun"/> tagged <see cref="Source"/>.
/// </summary>
/// <remarks>
/// Unseen Active listings are flagged Removed only when the cursor was followed to
/// its end AND at least one listing came back. Any early stop (a cap, an API error,
/// the incremental short-circuit) is a partial pass and skips that step, as does a
/// sweep that returned nothing at all — otherwise an empty response would flag
/// every Active listing as Removed.
/// </remarks>
/// <param name="maxRequests">Hard cap on API pages this run (rate-limit budget).</param>
/// <param name="maxListings">
/// Hard cap on listings ingested this run. Checked before each page is fetched, so
/// the last page may overshoot it.
/// </param>
/// <param name="incremental">
/// Stop once a whole page is already-known listings (cheap daily delta). When
/// false, keep paging until the cursor or a cap is exhausted (cold pass).
/// </param>
/// <param name="delayBetweenPages">Optional courtesy delay between pages.</param>
/// <param name="ct">Cancels API calls, database work and pacing delays.</param>
/// <returns>Counters for the run plus the reason paging stopped.</returns>
public async Task<ListingSweepResult> SweepAsync(
    int maxRequests = 4,
    int maxListings = 200,
    bool incremental = true,
    TimeSpan? delayBetweenPages = null,
    CancellationToken ct = default)
{
    // One timestamp for the whole run, so every row touched shares it.
    var now = DateTimeOffset.UtcNow;
    var pages = 0;
    var seen = 0;
    var inserted = 0;
    var updated = 0;
    var linked = 0;
    string? cursor = null;
    string stoppedReason = "cursor exhausted";
    var completePass = true;

    // Catalogue lookup for best-effort skin linking, built once per run.
    var skinByIndex = await _db.Skins
        .Where(s => s.DefIndex != null && s.PaintIndex != null)
        .Select(s => new { s.Id, s.DefIndex, s.PaintIndex })
        .ToDictionaryAsync(s => (s.DefIndex!.Value, s.PaintIndex!.Value), s => s.Id, ct);

    // Track which listing ids we touched this run, so a complete pass can flag
    // the rest as Removed.
    var touchedIds = new HashSet<string>();
    var touchedInstanceIds = new HashSet<int>();

    while (true)
    {
        if (pages >= maxRequests)
        {
            stoppedReason = $"hit max-requests cap ({maxRequests})";
            completePass = false;
            break;
        }
        if (seen >= maxListings)
        {
            stoppedReason = $"hit max-listings cap ({maxListings})";
            completePass = false;
            break;
        }

        ListingsPageResult page;
        try
        {
            page = await _client.FetchPageAsync(
                defIndex: null, paintIndex: null, sortBy: "most_recent",
                limit: 50, cursor: cursor, ct: ct);
        }
        catch (CsFloatApiException ex)
        {
            // Pass the exception itself so the stack trace reaches the log sink.
            _logger.LogError(ex, "Sweep aborted: {Message}", ex.Message);
            stoppedReason = $"API error: {ex.Status}";
            completePass = false;
            break;
        }

        pages++;
        seen += page.Listings.Count;

        var (ins, upd, link, allKnown) = await IngestPageAsync(
            page.Listings, skinByIndex, touchedIds, touchedInstanceIds, now, ct);
        inserted += ins;
        updated += upd;
        linked += link;

        _logger.LogInformation(
            "Page {Page}: {Count} listings ({Ins} new, {Upd} updated); {Rate}",
            pages, page.Listings.Count, ins, upd, _client.LastRateLimit);

        cursor = page.Cursor;

        // End of the market.
        if (string.IsNullOrEmpty(cursor) || page.Listings.Count == 0)
        {
            stoppedReason = "cursor exhausted";
            break;
        }

        // Incremental short-circuit: a full page we already knew means we've
        // caught up to the previous sweep. This is a partial pass by design.
        if (incremental && allKnown)
        {
            stoppedReason = "reached already-seen listings (incremental)";
            completePass = false;
            break;
        }

        await PaceAsync(delayBetweenPages, ct);
    }

    // Persist inserts/updates before computing Removed so the touched set is durable.
    await _db.SaveChangesAsync(ct);

    var removed = 0;
    if (!completePass)
        _logger.LogInformation("Partial pass — skipping Removed-tracking to avoid false sales.");
    else if (seen == 0)
        // An empty touched set would match every Active listing; a sweep that saw
        // nothing is no evidence that the whole market disappeared.
        _logger.LogWarning("Sweep returned no listings — skipping Removed-tracking to avoid flagging every active listing.");
    else
        removed = await MarkRemovedAsync(touchedIds, now, ct);

    await FlagDupesAsync(touchedInstanceIds, now, ct);

    // Plain Add: AddAsync exists only for value generators that need async database access.
    _db.ScrapeRuns.Add(new ScrapeRun { Source = Source, RanAt = now, ItemCount = seen });
    await _db.SaveChangesAsync(ct);

    return new ListingSweepResult(pages, seen, inserted, updated, removed, linked, stoppedReason);
}
|
|
|
|
/// <summary>
/// Catalogue-driven sweep: walk skins that have def/paint indexes and query
/// each one's listings with a server-side def_index+paint_index filter. The
/// API returns only that skin's listings, so no rate-limit budget is wasted on
/// stickers/cases/agents — every request is productive weapon data. Because
/// each skin is paged to completion, Removed-tracking is accurate per skin
/// even when the overall run is capped: a skin we fully covered but whose old
/// listing is now absent is genuinely gone.
/// </summary>
/// <remarks>
/// An API error stops the run after the current skin's partial results have been
/// saved and dupe-checked; that skin is treated as incomplete (no Removed-tracking,
/// no resume stamp). A <see cref="ScrapeRun"/> tagged <see cref="CatalogSource"/>
/// is recorded however the run ends, matching <see cref="SweepAsync"/>.
/// </remarks>
/// <param name="maxRequests">Hard cap on API pages across the whole run.</param>
/// <param name="maxListingsPerSkin">
/// Safety cap on listings paged per skin. A skin that reaches it before its cursor
/// ends is treated as incomplete.
/// </param>
/// <param name="delayBetweenPages">Optional courtesy delay between pages.</param>
/// <param name="ct">Cancels API calls, database work and pacing delays.</param>
/// <returns>Counters for the run plus the reason it stopped.</returns>
public async Task<CatalogSweepResult> SweepCatalogAsync(
    int maxRequests = 50,
    int maxListingsPerSkin = 500,
    TimeSpan? delayBetweenPages = null,
    CancellationToken ct = default)
{
    var now = DateTimeOffset.UtcNow;

    // Least-recently-swept first. This is the cross-run resume: a capped run
    // continues from where the previous one stopped, and the stalest data
    // refreshes first. Never-swept skins are meant to sort first — this relies on
    // null ordering before any timestamp when ascending (NOTE(review): confirm
    // for the database provider in use).
    var skins = await _db.Skins
        .Where(s => s.DefIndex != null && s.PaintIndex != null)
        .OrderBy(s => s.ListingsSweptAt)
        .Select(s => new { s.Id, Def = s.DefIndex!.Value, Paint = s.PaintIndex!.Value })
        .ToListAsync(ct);

    var pages = 0;
    var seen = 0;
    var inserted = 0;
    var updated = 0;
    var removed = 0;
    var covered = 0;
    var stoppedReason = "all catalogue skins covered";
    var aborted = false;

    foreach (var skin in skins)
    {
        if (pages >= maxRequests)
        {
            stoppedReason = $"hit max-requests cap ({maxRequests})";
            break;
        }

        // One-entry lookup so IngestPageAsync resolves SkinId to this skin.
        var lookup = new Dictionary<(int, int), int> { [(skin.Def, skin.Paint)] = skin.Id };
        var touchedIds = new HashSet<string>();
        var touchedInstanceIds = new HashSet<int>();
        string? cursor = null;
        var skinComplete = true;
        var skinSeen = 0;

        while (true)
        {
            if (pages >= maxRequests)
            {
                stoppedReason = $"hit max-requests cap ({maxRequests})";
                skinComplete = false;
                break;
            }

            ListingsPageResult page;
            try
            {
                page = await _client.FetchPageAsync(
                    defIndex: skin.Def, paintIndex: skin.Paint, sortBy: "lowest_price",
                    limit: 50, cursor: cursor, ct: ct);
            }
            catch (CsFloatApiException ex)
            {
                // Pass the exception itself so the stack trace reaches the log sink.
                _logger.LogError(ex, "Catalogue sweep aborted on skin {SkinId}: {Message}", skin.Id, ex.Message);
                stoppedReason = $"API error: {ex.Status}";
                skinComplete = false;
                // Fall through to the save / dupe-check / ScrapeRun steps below
                // instead of returning early, so an aborted run is wrapped up the
                // same way as a capped one.
                aborted = true;
                break;
            }

            pages++;
            seen += page.Listings.Count;
            skinSeen += page.Listings.Count;

            var (ins, upd, _, _) = await IngestPageAsync(
                page.Listings, lookup, touchedIds, touchedInstanceIds, now, ct);
            inserted += ins;
            updated += upd;

            cursor = page.Cursor;
            if (string.IsNullOrEmpty(cursor) || page.Listings.Count == 0)
                break;
            if (skinSeen >= maxListingsPerSkin)
            {
                skinComplete = false; // didn't reach the end; don't mark Removed
                break;
            }

            await PaceAsync(delayBetweenPages, ct);
        }

        // Per-skin Removed-tracking + resume stamp: only when this skin was
        // paged to the end. An incomplete skin keeps its old ListingsSweptAt so
        // the next run revisits it first.
        if (skinComplete)
        {
            removed += await MarkRemovedForSkinAsync(skin.Id, touchedIds, now, ct);
            await _db.Skins
                .Where(s => s.Id == skin.Id)
                .ExecuteUpdateAsync(
                    setters => setters.SetProperty(s => s.ListingsSweptAt, now), ct);
            covered++;
        }

        // Persist this skin's listings/instances before dupe analysis so the
        // asset-id grouping query sees them. FlagDupesAsync writes through a
        // set-based update, so no second save is needed after it.
        await _db.SaveChangesAsync(ct);
        await FlagDupesAsync(touchedInstanceIds, now, ct);

        if (aborted)
            break;

        await PaceAsync(delayBetweenPages, ct);
    }

    // Plain Add: AddAsync exists only for value generators that need async database access.
    _db.ScrapeRuns.Add(new ScrapeRun { Source = CatalogSource, RanAt = now, ItemCount = seen });
    await _db.SaveChangesAsync(ct);

    return new CatalogSweepResult(
        covered, skins.Count - covered, pages, seen, inserted, updated, removed, stoppedReason);
}
|
|
|
|
// For one skin: every listing still Active in the database whose id is not in
// touchedIds gets Status = Removed and RemovedAt = now. Runs as a single
// set-based update and returns the number of rows changed.
private async Task<int> MarkRemovedForSkinAsync(
    int skinId, HashSet<string> touchedIds, DateTimeOffset now, CancellationToken ct)
{
    var unseenActive = _db.Listings
        .Where(l => l.SkinId == skinId
            && l.Status == ListingStatus.Active
            && !touchedIds.Contains(l.CsFloatListingId));

    var flagged = await unseenActive.ExecuteUpdateAsync(
        update => update
            .SetProperty(l => l.Status, ListingStatus.Removed)
            .SetProperty(l => l.RemovedAt, now),
        ct);

    return flagged;
}
|
|
|
|
// Upsert one page of listings. Returns per-page counts plus whether every
// listing on the page was already known (the incremental stop signal). Each
// listing whose skin is known is also resolved to a SkinInstance (the physical
// item, by fingerprint); the ids of those instances are added to
// touchedInstanceIds so the caller can run dupe detection over them.
//
// Side effects: adds every processed listing id to touchedIds, and saves pending
// changes when the page created a new SkinInstance (see the flush note below).
private async Task<(int Inserted, int Updated, int Linked, bool AllKnown)> IngestPageAsync(
    IReadOnlyList<CsFloatListing> listings,
    IReadOnlyDictionary<(int, int), int> skinByIndex,
    HashSet<string> touchedIds,
    HashSet<int> touchedInstanceIds,
    DateTimeOffset now,
    CancellationToken ct)
{
    if (listings.Count == 0)
        return (0, 0, 0, true);

    var ids = listings.Select(l => l.ListingId).ToList();
    var existing = await _db.Listings
        .Where(l => ids.Contains(l.CsFloatListingId))
        .ToDictionaryAsync(l => l.CsFloatListingId, ct);

    var inserted = 0;
    var updated = 0;
    var linked = 0;
    var allKnown = true;
    var pageInstances = new List<SkinInstance>();

    foreach (var l in listings)
    {
        // Process each listing id once per run. A repeat (same page, or a later
        // page if the feed shifts while we page) has not necessarily been saved
        // yet, so the database lookup above would miss it and it would be
        // inserted a second time.
        if (!touchedIds.Add(l.ListingId))
            continue;

        int? skinId = skinByIndex.TryGetValue((l.DefIndex, l.PaintIndex), out var id) ? id : null;
        if (skinId is not null)
            linked++;

        // Resolve the physical item only when we know the skin — the
        // fingerprint is meaningless without it.
        var instance = skinId is { } sid
            ? await ResolveInstanceAsync(sid, l, now, ct)
            : null;
        if (instance is not null)
            pageInstances.Add(instance);

        if (existing.TryGetValue(l.ListingId, out var row))
        {
            // Refresh mutable fields. Price can change; a re-appeared listing
            // returns to Active.
            row.Price = l.Price;
            row.LastSeenAt = now;
            row.Status = ListingStatus.Active;
            row.RemovedAt = null;
            row.SkinId = skinId;
            row.AssetId = l.AssetId;
            row.SkinInstance = instance;
            updated++;
        }
        else
        {
            allKnown = false;
            var entity = MapToEntity(l, skinId, now);
            entity.SkinInstance = instance;
            _db.Listings.Add(entity);
            inserted++;
        }
    }

    // Flush note: an instance created on this page has no usable key until it is
    // saved (assumes SkinInstance.Id is store-generated — with client-generated
    // keys this save is merely early). Recording instance.Id before the save
    // would store the unsaved default value and leave the new instance out of
    // dupe detection for the run that created it.
    if (pageInstances.Any(i => _db.Entry(i).State == EntityState.Added))
        await _db.SaveChangesAsync(ct);

    foreach (var resolved in pageInstances)
        touchedInstanceIds.Add(resolved.Id);

    return (inserted, updated, linked, allKnown);
}
|
|
|
|
// Return the SkinInstance whose fingerprint matches this listing, creating and
// tracking a new one when none exists. The fingerprint is (skin, full-precision
// float, seed, stattrak, souvenir). It is deliberately NOT unique — duped copies
// share it — so a match may already stand for more than one physical item; dupe
// detection runs later. A matched instance gets LastSeenAt bumped to now.
private async Task<SkinInstance> ResolveInstanceAsync(
    int skinId, CsFloatListing l, DateTimeOffset now, CancellationToken ct)
{
    var seed = l.PaintSeed.ToString();

    // Look through the change tracker first: an instance added earlier in this
    // run but not yet saved cannot be found by a database query.
    SkinInstance? match = null;
    foreach (var entry in _db.ChangeTracker.Entries<SkinInstance>())
    {
        var candidate = entry.Entity;
        if (candidate.SkinId == skinId && candidate.FloatValue == l.FloatValue
            && candidate.PaintSeed == seed && candidate.StatTrak == l.IsStatTrak
            && candidate.Souvenir == l.IsSouvenir)
        {
            match = candidate;
            break;
        }
    }

    // Nothing tracked: fall back to the database.
    match ??= await _db.SkinInstances.FirstOrDefaultAsync(
        i => i.SkinId == skinId && i.FloatValue == l.FloatValue
            && i.PaintSeed == seed && i.StatTrak == l.IsStatTrak && i.Souvenir == l.IsSouvenir,
        ct);

    if (match is null)
    {
        // First sighting of this fingerprint.
        var created = new SkinInstance
        {
            SkinId = skinId,
            FloatValue = l.FloatValue,
            PaintSeed = seed,
            StatTrak = l.IsStatTrak,
            Souvenir = l.IsSouvenir,
            FirstSeenAt = now,
            LastSeenAt = now,
        };
        _db.SkinInstances.Add(created);
        return created;
    }

    match.LastSeenAt = now;
    return match;
}
|
|
|
|
// Build a brand-new Listing row from an API listing. The row starts Active with
// FirstSeenAt and LastSeenAt both set to now; skinId may be null when the listing
// could not be matched to a catalogue skin.
private static Listing MapToEntity(CsFloatListing l, int? skinId, DateTimeOffset now)
{
    var entity = new Listing
    {
        // Values copied straight from the API payload.
        CsFloatListingId = l.ListingId,
        Type = l.Type,
        Price = l.Price,
        ListedAt = l.CreatedAt,
        AssetId = l.AssetId,
        DefIndex = l.DefIndex,
        PaintIndex = l.PaintIndex,
        MarketHashName = l.MarketHashName,
        WearName = l.WearName,
        FloatValue = l.FloatValue,
        PaintSeed = l.PaintSeed,
        IsStatTrak = l.IsStatTrak,
        IsSouvenir = l.IsSouvenir,
        StickerCount = l.StickerCount,
        SellerSteamId = l.SellerSteamId,
        InspectLink = l.InspectLink,

        // Values supplied by the sweep itself.
        SkinId = skinId,
        FirstSeenAt = now,
        LastSeenAt = now,
        Status = ListingStatus.Active,
    };

    return entity;
}
|
|
|
|
// Market-wide counterpart of the per-skin version: every listing still Active in
// the database whose id is not in touchedIds gets Status = Removed and
// RemovedAt = now. Only called after a complete pass. Runs as one set-based
// update, so the table is never loaded into memory; returns the rows changed.
private async Task<int> MarkRemovedAsync(
    HashSet<string> touchedIds, DateTimeOffset now, CancellationToken ct)
{
    var unseenActive = _db.Listings
        .Where(l => l.Status == ListingStatus.Active && !touchedIds.Contains(l.CsFloatListingId));

    var flagged = await unseenActive.ExecuteUpdateAsync(
        update => update
            .SetProperty(l => l.Status, ListingStatus.Removed)
            .SetProperty(l => l.RemovedAt, now),
        ct);

    return flagged;
}
|
|
|
|
// Dupe detection over the instances touched this run. For each one, count the
// DISTINCT asset ids among its currently-Active listings. Two or more means the
// same fingerprint (skin+float+seed+ST+souvenir) is live under several Steam
// assets at once — the signature of a duplicated item. An ordinary trade retires
// the old listing before the new one appears, leaving a single active asset.
// Instances detected for the first time are flagged and stamped with
// DupeFirstSeenAt, which enables "alert on fresh duping" downstream.
private async Task FlagDupesAsync(
    HashSet<int> instanceIds, DateTimeOffset now, CancellationToken ct)
{
    if (instanceIds.Count == 0)
        return;

    // Active listings with an asset id, restricted to the touched instances.
    var candidateListings = _db.Listings
        .Where(l => l.SkinInstanceId != null
            && instanceIds.Contains(l.SkinInstanceId!.Value)
            && l.Status == ListingStatus.Active
            && l.AssetId != null);

    // Keep the instances that have 2+ distinct active asset ids.
    var dupeInstanceIds = await candidateListings
        .GroupBy(l => l.SkinInstanceId!.Value)
        .Where(g => g.Select(l => l.AssetId).Distinct().Count() >= 2)
        .Select(g => g.Key)
        .ToListAsync(ct);

    if (dupeInstanceIds.Count > 0)
    {
        // The !SuspectedDupe filter leaves already-flagged instances untouched,
        // so DupeFirstSeenAt is stamped exactly once.
        var newlyFlagged = await _db.SkinInstances
            .Where(i => dupeInstanceIds.Contains(i.Id) && !i.SuspectedDupe)
            .ExecuteUpdateAsync(
                update => update
                    .SetProperty(i => i.SuspectedDupe, true)
                    .SetProperty(i => i.DupeFirstSeenAt, now),
                ct);

        if (newlyFlagged > 0)
            _logger.LogWarning(
                "Dupe detection: {Count} instance(s) newly flagged as suspected dupes.", newlyFlagged);
    }
}
|
|
|
|
// Throttle between requests. When the client's last rate-limit reading shows the
// bucket at or below the safety margin and carries a parseable reset epoch
// (seconds) that is still in the future, sleep until that reset and return.
// In every other case apply only the optional courtesy delay, if it is positive.
private async Task PaceAsync(TimeSpan? delay, CancellationToken ct)
{
    var rate = _client.LastRateLimit;

    if (rate.Remaining is { } left
        && left <= RateLimitSafetyMargin
        && long.TryParse(rate.Reset, out var epochSeconds))
    {
        var untilReset = DateTimeOffset.FromUnixTimeSeconds(epochSeconds) - DateTimeOffset.UtcNow;
        if (untilReset > TimeSpan.Zero)
        {
            _logger.LogWarning(
                "Rate limit nearly exhausted ({Remaining} left); sleeping {Seconds:0}s until reset.",
                left, untilReset.TotalSeconds);
            await Task.Delay(untilReset, ct);
            return;
        }
    }

    // No rate-limit sleep happened: fall back to the courtesy delay.
    if (delay is not { } courtesy || courtesy <= TimeSpan.Zero)
        return;

    await Task.Delay(courtesy, ct);
}
|
|
}
|