Built and unit-tested ahead of a live playtest window: - reverse/capture_hosts.py: pcap -> DNS/SNI/endpoints in order; extracts PlayFab TitleId, flags hologryph master-server region + config CDN. - reverse/ws_scrape.py: TCP reassembly + RFC-6455 framing for the cleartext ws://<region>. hologryph.com/gameclient/ stream; decodes JSON/BSON/MessagePack; auto-labels ServerDto, CompartmentDefinitionDto, ResearchNodeJsonDto, OperationResult, etc. No MITM needed. - reverse/playfab_scrape.py: LoginWithSteam (or captured EntityToken) -> Catalog/SearchItems (+ Inventory/TitleData); prices resolved to item names. Read-only. - docs/SCRAPE_RUNBOOK.md: turnkey steps for when servers are online.
271 lines
10 KiB
Python
271 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Decode the SAND master-server WebSocket stream out of a packet capture.
|
|
|
|
The master server is a .NET `ClientWebSocket` to `ws://<region>.hologryph.com/gameclient/`
|
|
(port 80, **cleartext** — confirmed from the IL2CPP metadata: only `ws://` literals, no `wss`).
|
|
Messages are request/response `OperationResult<T>` plus server-push `IClientEvent`s, serialized
|
|
by the game's `IDataSerializer` (JsonDataSerializer / BsonDataSerializer — JSON is the likely
|
|
default; this decoder tries JSON, BSON and MessagePack so the exact encoding doesn't block us).
|
|
|
|
What it does:
|
|
1. groups packets into TCP streams, reassembles each direction by sequence number
|
|
2. finds the WebSocket HTTP upgrade (`GET /gameclient/ ... Upgrade: websocket` / `101`)
|
|
3. parses RFC-6455 frames (handles masking + continuation), per direction
|
|
4. decodes each message payload (JSON -> BSON -> MessagePack -> hex) and prints it
|
|
5. tags messages whose shape matches a known DTO (ServerDto, RegionInfo,
|
|
CompartmentDefinitionDto, ResearchNode*, OperationResult, IClientEvent...)
|
|
|
|
Because it's cleartext, NO MITM/cert is needed for this channel — just capture port 80.
|
|
|
|
Usage:
|
|
venv/bin/python reverse/ws_scrape.py <capture.pcapng> [--port 80] [--host hologryph]
|
|
[--out extracted/master_ws.json]
|
|
"""
|
|
import sys, json, struct, argparse
|
|
from collections import defaultdict
|
|
from scapy.all import rdpcap, IP, IPv6, TCP, Raw
|
|
|
|
try:
|
|
import msgpack
|
|
except Exception:
|
|
msgpack = None
|
|
|
|
# ---- known DTO field-sets, to label decoded objects (from il2cpp/dump.cs) ----
|
|
DTO_SIGNATURES = {
|
|
"ServerDto": {"Name", "Description", "UpTime", "Ip", "Port"},
|
|
"WorldEndpointData": {"worldName", "address", "port", "custom"},
|
|
"CompartmentDefinitionDto": {"EpbId", "HP", "Weight", "Properties", "CrownPrice"},
|
|
"ResearchNodeJsonDto": {"Id", "Tier", "ResearchPrice", "RequiredNodesIds", "DependentNodesIds"},
|
|
"ResearchTreeJsonDto": {"Roots", "Nodes"},
|
|
"ItemDto": {"DefinitionName", "SellPrice", "Outfitable", "IsLarge"},
|
|
"ShopItemDto": {"DefinitionName", "BuyPrice", "Amount"},
|
|
"PriceDto": {"ItemDefinition", "Amount"},
|
|
"UserDto": {"Id", "DatabaseId"},
|
|
"OperationResult": {"IsSucceed", "Error", "Status"},
|
|
"LoginProcessResult": {"IsSuccess", "Error"},
|
|
}
|
|
|
|
|
|
def label_obj(o, depth=0):
|
|
"""Best-effort DTO name for a decoded dict (and recurse a little)."""
|
|
names = []
|
|
if isinstance(o, dict):
|
|
keys = set(o.keys())
|
|
for name, sig in DTO_SIGNATURES.items():
|
|
if sig <= keys:
|
|
names.append(name)
|
|
if depth < 2:
|
|
for v in o.values():
|
|
names += label_obj(v, depth + 1)
|
|
elif isinstance(o, list) and o and depth < 2:
|
|
names += label_obj(o[0], depth + 1)
|
|
return names
|
|
|
|
|
|
# ---------- minimal BSON decoder (Newtonsoft.Json.Bson wire format) ----------
|
|
def _bson_cstring(b, i):
|
|
j = b.index(0, i)
|
|
return b[i:j].decode("utf-8", "replace"), j + 1
|
|
|
|
|
|
def _bson_doc(b, i):
|
|
ln = struct.unpack_from("<i", b, i)[0]
|
|
end = i + ln
|
|
i += 4
|
|
out = {}
|
|
while i < end - 1:
|
|
t = b[i]; i += 1
|
|
name, i = _bson_cstring(b, i)
|
|
if t == 0x01: # double
|
|
out[name] = struct.unpack_from("<d", b, i)[0]; i += 8
|
|
elif t == 0x02: # string
|
|
sl = struct.unpack_from("<i", b, i)[0]; i += 4
|
|
out[name] = b[i:i + sl - 1].decode("utf-8", "replace"); i += sl
|
|
elif t == 0x03: # embedded doc
|
|
out[name], i = _bson_doc(b, i)
|
|
elif t == 0x04: # array
|
|
d, i = _bson_doc(b, i)
|
|
out[name] = [d[k] for k in sorted(d, key=lambda x: int(x) if x.isdigit() else 0)]
|
|
elif t == 0x08: # bool
|
|
out[name] = b[i] != 0; i += 1
|
|
elif t == 0x0A: # null
|
|
out[name] = None
|
|
elif t == 0x10: # int32
|
|
out[name] = struct.unpack_from("<i", b, i)[0]; i += 4
|
|
elif t == 0x12: # int64
|
|
out[name] = struct.unpack_from("<q", b, i)[0]; i += 8
|
|
else:
|
|
raise ValueError("bson type 0x%02x" % t)
|
|
return out, end
|
|
|
|
|
|
def try_decode(payload):
|
|
"""Return (encoding, obj) or (None, None)."""
|
|
if not payload:
|
|
return None, None
|
|
# JSON (text frames / utf8)
|
|
s = payload.lstrip()
|
|
if s[:1] in (b"{", b"[", b'"') or s[:1].isdigit():
|
|
try:
|
|
return "json", json.loads(payload.decode("utf-8"))
|
|
except Exception:
|
|
pass
|
|
# BSON (starts with int32 length == len(payload))
|
|
if len(payload) >= 5:
|
|
try:
|
|
ln = struct.unpack_from("<i", payload, 0)[0]
|
|
if ln == len(payload):
|
|
obj, _ = _bson_doc(payload, 0)
|
|
return "bson", obj
|
|
except Exception:
|
|
pass
|
|
# MessagePack
|
|
if msgpack is not None:
|
|
try:
|
|
return "msgpack", msgpack.unpackb(payload, raw=False, strict_map_key=False)
|
|
except Exception:
|
|
pass
|
|
return None, None
|
|
|
|
|
|
# ---------------------- TCP reassembly + WS framing ----------------------
|
|
def reassemble(packets, port, host_filter):
|
|
"""Yield (stream_key, client_bytes, server_bytes) for TCP streams on `port`
|
|
whose handshake Host matches host_filter (substring, case-insensitive)."""
|
|
flows = defaultdict(lambda: {"a": {}, "b": {}, "ips": None})
|
|
for p in packets:
|
|
if not p.haslayer(TCP):
|
|
continue
|
|
ipl = p[IP] if p.haslayer(IP) else (p[IPv6] if p.haslayer(IPv6) else None)
|
|
if ipl is None:
|
|
continue
|
|
t = p[TCP]
|
|
if t.dport != port and t.sport != port:
|
|
continue
|
|
a = (ipl.src, t.sport)
|
|
b = (ipl.dst, t.dport)
|
|
key = tuple(sorted([a, b]))
|
|
f = flows[key]
|
|
f["ips"] = key
|
|
side = "a" if (a == key[0]) else "b"
|
|
if p.haslayer(Raw):
|
|
f[side][t.seq] = bytes(p[Raw].load)
|
|
|
|
for key, f in flows.items():
|
|
# the side whose endpoint port == `port` is the server
|
|
(e0, e1) = key
|
|
server_side = "a" if e0[1] == port else "b"
|
|
client_side = "b" if server_side == "a" else "a"
|
|
cbytes = b"".join(f[client_side][s] for s in sorted(f[client_side]))
|
|
sbytes = b"".join(f[server_side][s] for s in sorted(f[server_side]))
|
|
# require a WS handshake, optionally host-filtered
|
|
if b"Upgrade: websocket" not in cbytes and b"upgrade: websocket" not in cbytes:
|
|
continue
|
|
if host_filter:
|
|
hdr = cbytes[:cbytes.find(b"\r\n\r\n") + 4].lower()
|
|
if host_filter.lower().encode() not in hdr:
|
|
continue
|
|
yield key, cbytes, sbytes
|
|
|
|
|
|
def ws_frames(stream):
|
|
"""Parse RFC-6455 frames from a byte stream that starts AFTER the HTTP handshake.
|
|
Yields (opcode, payload_bytes). Reassembles continuation frames."""
|
|
i = stream.find(b"\r\n\r\n")
|
|
i = i + 4 if i >= 0 else 0
|
|
b = stream
|
|
n = len(b)
|
|
cur_op = None
|
|
cur = bytearray()
|
|
while i + 2 <= n:
|
|
b0 = b[i]; b1 = b[i + 1]
|
|
fin = b0 & 0x80
|
|
op = b0 & 0x0F
|
|
masked = b1 & 0x80
|
|
ln = b1 & 0x7F
|
|
i += 2
|
|
if ln == 126:
|
|
if i + 2 > n:
|
|
break
|
|
ln = struct.unpack_from(">H", b, i)[0]; i += 2
|
|
elif ln == 127:
|
|
if i + 8 > n:
|
|
break
|
|
ln = struct.unpack_from(">Q", b, i)[0]; i += 8
|
|
mask = b""
|
|
if masked:
|
|
if i + 4 > n:
|
|
break
|
|
mask = b[i:i + 4]; i += 4
|
|
if i + ln > n:
|
|
break
|
|
data = bytearray(b[i:i + ln]); i += ln
|
|
if masked:
|
|
for k in range(len(data)):
|
|
data[k] ^= mask[k & 3]
|
|
if op == 0x8: # close
|
|
break
|
|
if op in (0x9, 0xA): # ping/pong
|
|
continue
|
|
if op == 0x0: # continuation
|
|
cur += data
|
|
else:
|
|
if cur_op is not None:
|
|
pass
|
|
cur_op = op
|
|
cur = bytearray(data)
|
|
if fin and cur_op in (0x1, 0x2):
|
|
yield cur_op, bytes(cur)
|
|
cur_op = None
|
|
cur = bytearray()
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("pcap")
|
|
ap.add_argument("--port", type=int, default=80)
|
|
ap.add_argument("--host", default="hologryph", help="handshake Host substring filter ('' = any)")
|
|
ap.add_argument("--out", default=None)
|
|
args = ap.parse_args()
|
|
|
|
pk = rdpcap(args.pcap)
|
|
messages = []
|
|
nstreams = 0
|
|
for key, cbytes, sbytes in reassemble(pk, args.port, args.host):
|
|
nstreams += 1
|
|
# show the handshake request line + host
|
|
line0 = cbytes.split(b"\r\n", 1)[0].decode(errors="replace")
|
|
host = ""
|
|
for ln in cbytes[:cbytes.find(b"\r\n\r\n") + 4].split(b"\r\n"):
|
|
if ln.lower().startswith(b"host:"):
|
|
host = ln.decode(errors="replace")
|
|
print("### WS stream %s %s %s" % (key, line0, host))
|
|
for direction, stream in (("S->C", sbytes), ("C->S", cbytes)):
|
|
for op, payload in ws_frames(stream):
|
|
enc, obj = try_decode(payload)
|
|
rec = {"stream": str(key), "dir": direction,
|
|
"opcode": ("text" if op == 1 else "binary"),
|
|
"encoding": enc, "len": len(payload)}
|
|
if obj is not None:
|
|
rec["dto"] = sorted(set(label_obj(obj)))
|
|
rec["data"] = obj
|
|
else:
|
|
rec["raw_prefix"] = payload[:48].hex()
|
|
messages.append(rec)
|
|
tag = ("[" + ",".join(rec.get("dto", [])) + "] ") if rec.get("dto") else ""
|
|
preview = json.dumps(obj, default=str)[:160] if obj is not None else rec["raw_prefix"]
|
|
print(" %s %-7s %-8s %s%s" % (direction, rec["opcode"], enc, tag, preview))
|
|
|
|
print("\n%d WS stream(s), %d message(s)" % (nstreams, len(messages)))
|
|
if nstreams == 0:
|
|
print("No WebSocket streams found. If the backend was online, check --port/--host, "
|
|
"or the capture may not span the master-server connection.")
|
|
if args.out:
|
|
json.dump({"_source": args.pcap, "messages": messages}, open(args.out, "w"),
|
|
indent=1, default=str, ensure_ascii=False)
|
|
print("wrote", args.out)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|