diff --git a/docs/SCRAPE_RUNBOOK.md b/docs/SCRAPE_RUNBOOK.md
new file mode 100644
index 0000000..53f0d99
--- /dev/null
+++ b/docs/SCRAPE_RUNBOOK.md
@@ -0,0 +1,67 @@
+# Live-scrape runbook — when a playtest is online
+
+Everything below is read-only and runs outside the game process (no BattlEye interaction).
+Tooling is built and unit-tested; the only thing that needs a live backend is the data itself.
+
+## 0. Capture (once servers are up)
+
+1. `ipconfig /flushdns` (so hostnames show as clean DNS queries, incl. the PlayFab TitleId).
+2. Start a packet capture on the game NIC (Wireshark, or `pktmon`/`dumpcap`). Save as `.pcapng`.
+   - Master-server traffic is **cleartext `ws://` on port 80** — Wireshark reads it directly,
+     **no MITM/cert needed**.
+   - PlayFab is HTTPS/443 — to read its bodies you need your MITM (cert already installed) on 443,
+     or use the REST scraper (step 3) instead.
+3. Launch SAND, **click through past the "no servers"/welcome dialog and let it log in**, then open
+   the screens whose data you want (walker editor → compartment defs; research tree; store → prices).
+   Keep capturing through it. Stop the capture.
+
+## 1. Triage the capture → get the TitleId + confirm the master server
+
+```bash
+venv/bin/python reverse/capture_hosts.py <capture.pcapng>
+```
+Prints DNS/SNI/endpoints in order and a **BACKENDS DETECTED** block:
+- `PlayFab host=<titleid>.playfabapi.com ** TitleId = <TITLEID> **` ← the one constant the REST scraper needs
+- `Master server host=<region>.hologryph.com (ws://80 cleartext)`
+- `Config CDN host=sandconfigstorage…`
+
+## 2. Master server (compartments + research tree + server list) — cleartext, no auth replay
+
+```bash
+venv/bin/python reverse/ws_scrape.py <capture.pcapng> --out extracted/master_ws.json
+```
+Reassembles the port-80 WebSocket to `*.hologryph.com/gameclient/`, parses RFC-6455 frames, and
+decodes each message (tries JSON → BSON → MessagePack — the game's `IDataSerializer` is most likely JSON).
+Messages are auto-tagged when their shape matches a known DTO:
+`ServerDto`, `RegionInfo`, **`CompartmentDefinitionDto`** (HP/Weight/Properties/prices),
+**`ResearchNodeJsonDto`** (connections via `RequiredNodesIds`/`DependentNodesIds`, costs via
+`ResearchPrice`), `ItemDto`/`ShopItemDto`/`PriceDto`, `OperationResult`, `IClientEvent`.
+If it finds no WS stream, the capture didn't span the master-server connection (re-capture through
+the login), or try `--port`/`--host`.
+
+> First run, eyeball one frame to confirm the encoding (JSON vs BSON). The decoder already handles
+> both; this is just a sanity check.
+
+## 3. PlayFab prices / catalog / inventory
+
+Either read them from the MITM'd 443 capture, **or** pull them directly (cleaner, gets the *full*
+catalog, more than the client requests):
+
+```bash
+# with a Steam auth ticket (captured, or minted via Steamworks GetAuthSessionTicket):
+venv/bin/python reverse/playfab_scrape.py --title-id <TITLE_ID> --steam-ticket <TICKET_HEX> --catalog --inventory
+
+# or skip login with an EntityToken lifted from your MITM capture:
+venv/bin/python reverse/playfab_scrape.py --title-id <TITLE_ID> --entity-token <ENTITY_TOKEN> --catalog
+```
+`--catalog` → `extracted/playfab_catalog.json`: every item with `PriceOptions` (→ currency-item +
+amount, names resolved via `extracted/item_names.json`) and `DisplayProperties` (check here for any
+catalog-authored base stats). `--inventory` → wallet + items + transaction history. `--titledata`
+→ `Client/GetTitleData` config blobs. Read-only endpoints only — no write/purchase calls.
+
+## Notes / unknowns to confirm live
+- **WS payload encoding** (JSON vs BSON): decoder handles both; confirm on first capture.
+- **Steam ticket reuse**: tickets are short-lived/single-use — if `--steam-ticket` fails, lift an
+  `EntityToken` from the MITM capture and use `--entity-token` instead.
#!/usr/bin/env python3
"""Quick triage of a SAND network capture: every host the client touched, in order,
with the bits we care about highlighted.

Pulls:
  - DNS queries   (order = startup sequence; flush DNS first for a clean list)
  - TLS SNI       (HTTPS hostnames even when DNS was cached)
  - TCP/UDP endpoints
and flags the three backends we care about:
  - <titleid>.playfabapi.com   -> prints the **PlayFab TitleId** (the one constant the
                                  REST scraper needs)
  - <region>.hologryph.com     -> the master-server region (ws://, port 80, cleartext)
  - sandconfigstorage.blob...  -> the anonymous config CDN

Usage:  venv/bin/python reverse/capture_hosts.py <capture.pcapng>
"""
import re
import sys

try:
    from scapy.all import rdpcap, DNS, DNSQR, DNSRR, IP, IPv6, TCP, UDP, Raw
except ImportError:
    # Keep the pure-Python helpers (tls_sni) importable without scapy;
    # main() reports the missing dependency instead of a traceback at import time.
    rdpcap = None


def tls_sni(b):
    """Extract the SNI host name from a TLS ClientHello payload (bytes).

    Walks the ClientHello layout (record header, handshake header, version, random,
    session id, cipher suites, compression methods) up to the extension list and
    returns the name carried by the ``server_name`` extension (type 0).

    Returns the host name as ``str``, or ``None`` when ``b`` is not a ClientHello,
    has no (or an empty) server_name extension, or is cut short before it.
    """
    try:
        # b[0] = record type (0x16 handshake), b[5] = handshake type (0x01 ClientHello)
        if len(b) < 6 or b[0] != 0x16 or b[5] != 0x01:
            return None
        i = 5 + 4 + 2 + 32                              # rec hdr + hs hdr + version + random
        i += 1 + b[i]                                   # session id
        i += 2 + int.from_bytes(b[i:i + 2], "big")      # cipher suites
        i += 1 + b[i]                                   # compression methods
        # Clamp to the bytes actually present: a ClientHello split across TCP segments
        # advertises more extension bytes than this payload holds, and reading past the
        # end would decode empty slices as a bogus, empty server_name extension.
        end = min(i + 2 + int.from_bytes(b[i:i + 2], "big"), len(b))
        i += 2
        while i + 4 <= end:
            et = int.from_bytes(b[i:i + 2], "big")      # extension type
            el = int.from_bytes(b[i + 2:i + 4], "big")  # extension length
            i += 4
            if et == 0:                                 # server_name
                j = i + 2                               # skip the server_name_list length
                nl = int.from_bytes(b[j + 1:j + 3], "big")  # b[j] is the name type
                host = b[j + 3:j + 3 + nl].decode(errors="replace")
                return host or None
            i += el
    except (IndexError, TypeError):
        # Payload ended inside the fixed-layout part, or was not bytes-like.
        return None
    return None


PLAYFAB = re.compile(r"^([0-9A-Fa-f]{4,7})\.playfabapi\.com$")
HOLOGRYPH = re.compile(r"^([a-z0-9-]+)\.hologryph\.com$", re.I)


def main():
    """Print DNS queries, TCP/UDP destinations and detected backends for one capture.

    Reads the capture path from ``sys.argv[1]``. Exits with a message when the
    argument is missing, scapy is not installed, or the capture holds no packets.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: capture_hosts.py <capture.pcapng>")
    if rdpcap is None:
        sys.exit("capture_hosts.py requires scapy (pip install scapy)")
    pk = rdpcap(sys.argv[1])
    if len(pk) == 0:
        # pk[0] below would raise IndexError on an empty capture.
        sys.exit("no packets in %s" % sys.argv[1])
    t0 = float(pk[0].time)

    dns_order, dns_seen = [], set()
    ip2host, snis = {}, {}
    tcp_first, udp_first = {}, {}

    for p in pk:
        ts = float(p.time) - t0                 # seconds since the first packet
        if p.haslayer(DNS):
            d = p[DNS]
            try:
                qn = p[DNSQR].qname.decode(errors="replace").rstrip(".")
            except (IndexError, AttributeError):
                qn = None                       # no question section / no name
            if qn and d.qr == 0 and not qn.endswith(".local") and qn != "wpad.localdomain":
                if qn not in dns_seen:
                    dns_seen.add(qn)
                    dns_order.append((ts, qn))
            if qn and d.qr == 1 and d.ancount:
                for k in range(d.ancount):
                    try:
                        rr = d.an[k]
                    except (IndexError, TypeError):
                        break                   # fewer parsed answers than ancount claims
                    if getattr(rr, "type", None) in (1, 28):    # A / AAAA
                        ip2host[str(rr.rdata)] = qn
        ipl = p[IP] if p.haslayer(IP) else (p[IPv6] if p.haslayer(IPv6) else None)
        if ipl is None:
            continue
        if p.haslayer(TCP):
            t = p[TCP]
            if t.flags & 0x02 and not t.flags & 0x10:   # SYN without ACK
                tcp_first.setdefault((ipl.dst, t.dport), ts)
            if p.haslayer(Raw) and t.dport == 443:
                s = tls_sni(bytes(p[Raw].load))
                if s:
                    snis[ipl.dst] = s
        elif p.haslayer(UDP):
            u = p[UDP]
            # Skip DNS / mDNS / SSDP / LLMNR / NetBIOS name-service chatter.
            if u.dport not in (53, 5353, 1900, 5355, 137) and u.sport not in (53, 5353):
                # Key on the endpoint that owns the lower-numbered port.
                key = (ipl.dst, u.dport) if u.dport < u.sport else (ipl.src, u.sport)
                udp_first.setdefault(key, ts)

    def label(ip):
        # Prefer the SNI seen for this address, else the DNS name that resolved to it.
        return snis.get(ip) or ip2host.get(ip, "")

    print("=== DNS queries (in order) ===")
    for ts, q in dns_order:
        print(" +%7.2fs %s" % (ts, q))

    print("\n=== TCP destinations (first SYN) ===")
    for (ip, port), ts in sorted(tcp_first.items(), key=lambda x: x[1]):
        print(" +%7.2fs %-17s :%-5s %s" % (ts, ip, port, label(ip)))

    print("\n=== UDP destinations ===")
    for (ip, port), ts in sorted(udp_first.items(), key=lambda x: x[1])[:20]:
        print(" +%7.2fs %-17s :%-5s %s" % (ts, ip, port, label(ip)))

    # ---- the three backends we care about ----
    print("\n=== BACKENDS DETECTED ===")
    allhosts = {q for _, q in dns_order} | set(snis.values()) | set(ip2host.values())
    found = False
    for h in sorted(allhosts):
        pf = PLAYFAB.match(h)
        holo = HOLOGRYPH.match(h)
        if pf:
            print(" PlayFab host=%s ** TitleId = %s **" % (h, pf.group(1).upper()))
            found = True
        elif holo and "gameclient" not in h:
            print(" Master server host=%s (region=%s, ws://80 cleartext)"
                  % (h, holo.group(1)))
            found = True
        elif "sandconfigstorage" in h:
            print(" Config CDN host=%s (anonymous HTTPS)" % h)
            found = True
    if not found:
        print(" (none of PlayFab / hologryph / config-blob seen — backend not contacted)")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""Scrape SAND's PlayFab Economy (catalog + prices + inventory) via the public REST API.

Backend is Azure PlayFab (Economy v2). Auth is `LoginWithSteam`. Everything here is the
documented PlayFab REST surface (https://learn.microsoft.com/gaming/playfab/), run from
*outside* the game process — no BattlEye interaction.

Two ways in:
  --steam-ticket <hex>     do Client/LoginWithSteam to get a fresh SessionTicket + EntityToken
  --entity-token <token>   skip login, use an EntityToken you captured (e.g. from your MITM)

The only constant you must supply is --title-id (the PlayFab TitleId). Get it from a live
capture: `reverse/capture_hosts.py` prints it from the `<titleid>.playfabapi.com` host.

Modes (combine freely):
  --catalog     Catalog/SearchItems  -> every item with PriceOptions + DisplayProperties
  --inventory   Inventory/GetInventoryItems + GetTransactionHistory (your wallet/items)
  --titledata   Client/GetTitleData  (config key/values, if any)

Read-only endpoints only. Output -> extracted/playfab_<mode>.json, item ids resolved to names.

Example:
  venv/bin/python reverse/playfab_scrape.py --title-id ABCDE --steam-ticket 14000000... --catalog
"""
import argparse
import json
import os
import sys

try:
    import requests
except ImportError:
    # Keep the pure helpers (summarize_catalog, load_names) importable without requests;
    # PlayFab() reports the missing dependency when a client is actually built.
    requests = None

ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
NAMES_PATH = os.path.join(ROOT, "extracted", "item_names.json")


def load_names():
    """Return ``{item_id: display_name}`` read from ``NAMES_PATH``.

    The file is expected to hold ``{"items": {id: {"name": ...}}}``; an entry without
    a name maps to its own id. A missing, unreadable or differently shaped file yields
    ``{}`` so that name resolution stays optional.
    """
    try:
        with open(NAMES_PATH, encoding="utf-8") as fh:
            d = json.load(fh)["items"]
        return {k: (v.get("name") or k) for k, v in d.items()}
    except (OSError, ValueError, KeyError, AttributeError, TypeError):
        return {}


def _write_json(path, obj):
    """Write ``obj`` to ``path`` as UTF-8 JSON and close the file.

    ``ensure_ascii=False`` emits raw non-ASCII characters, so the encoding is pinned
    rather than left to the platform's locale default.
    """
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(obj, fh, indent=1, ensure_ascii=False)


class PlayFab:
    """Minimal read-only client for one PlayFab title.

    ``entity_token`` authenticates the Catalog/Inventory calls (``X-EntityToken``);
    ``session_ticket`` authenticates the classic Client API (``X-Authorization``).
    Both start as ``None`` and are filled by ``login_steam`` or set by the caller.
    """

    def __init__(self, title_id):
        if requests is None:
            raise RuntimeError("playfab_scrape.py requires the 'requests' package "
                               "(pip install requests)")
        self.title = title_id
        self.base = "https://%s.playfabapi.com" % title_id
        self.session_ticket = None
        self.entity_token = None
        self.s = requests.Session()
        self.s.headers["Content-Type"] = "application/json"

    def _post(self, path, body, headers=None):
        """POST ``body`` as JSON to ``path`` and return the response's ``data`` member.

        Raises ``RuntimeError`` when the HTTP status or the JSON ``code`` is not 200, or
        when the body is JSON but not an object. A non-JSON body raises the HTTP error
        if there is one, otherwise the JSON decode error.
        """
        r = self.s.post(self.base + path, data=json.dumps(body),
                        headers=headers or {}, timeout=30)
        try:
            j = r.json()
        except ValueError:
            r.raise_for_status()
            raise
        if not isinstance(j, dict):
            raise RuntimeError("%s -> %s unexpected response %r" % (path, r.status_code, j))
        if r.status_code != 200 or j.get("code") not in (200, None):
            raise RuntimeError("%s -> %s %s" % (path, r.status_code,
                                                j.get("errorMessage") or j.get("error") or j))
        return j.get("data", j)

    def _entity_headers(self):
        """Return the ``X-EntityToken`` header, failing early when no token is held."""
        if not self.entity_token:
            raise RuntimeError("no EntityToken: log in with --steam-ticket "
                               "or pass --entity-token")
        return {"X-EntityToken": self.entity_token}

    def _paged(self, path, body, list_key, progress=None):
        """Collect ``list_key`` rows from every page of an Entity-API list call.

        Re-sends ``body`` with the returned ``ContinuationToken`` until the service stops
        handing one back. ``progress`` is an optional label for a running-count line.
        """
        rows, token = [], None
        while True:
            req = dict(body)
            if token:
                req["ContinuationToken"] = token
            d = self._post(path, req, headers=self._entity_headers())
            rows += d.get(list_key) or []
            token = d.get("ContinuationToken")
            if progress:
                print(" %s: %d items so far..." % (progress, len(rows)))
            if not token:
                return rows

    def login_steam(self, ticket_hex, service_specific=False):
        """Log in via Client/LoginWithSteam and store SessionTicket + EntityToken.

        Never creates an account (``CreateAccount`` is False). Returns the raw login data.
        """
        body = {"TitleId": self.title, "SteamTicket": ticket_hex,
                "CreateAccount": False, "TicketIsServiceSpecific": service_specific}
        d = self._post("/Client/LoginWithSteam", body)
        self.session_ticket = d.get("SessionTicket")
        et = d.get("EntityToken") or {}
        self.entity_token = et.get("EntityToken")
        print(" logged in: PlayFabId=%s entity=%s" %
              (d.get("PlayFabId"), (et.get("Entity") or {}).get("Id")))
        return d

    # ---- Economy v2 (Entity API; needs X-EntityToken) ----
    def search_items(self):
        """Return every catalog item from Catalog/SearchItems, following all pages."""
        return self._paged("/Catalog/SearchItems",
                           {"Count": 50, "Filter": "", "Search": ""},
                           "Items", progress="catalog")

    def inventory(self):
        """Return ``{"items": [...], "transactions": [...]}`` for the logged-in entity.

        Both lists follow continuation tokens. Transaction history is best-effort: a
        failure there is recorded under ``"transactions_error"`` instead of raising.
        """
        out = {"items": self._paged("/Inventory/GetInventoryItems", {"Count": 50}, "Items")}
        try:
            out["transactions"] = self._paged("/Inventory/GetTransactionHistory",
                                              {"Count": 50}, "Transactions")
        except Exception as e:  # deliberate best-effort; the error is kept in the output
            out["transactions_error"] = str(e)
        return out

    def title_data(self):
        """Return the ``Data`` mapping from Client/GetTitleData (needs a SessionTicket)."""
        d = self._post("/Client/GetTitleData", {},
                       headers={"X-Authorization": self.session_ticket})
        return d.get("Data") or {}


def summarize_catalog(items, names):
    """Flatten each catalog item to {id, friendlyId, name, type, prices, displayProperties}.

    ``names`` maps item ids to display names and is used for price currencies and as a
    fallback item name. ``prices`` is a list of price options, each a list of
    ``{currency, currencyName, amount}`` legs. Members that are absent or ``null``
    (Title, AlternateIds, PriceOptions, Prices, Amounts) are treated as empty.
    """
    rows = []
    for it in items:
        alt = {a.get("Type"): a.get("Value") for a in (it.get("AlternateIds") or [])}
        fid = alt.get("FriendlyId") or it.get("Id")
        prices = []
        for po in ((it.get("PriceOptions") or {}).get("Prices") or []):
            legs = [{"currency": a.get("ItemId"),
                     "currencyName": names.get(a.get("ItemId"), a.get("ItemId")),
                     "amount": a.get("Amount")} for a in (po.get("Amounts") or [])]
            prices.append(legs)
        title = it.get("Title") or {}
        rows.append({
            "id": it.get("Id"),
            "friendlyId": fid,
            "name": (title.get("NEUTRAL") or names.get(fid) or fid),
            "type": it.get("Type"),
            "contentType": it.get("ContentType"),
            "prices": prices,
            "displayProperties": it.get("DisplayProperties"),
        })
    return rows


def main():
    """CLI entry point: authenticate, then run the requested read-only scrapes.

    Defaults to ``--catalog`` when no mode is given. Each mode writes one JSON file
    under ``<repo>/extracted``; a mode whose credential is missing is reported, not
    silently skipped.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--title-id", required=True)
    ap.add_argument("--steam-ticket")
    ap.add_argument("--entity-token")
    ap.add_argument("--ticket-service-specific", action="store_true")
    ap.add_argument("--catalog", action="store_true")
    ap.add_argument("--inventory", action="store_true")
    ap.add_argument("--titledata", action="store_true")
    args = ap.parse_args()
    if not (args.steam_ticket or args.entity_token):
        sys.exit("need --steam-ticket OR --entity-token")
    if not (args.catalog or args.inventory or args.titledata):
        args.catalog = True  # default

    names = load_names()
    pf = PlayFab(args.title_id)
    if args.entity_token:
        pf.entity_token = args.entity_token
        print(" using captured EntityToken")
    if args.steam_ticket:
        pf.login_steam(args.steam_ticket, args.ticket_service_specific)

    out_dir = os.path.join(ROOT, "extracted")
    os.makedirs(out_dir, exist_ok=True)

    if args.catalog:
        items = pf.search_items()
        rows = summarize_catalog(items, names)
        out = os.path.join(out_dir, "playfab_catalog.json")
        _write_json(out, {"_source": "PlayFab Catalog/SearchItems", "count": len(rows),
                          "items_raw": items, "items": rows})
        print("wrote %s (%d items)" % (out, len(rows)))
        priced = [r for r in rows if r["prices"]]
        print(" %d items carry a price. sample:" % len(priced))
        for r in priced[:10]:
            pr = " | ".join(" + ".join("%s %s" % (leg["amount"], leg["currencyName"])
                                       for leg in legs)
                            for legs in r["prices"])
            print(" %-32s %s" % (r["name"], pr))

    if args.inventory:
        if pf.entity_token:
            inv = pf.inventory()
            out = os.path.join(out_dir, "playfab_inventory.json")
            _write_json(out, inv)
            print("wrote", out)
        else:
            print("skipping --inventory: no EntityToken available")

    if args.titledata:
        if pf.session_ticket:
            td = pf.title_data()
            out = os.path.join(out_dir, "playfab_titledata.json")
            _write_json(out, td)
            print("wrote %s (%d keys)" % (out, len(td)))
        else:
            print("skipping --titledata: needs a SessionTicket (log in with --steam-ticket)")


if __name__ == "__main__":
    main()
# ---- known DTO field-sets, to label decoded objects (from il2cpp/dump.cs) ----
DTO_SIGNATURES = {
    "ServerDto": {"Name", "Description", "UpTime", "Ip", "Port"},
    "WorldEndpointData": {"worldName", "address", "port", "custom"},
    "CompartmentDefinitionDto": {"EpbId", "HP", "Weight", "Properties", "CrownPrice"},
    "ResearchNodeJsonDto": {"Id", "Tier", "ResearchPrice", "RequiredNodesIds", "DependentNodesIds"},
    "ResearchTreeJsonDto": {"Roots", "Nodes"},
    "ItemDto": {"DefinitionName", "SellPrice", "Outfitable", "IsLarge"},
    "ShopItemDto": {"DefinitionName", "BuyPrice", "Amount"},
    "PriceDto": {"ItemDefinition", "Amount"},
    "UserDto": {"Id", "DatabaseId"},
    "OperationResult": {"IsSucceed", "Error", "Status"},
    "LoginProcessResult": {"IsSuccess", "Error"},
}


def label_obj(o, depth=0):
    """Return the DTO names whose field-set is contained in a decoded object.

    A dict is labelled with every ``DTO_SIGNATURES`` entry whose fields are all among
    its keys; its values are then inspected too while ``depth`` is below 2. For a
    non-empty list only the first element is inspected (same depth limit). Anything
    else yields ``[]``. Names are not de-duplicated.
    """
    if isinstance(o, dict):
        present = set(o)
        found = [dto for dto, fields in DTO_SIGNATURES.items() if fields.issubset(present)]
        if depth < 2:
            for child in o.values():
                found.extend(label_obj(child, depth + 1))
        return found
    if isinstance(o, list) and o and depth < 2:
        return list(label_obj(o[0], depth + 1))
    return []


# ---------- minimal BSON decoder (Newtonsoft.Json.Bson wire format) ----------
def _bson_cstring(b, i):
    """Read the NUL-terminated string at offset ``i``.

    Returns ``(text, next_offset)`` where ``next_offset`` is just past the NUL.
    Undecodable bytes are replaced; a missing terminator raises ``ValueError``.
    """
    nul = b.index(0, i)
    text = b[i:nul].decode("utf-8", "replace")
    return text, nul + 1
def main():
    """CLI entry point: decode every master-server WebSocket stream in a capture.

    For each stream yielded by ``reassemble`` (key, client bytes, server bytes) it prints
    the handshake request line and Host header, then decodes every frame of both
    directions (server->client first), tags known DTO shapes and prints a one-line
    preview. With ``--out`` all decoded messages are also written as UTF-8 JSON.
    """
    import os  # only needed to create the --out directory

    ap = argparse.ArgumentParser()
    ap.add_argument("pcap")
    ap.add_argument("--port", type=int, default=80)
    ap.add_argument("--host", default="hologryph", help="handshake Host substring filter ('' = any)")
    ap.add_argument("--out", default=None)
    args = ap.parse_args()

    pk = rdpcap(args.pcap)
    messages = []
    nstreams = 0
    for key, cbytes, sbytes in reassemble(pk, args.port, args.host):
        nstreams += 1
        # show the handshake request line + host
        line0 = cbytes.split(b"\r\n", 1)[0].decode(errors="replace")
        hdr_end = cbytes.find(b"\r\n\r\n")
        # Without a blank line the header block is unterminated; scan a bounded prefix
        # instead of the 3-byte slice that find() == -1 would otherwise produce.
        head = cbytes[:hdr_end + 4] if hdr_end >= 0 else cbytes[:8192]
        host = ""
        for ln in head.split(b"\r\n"):
            if ln.lower().startswith(b"host:"):
                host = ln.decode(errors="replace")
        print("### WS stream %s %s %s" % (key, line0, host))
        for direction, stream in (("S->C", sbytes), ("C->S", cbytes)):
            for op, payload in ws_frames(stream):
                enc, obj = try_decode(payload)
                rec = {"stream": str(key), "dir": direction,
                       "opcode": ("text" if op == 1 else "binary"),
                       "encoding": enc, "len": len(payload)}
                if obj is not None:
                    rec["dto"] = sorted(set(label_obj(obj)))
                    rec["data"] = obj
                else:
                    # undecodable payload: keep a short hex prefix for manual inspection
                    rec["raw_prefix"] = payload[:48].hex()
                messages.append(rec)
                tag = ("[" + ",".join(rec.get("dto", [])) + "] ") if rec.get("dto") else ""
                preview = json.dumps(obj, default=str)[:160] if obj is not None else rec["raw_prefix"]
                print(" %s %-7s %-8s %s%s" % (direction, rec["opcode"], enc, tag, preview))

    print("\n%d WS stream(s), %d message(s)" % (nstreams, len(messages)))
    if nstreams == 0:
        print("No WebSocket streams found. If the backend was online, check --port/--host, "
              "or the capture may not span the master-server connection.")
    if args.out:
        out_dir = os.path.dirname(args.out)
        if out_dir:
            # e.g. --out extracted/master_ws.json on a fresh checkout
            os.makedirs(out_dir, exist_ok=True)
        # ensure_ascii=False writes raw non-ASCII text, so pin UTF-8 instead of the
        # platform default encoding, and close the file deterministically.
        with open(args.out, "w", encoding="utf-8") as fh:
            json.dump({"_source": args.pcap, "messages": messages}, fh,
                      indent=1, default=str, ensure_ascii=False)
        print("wrote", args.out)


if __name__ == "__main__":
    main()