Built and unit-tested ahead of a live playtest window: - reverse/capture_hosts.py: pcap -> DNS/SNI/endpoints in order; extracts PlayFab TitleId, flags hologryph master-server region + config CDN. - reverse/ws_scrape.py: TCP reassembly + RFC-6455 framing for the cleartext ws://<region>.hologryph.com/gameclient/ stream; decodes JSON/BSON/MessagePack; auto-labels ServerDto, CompartmentDefinitionDto, ResearchNodeJsonDto, OperationResult, etc. No MITM needed. - reverse/playfab_scrape.py: LoginWithSteam (or captured EntityToken) -> Catalog/SearchItems (+ Inventory/TitleData); prices resolved to item names. Read-only. - docs/SCRAPE_RUNBOOK.md: turnkey steps for when servers are online.
185 lines
7.5 KiB
Python
185 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Scrape SAND's PlayFab Economy (catalog + prices + inventory) via the public REST API.

Backend is Azure PlayFab (Economy v2). Auth is `LoginWithSteam`. Everything here is the
documented PlayFab REST surface (https://learn.microsoft.com/gaming/playfab/), run from
*outside* the game process — no BattlEye interaction.

Two ways in:
  --steam-ticket <hex>   do Client/LoginWithSteam to get a fresh SessionTicket + EntityToken
  --entity-token <tok>   skip login, use an EntityToken you captured (e.g. from your MITM)

The only constant you must supply is --title-id (the PlayFab TitleId). Get it from a live
capture: `reverse/capture_hosts.py` prints it from the `<titleId>.playfabapi.com` host.

Modes (combine freely; --catalog is the default when no mode is given):
  --catalog     Catalog/SearchItems -> every item with PriceOptions + DisplayProperties
  --inventory   Inventory/GetInventoryItems + GetTransactionHistory (your wallet/items)
  --titledata   Client/GetTitleData (config key/values, if any); needs the SessionTicket
                from --steam-ticket, so it is skipped with --entity-token alone

Read-only endpoints only. Output -> extracted/playfab_<mode>.json, item ids resolved to names.

Example:
  venv/bin/python reverse/playfab_scrape.py --title-id ABCDE --steam-ticket 14000000... --catalog
"""
|
|
import sys, os, json, argparse
|
|
import requests
|
|
|
|
# Resolve paths relative to this script: _HERE is the directory holding the
# file, ROOT is that directory's parent, and the name map lives under
# ROOT/extracted/.
_HERE = os.path.split(os.path.abspath(__file__))[0]
ROOT = os.path.split(_HERE)[0]
NAMES_PATH = os.path.join(ROOT, "extracted", "item_names.json")
|
|
|
|
|
|
def load_names(path=None):
    """Load the item-id -> display-name map used to label catalog entries.

    Reads the JSON file at *path* (default: the module-level ``NAMES_PATH``).
    The file is expected to hold an ``"items"`` object mapping each item id to
    an object with an optional ``"name"`` field; ids whose entry has no name
    (or an empty one) map to themselves.

    The lookup is best-effort: if the file is missing, unreadable, not valid
    JSON, or not shaped as described, an empty dict is returned so callers
    fall back to raw ids instead of failing.

    Args:
        path: optional override for the names file location.

    Returns:
        dict mapping item id -> display name (possibly empty).
    """
    if path is None:
        path = NAMES_PATH
    try:
        # JSON is UTF-8; do not rely on the platform's default codec, and make
        # sure the handle is closed even when parsing fails.
        with open(path, encoding="utf-8") as fh:
            entries = json.load(fh)["items"]
        return {key: (val.get("name") or key) for key, val in entries.items()}
    except (OSError, ValueError, KeyError, TypeError, AttributeError):
        # OSError: missing/unreadable file; ValueError: bad JSON or bad UTF-8;
        # the rest: JSON that does not have the expected {"items": {id: {...}}} shape.
        return {}
|
|
|
|
|
|
class PlayFab:
    """Minimal client for the PlayFab REST endpoints this script reads.

    Holds the title id, the per-title base URL, one ``requests.Session`` and
    the two credentials used by the calls below: ``session_ticket`` (sent as
    ``X-Authorization`` to Client/GetTitleData) and ``entity_token`` (sent as
    ``X-EntityToken`` to the Catalog and Inventory endpoints).
    """

    # "Count" sent with every paged listing request.
    PAGE_SIZE = 50

    def __init__(self, title_id, timeout=30):
        """Create a client for *title_id*; *timeout* (seconds) applies to every request."""
        self.title = title_id
        self.base = "https://%s.playfabapi.com" % title_id
        self.timeout = timeout
        self.session_ticket = None
        self.entity_token = None
        self.s = requests.Session()
        self.s.headers["Content-Type"] = "application/json"

    def _post(self, path, body, headers=None):
        """POST *body* as JSON to ``base + path`` and return the response payload.

        Returns the ``"data"`` member of the decoded JSON object when present,
        otherwise the whole decoded response.

        Raises:
            RuntimeError: the HTTP status is not 200, or the JSON object
                carries a ``code`` other than 200.
            requests.HTTPError: the body is not JSON and the status is an
                HTTP error.
            ValueError: the body is not JSON although the status was not an
                HTTP error.
        """
        resp = self.s.post(self.base + path, data=json.dumps(body),
                           headers=headers or {}, timeout=self.timeout)
        try:
            payload = resp.json()
        except ValueError:
            # Non-JSON body: prefer reporting the HTTP error when there is
            # one, otherwise re-raise the decode error itself.
            resp.raise_for_status()
            raise
        if not isinstance(payload, dict):
            # Valid JSON but not an object: there is no "code"/"data" to inspect.
            if resp.status_code != 200:
                raise RuntimeError("%s -> %s %s" % (path, resp.status_code, payload))
            return payload
        if resp.status_code != 200 or payload.get("code") not in (200, None):
            raise RuntimeError("%s -> %s %s" % (
                path, resp.status_code,
                payload.get("errorMessage") or payload.get("error") or payload))
        return payload.get("data", payload)

    def login_steam(self, ticket_hex, service_specific=False):
        """Call Client/LoginWithSteam and store the returned credentials.

        Sends ``CreateAccount: False``. On success ``session_ticket`` and
        ``entity_token`` are overwritten with the values from the response
        (``None`` when the response lacks them).

        Args:
            ticket_hex: value sent as ``SteamTicket``.
            service_specific: value sent as ``TicketIsServiceSpecific``.

        Returns:
            The decoded login response payload.
        """
        body = {"TitleId": self.title, "SteamTicket": ticket_hex,
                "CreateAccount": False, "TicketIsServiceSpecific": service_specific}
        d = self._post("/Client/LoginWithSteam", body)
        self.session_ticket = d.get("SessionTicket")
        et = d.get("EntityToken") or {}
        self.entity_token = et.get("EntityToken")
        print(" logged in: PlayFabId=%s entity=%s" %
              (d.get("PlayFabId"), (et.get("Entity") or {}).get("Id")))
        return d

    # ---- Economy v2 (Entity API; needs X-EntityToken) ----
    def _entity_headers(self):
        """Return the X-EntityToken header, failing early when no token is set."""
        if not self.entity_token:
            raise RuntimeError("no EntityToken: log in with a Steam ticket or supply a captured token")
        return {"X-EntityToken": self.entity_token}

    def _paged(self, path, key, body=None, progress=None):
        """Collect the list under *key* from every page of a listing endpoint.

        Posts *body* plus ``Count`` to *path*, then keeps re-posting with the
        ``ContinuationToken`` of the previous response until a response
        carries no token. A missing or null *key* counts as an empty page.

        Args:
            path: endpoint path, e.g. ``"/Catalog/SearchItems"``.
            key: response field holding the page's list.
            body: extra request fields sent with every page.
            progress: optional ``%d`` format string printed after each page
                with the running total.
        """
        headers = self._entity_headers()
        base = dict(body or {}, Count=self.PAGE_SIZE)
        collected, token = [], None
        while True:
            request = dict(base, ContinuationToken=token) if token else dict(base)
            page = self._post(path, request, headers=headers)
            collected += page.get(key) or []
            if progress:
                print(progress % len(collected))
            token = page.get("ContinuationToken")
            if not token:
                return collected

    def search_items(self):
        """Return every item from Catalog/SearchItems, following pagination."""
        return self._paged("/Catalog/SearchItems", "Items",
                           {"Filter": "", "Search": ""},
                           progress=" catalog: %d items so far...")

    def inventory(self):
        """Return ``{"items": [...], "transactions": [...]}`` for the logged-in entity.

        Both listings are followed through all pages. The transaction history
        is best-effort: if fetching it fails, the error text is stored under
        ``"transactions_error"`` instead and the items are still returned.
        Failures while fetching the items themselves propagate.
        """
        out = {"items": self._paged("/Inventory/GetInventoryItems", "Items")}
        try:
            out["transactions"] = self._paged("/Inventory/GetTransactionHistory",
                                              "Transactions")
        except Exception as e:
            # Deliberately broad: any failure here is recorded in the output
            # rather than discarding the inventory that was already fetched.
            out["transactions_error"] = str(e)
        return out

    def title_data(self):
        """Return the ``Data`` map from Client/GetTitleData (empty dict when absent).

        Raises:
            RuntimeError: no SessionTicket is set, or the request fails.
        """
        if not self.session_ticket:
            raise RuntimeError("no SessionTicket: Client/GetTitleData needs a LoginWithSteam session")
        d = self._post("/Client/GetTitleData", {},
                       headers={"X-Authorization": self.session_ticket})
        return d.get("Data") or {}
|
|
|
|
|
|
def summarize_catalog(items, names):
    """Flatten each catalog item to a compact row.

    Each row has the keys ``id``, ``friendlyId``, ``name``, ``type``,
    ``contentType``, ``prices`` and ``displayProperties``.

    * ``friendlyId`` is the ``FriendlyId`` alternate id, falling back to ``Id``.
    * ``name`` is the ``NEUTRAL`` title, else ``names[friendlyId]``, else the
      friendly id itself.
    * ``prices`` is a list of price options; each option is a list of legs
      ``{"currency", "currencyName", "amount"}`` where ``currencyName`` is
      looked up in *names* and falls back to the currency item id.

    List/dict fields that are missing or JSON ``null`` are treated as empty.

    Args:
        items: iterable of raw catalog item dicts.
        names: dict mapping item id -> display name.

    Returns:
        list of row dicts, one per input item, in input order.
    """
    rows = []
    for item in items:
        alt_ids = {alt.get("Type"): alt.get("Value")
                   for alt in item.get("AlternateIds") or []}
        friendly_id = alt_ids.get("FriendlyId") or item.get("Id")

        price_options = (item.get("PriceOptions") or {}).get("Prices") or []
        prices = [
            [{"currency": amt.get("ItemId"),
              "currencyName": names.get(amt.get("ItemId"), amt.get("ItemId")),
              "amount": amt.get("Amount")}
             for amt in option.get("Amounts") or []]
            for option in price_options
        ]

        title = item.get("Title") or {}
        rows.append({
            "id": item.get("Id"),
            "friendlyId": friendly_id,
            "name": title.get("NEUTRAL") or names.get(friendly_id) or friendly_id,
            "type": item.get("Type"),
            "contentType": item.get("ContentType"),
            "prices": prices,
            "displayProperties": item.get("DisplayProperties"),
        })
    return rows
|
|
|
|
|
|
def _write_json(path, payload):
    """Write *payload* to *path* as UTF-8 JSON (indent=1, non-ASCII kept verbatim)."""
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be pinned; the platform default codec may be unable to encode them.
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, indent=1, ensure_ascii=False)


def main():
    """Command-line entry point: authenticate, run the selected modes, write JSON.

    Requires ``--title-id`` and at least one of ``--steam-ticket`` /
    ``--entity-token``. With no mode flag, ``--catalog`` is assumed. Results
    go to ``<ROOT>/extracted/playfab_{catalog,inventory,titledata}.json``.

    ``--inventory`` needs an EntityToken and ``--titledata`` needs a
    SessionTicket; a requested mode whose credential is unavailable is skipped
    with a note on stderr.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--title-id", required=True)
    ap.add_argument("--steam-ticket")
    ap.add_argument("--entity-token")
    ap.add_argument("--ticket-service-specific", action="store_true")
    ap.add_argument("--catalog", action="store_true")
    ap.add_argument("--inventory", action="store_true")
    ap.add_argument("--titledata", action="store_true")
    args = ap.parse_args()
    if not (args.steam_ticket or args.entity_token):
        sys.exit("need --steam-ticket OR --entity-token")
    if not (args.catalog or args.inventory or args.titledata):
        args.catalog = True  # default

    names = load_names()
    pf = PlayFab(args.title_id)
    if args.entity_token:
        pf.entity_token = args.entity_token
        print(" using captured EntityToken")
    if args.steam_ticket:
        # Runs after the block above on purpose: login_steam assigns
        # pf.entity_token itself, so a login takes precedence over a captured token.
        pf.login_steam(args.steam_ticket, args.ticket_service_specific)

    out_dir = os.path.join(ROOT, "extracted")
    os.makedirs(out_dir, exist_ok=True)

    if args.catalog:
        items = pf.search_items()
        rows = summarize_catalog(items, names)
        out = os.path.join(out_dir, "playfab_catalog.json")
        _write_json(out, {"_source": "PlayFab Catalog/SearchItems", "count": len(rows),
                          "items_raw": items, "items": rows})
        print("wrote %s (%d items)" % (out, len(rows)))
        priced = [r for r in rows if r["prices"]]
        print(" %d items carry a price. sample:" % len(priced))
        for r in priced[:10]:
            # Legs of one price option are joined with "+", alternative options with "|".
            pr = " | ".join(" + ".join("%s %s" % (leg["amount"], leg["currencyName"])
                                       for leg in legs)
                            for legs in r["prices"])
            print(" %-32s %s" % (r["name"], pr))

    if args.inventory:
        if pf.entity_token:
            inv = pf.inventory()
            out = os.path.join(out_dir, "playfab_inventory.json")
            _write_json(out, inv)
            print("wrote", out)
        else:
            print("skipping --inventory: no EntityToken available", file=sys.stderr)

    if args.titledata:
        if pf.session_ticket:
            td = pf.title_data()
            out = os.path.join(out_dir, "playfab_titledata.json")
            _write_json(out, td)
            print("wrote %s (%d keys)" % (out, len(td)))
        else:
            print("skipping --titledata: no SessionTicket (requires --steam-ticket)",
                  file=sys.stderr)
|
|
|
|
|
|
# Run the scraper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|