master-server replay + trampler RE: protocol, hashes, footprints, map renderer

- master_scrape.py: live master-server (ger.hologryph.com) ClientMessage replay over the
  two-socket /login + /connect handshake (PlayFab ticket auth). Pulled compartment defs,
  shop prices, research tree, storage, characters, expedition -> extracted/master_*.json
- PlayFab confirmed auth-only for this title (Economy disabled); docs corrected
- trampler_hashes.py: blueprint hash algo MD5(UTF8(compact-JSON)); CompartmentsHash(#1) and
  ConnectionsHash(#3) verified & generatable from scratch
- walkerdto_to_blueprint.py: WalkerDto(expedition) -> WalkerBlueprintDto, enum int<->name,
  verified by storage->WS->storage round-trip
- render_trampler.py: per-floor map from CompartmentsDatabase cell footprints (rotation solved
  via overlap check) + doors/hatches from Connections + turret arcs + cargo C1-C8 in game order
- docs/MASTER_SERVER.md, docs/TRAMPLER.md; ghidra address-offset bug fixed (no -0x1000)
This commit is contained in:
DownloadPizza
2026-06-16 00:35:17 +02:00
parent 3df0797acc
commit fc6b270fa8
29 changed files with 61574 additions and 0 deletions

265
reverse/master_scrape.py Normal file
View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""Scrape SAND's master server by replaying its protocol (2026-06-15 build) from an external
process - no game, no BattlEye interaction (like playfab_scrape.py).
HANDSHAKE (RE'd from ghidra decompile of GameAssembly.dll + live Player.log; build 23737037):
TWO sequential WebSocket connections, both speaking ClientMessage JSON envelopes:
envelope = {"Id":<long>, "ClientMessageType":<int>, "Action":<int>, "Message":<string|null>}
ClientMessageType: Error=0, Ping=1, Action=2, Event=3 (requests use 2)
Action (ClientAction): Login=0, Connect=1, GetCharacters=2, GetStorage=20, GetNews=21,
GetCompartmentDefinitions=23, GetDatabaseGuid=26, GetDefaultWalkerBlueprints=28,
GetServers=29, GetResearchTree=32, GetShopItems=39, ...
`Message` for Login is the LoginRequest serialized as a JSON *string* (nested); for Connect it
is the bool isDebug stringified ("False"); for parameterless gets it is null.
Serialization: Newtonsoft default -> PascalCase fields, enums as integers, sent as TEXT frames.
Replies: a ClientMessage with the same Id; its `Message` is JSON of OperationResult<T> =
{"IsSucceed":bool,"Error":string|null,"Status":int|null,"Result":<T>}. Correlate by Id.
Step 1 /login (no auth header): connect wss://<region>.hologryph.com/gameclient/login,
send ClientMessage{Action=Login(0), Message=JSON(LoginRequest{SessionTicket=<PlayFab>,
Title=56693, Platform=STEAM(1), ClientVersion=<git hash>})}; reply Result.SessionTicket
is the SERVER session ticket. Close.
Step 2 /connect (persistent): connect wss://<region>.hologryph.com/gameclient/connect WITH
HTTP upgrade header Authorization: <server SessionTicket> ; send Connect(Action=1,
Message="False"); then all data ops over this same socket.
SAFETY: does nothing over the network without --go. Token from reverse/.secrets/playfab_token.json.
Usage:
venv/bin/python reverse/master_scrape.py --selftest # offline
venv/bin/python reverse/master_scrape.py # dry run (plan only)
venv/bin/python reverse/master_scrape.py --go --data # ARMED: full handshake + data ops
"""
import asyncio, json, os, argparse, ssl, sys
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SECRETS = os.path.join(ROOT, "reverse", ".secrets", "playfab_token.json")
NAMES_PATH = os.path.join(ROOT, "extracted", "item_names.json")
OUTDIR = os.path.join(ROOT, "extracted")
REGION_BASES = {r: "wss://%s.hologryph.com/gameclient/" % r
for r in ("ger", "eus", "westus", "sin", "bra", "aus", "sko", "uae")}
MT_ACTION = 2
ACTIONS = {"Login": 0, "Connect": 1, "GetCharacters": 2, "SelectCharacter": 4, "GetTrampler": 16,
"UpsertTrampler": 17, "GetStorage": 20, "GetNews": 21, "GetCompartmentDefinitions": 23,
"GetDatabaseGuid": 26, "GetDefaultWalkerBlueprints": 28, "GetServers": 29,
"GetResearchTree": 32, "GetShopItems": 39}
PLATFORM_STEAM = 1
DATA_OPS = ["GetCompartmentDefinitions", "GetShopItems", "GetResearchTree", "GetServers",
"GetDefaultWalkerBlueprints", "GetNews", "GetDatabaseGuid"]
USER_OPS = ["GetStorage", "GetCharacters"]
def load_names():
try:
d = json.load(open(NAMES_PATH))["items"]
return {k: (v.get("name") or k) for k, v in d.items()}
except Exception:
return {}
def load_token():
try:
return json.load(open(SECRETS))
except Exception as e:
return {"_error": str(e)}
def mask(s):
return (s[:6] + "..." + s[-4:]) if s and len(s) > 12 else "<none>"
def login_request(tok, version):
return {"SessionTicket": tok["SessionTicket"], "Title": tok.get("TitleId", "56693"),
"Platform": PLATFORM_STEAM, "ClientVersion": version}
class WS:
"""One ClientMessage WebSocket: connect (opt. Authorization header) -> id-correlated requests."""
def __init__(self, url, auth=None, insecure=False):
self.url = url
self.auth = auth
self.ssl = ssl.create_default_context()
if insecure:
self.ssl.check_hostname = False
self.ssl.verify_mode = ssl.CERT_NONE
self._id = 0
self._pending = {}
self.events = []
self.ws = None
async def connect(self):
import websockets
headers = {"Authorization": self.auth} if self.auth else None
self.ws = await websockets.connect(self.url, ssl=self.ssl, open_timeout=15,
max_size=None, ping_interval=None,
additional_headers=headers)
self._task = asyncio.create_task(self._recv())
async def _recv(self):
try:
async for raw in self.ws:
if isinstance(raw, bytes):
raw = raw.decode("utf-8", "replace")
try:
msg = json.loads(raw)
except Exception:
self.events.append({"_unparsed": raw[:300]}); continue
if msg.get("ClientMessageType") == 1: # Ping -> echo
try:
await self.ws.send(json.dumps(msg))
except Exception:
pass
continue
rid = msg.get("Id")
if rid in self._pending and not self._pending[rid].done():
self._pending[rid].set_result(msg)
else:
self.events.append(msg)
except Exception as e:
for f in self._pending.values():
if not f.done():
f.set_exception(e)
async def request(self, action_name, payload=None, raw_message=None, timeout=30):
self._id += 1
rid = self._id
if raw_message is not None:
message = raw_message
elif payload is not None:
message = json.dumps(payload)
else:
message = None
fut = asyncio.get_event_loop().create_future()
self._pending[rid] = fut
await self.ws.send(json.dumps({"Id": rid, "ClientMessageType": MT_ACTION,
"Action": ACTIONS[action_name], "Message": message}))
reply = await asyncio.wait_for(fut, timeout=timeout)
inner = reply.get("Message")
if not isinstance(inner, str) or not inner:
return inner
try:
return json.loads(inner)
except ValueError:
return inner # non-JSON Message (e.g. plain string / empty result)
async def close(self):
if self.ws:
await self.ws.close()
def resolve(obj, names):
if isinstance(obj, dict):
out = {}
for k, v in obj.items():
out[k] = resolve(v, names)
if k in ("DefinitionName", "ItemDefinition", "EpbId") and isinstance(v, str) and v in names:
out["_name"] = names[v]
return out
if isinstance(obj, list):
return [resolve(x, names) for x in obj]
return obj
def unwrap(msg):
"""Replies are either the DTO directly, or an OperationResult{IsSucceed,Result}. Normalize."""
if isinstance(msg, dict) and "IsSucceed" in msg and "Result" in msg:
return msg["Result"]
return msg
def save(op_name, result, names):
out = os.path.join(OUTDIR, "master_%s.json" % op_name)
json.dump({"_op": op_name, "Result": resolve(result, names)},
open(out, "w"), indent=1, ensure_ascii=False, default=str)
print(" wrote %s (%s)" % (out, len(result) if isinstance(result, list) else 1))
async def run(args):
base = REGION_BASES[args.region]
names = load_names()
tok = load_token()
version = args.client_version or tok.get("ClientVersion", "")
# --- Step 1: /login (no header) -> server SessionTicket ---
print("STEP 1 /login %slogin" % base)
lg = WS(base + "login", insecure=args.insecure)
await lg.connect()
res = await lg.request("Login", payload=login_request(tok, version)) # Message = LoginUserResultDto
await lg.close()
res = unwrap(res)
srv_ticket = res.get("SessionTicket") if isinstance(res, dict) else None
if not srv_ticket:
print(" login failed; reply:", json.dumps(res)[:300]); return
user = res.get("User", {})
print(" Login OK. User.Id=%s name=%s -> server ticket %s" %
(user.get("Id"), user.get("DisplayName"), mask(srv_ticket)))
save("Login", res, names)
# --- Step 2: /connect (Authorization header) -> persistent socket ---
print("STEP 2 /connect %sconnect (Authorization: <server ticket>)" % base)
cx = WS(base + "connect", auth=srv_ticket, insecure=args.insecure)
await cx.connect()
cop = await cx.request("Connect", raw_message="False")
print(" Connect ->", json.dumps(cop)[:160] if cop is not None else "(no body)")
ops = (DATA_OPS if args.data or not args.user else []) + (USER_OPS if args.user else [])
for name in ops:
try:
res = unwrap(await cx.request(name))
n = len(res) if isinstance(res, list) else (1 if res else 0)
print(" %-28s -> %s entr%s" % (name, n, "y" if n == 1 else "ies"))
if res is not None:
save(name, res, names)
except Exception as e:
print(" %-28s ERROR %s: %s" % (name, type(e).__name__, e))
await cx.close()
print("done. events: %d" % len(cx.events))
def print_plan(args):
base = REGION_BASES[args.region]
tok = load_token()
ver = args.client_version or tok.get("ClientVersion", "<VERSION>")
print("=== DRY RUN (no network). Pass --go to connect. ===")
print("STEP 1 connect %slogin (no auth header)" % base)
lr = login_request({"SessionTicket": "<PLAYFAB_TICKET>", "TitleId": tok.get("TitleId", "56693")}, ver)
print(" send {\"Id\":1,\"ClientMessageType\":2,\"Action\":0,\"Message\":%s}" % json.dumps(json.dumps(lr)))
print(" (real PlayFab ticket %s) -> reply Result.SessionTicket = server ticket" %
mask(tok.get("SessionTicket", "")))
print("STEP 2 connect %sconnect HEADER Authorization: <server ticket>" % base)
print(" send {\"Id\":1,\"ClientMessageType\":2,\"Action\":1,\"Message\":\"False\"}")
for i, name in enumerate(DATA_OPS, 2):
print(" then {\"Id\":%d,\"ClientMessageType\":2,\"Action\":%d,\"Message\":null} (%s)"
% (i, ACTIONS[name], name))
print("\nNothing sent.")
def selftest():
lr = login_request({"SessionTicket": "T", "TitleId": "56693"}, "abc")
assert lr == {"SessionTicket": "T", "Title": "56693", "Platform": 1, "ClientVersion": "abc"}, lr
assert ACTIONS["Login"] == 0 and ACTIONS["Connect"] == 1 and ACTIONS["GetCompartmentDefinitions"] == 23
names = {"item_coinCrown": "Crowns"}
assert resolve({"ItemDefinition": "item_coinCrown"}, names)["_name"] == "Crowns"
print("selftest OK: login payload, actions, name resolution")
def main():
ap = argparse.ArgumentParser(description="SAND master-server replay (two-socket ClientMessage)")
ap.add_argument("--region", default="ger", choices=list(REGION_BASES))
ap.add_argument("--go", action="store_true")
ap.add_argument("--data", action="store_true")
ap.add_argument("--user", action="store_true")
ap.add_argument("--client-version", default="")
ap.add_argument("--insecure", action="store_true")
ap.add_argument("--selftest", action="store_true")
args = ap.parse_args()
if args.selftest:
selftest(); return
if not args.go:
print_plan(args); return
asyncio.run(run(args))
if __name__ == "__main__":
main()

108
reverse/noise_filter.py Normal file
View File

@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Baseline-subtraction noise filter for SAND captures.
Idea: capture a short "noise baseline" with SAND NOT running, then capture the real
session with SAND running. Every IP/host in the baseline is pre-SAND noise; subtract it
and what's left is (almost entirely) the game's traffic.
Usage:
# 1) just list the noise in a baseline:
venv/bin/python reverse/noise_filter.py baseline.pcapng
# 2) baseline + session -> hosts unique to the session + a ready Wireshark filter:
venv/bin/python reverse/noise_filter.py baseline.pcapng session.pcapng
Outputs, for the session run:
- the new (non-noise) IPs/hosts, sorted by traffic volume
- a Wireshark *display* filter: ip.addr==X or ip.addr==Y ...
- a *capture* (BPF) filter to EXCLUDE noise next time: not (host X or host Y ...)
"""
import sys
from collections import defaultdict
from scapy.all import rdpcap, DNS, DNSQR, IP, IPv6, TCP, UDP, Raw
sys.path.insert(0, __file__.rsplit("/", 1)[0])
from capture_hosts import tls_sni # reuse the SNI parser
# hosts/IPs that are never the game, even if they appear only in the session
ALWAYS_NOISE_SUBSTR = ("anthropic.com", "datadoghq.com", "windowsupdate", "msftncsi",
"msftconnecttest", "ntp.", ".pool.ntp.org")
def scan(path):
"""Return (ip_volume, ip2host) for a pcap.
ip_volume[ip] = packet count to/from that remote ip; ip2host[ip] = best label."""
pk = rdpcap(path)
vol = defaultdict(int)
ip2host, dns = {}, {}
# learn DNS answers (qname for an ip) and SNI
for p in pk:
if p.haslayer(DNS) and p[DNS].qr == 1 and p[DNS].ancount:
try:
qn = p[DNSQR].qname.decode(errors="replace").rstrip(".")
for k in range(p[DNS].ancount):
rr = p[DNS].an[k]
if rr.type in (1, 28):
dns[str(rr.rdata)] = qn
except Exception:
pass
if p.haslayer(TCP) and p.haslayer(Raw):
s = tls_sni(bytes(p[Raw].load))
if s and (p.haslayer(IP) or p.haslayer(IPv6)):
ipl = p[IP] if p.haslayer(IP) else p[IPv6]
ip2host[ipl.dst] = s
ipl = p[IP] if p.haslayer(IP) else (p[IPv6] if p.haslayer(IPv6) else None)
if ipl is None:
continue
for ip in (ipl.src, ipl.dst):
if not is_local(ip):
vol[ip] += 1
for ip in vol:
ip2host.setdefault(ip, dns.get(ip, ""))
return vol, ip2host
def is_local(ip):
return (ip.startswith(("10.", "192.168.", "127.", "169.254.", "fe80:", "ff", "::1"))
or ip.startswith("172.") and 16 <= int(ip.split(".")[1] or 0) <= 31
or ip in ("0.0.0.0",) or ip.endswith(".255"))
def main():
if len(sys.argv) < 2:
sys.exit(__doc__)
base_vol, base_host = scan(sys.argv[1])
noise = set(base_vol)
print("=== baseline noise: %d remote IPs ===" % len(noise))
for ip, n in sorted(base_vol.items(), key=lambda x: -x[1]):
print(" %-16s %-6d %s" % (ip, n, base_host.get(ip, "")))
if len(sys.argv) < 3:
print("\n(pass a second pcap to diff a real session against this baseline)")
return
sess_vol, sess_host = scan(sys.argv[2])
# a session ip is "game" if not in baseline and not on the always-noise list
def always_noise(ip):
h = sess_host.get(ip, "")
return any(s in h for s in ALWAYS_NOISE_SUBSTR)
new = {ip: n for ip, n in sess_vol.items()
if ip not in noise and not always_noise(ip)}
print("\n=== session-only hosts (candidate SAND backends) ===")
for ip, n in sorted(new.items(), key=lambda x: -x[1]):
print(" %-16s %-6d %s" % (ip, n, sess_host.get(ip, "")))
if not new:
print(" (nothing new — either SAND made no new connections, or it reused a "
"baseline IP/CDN; widen the gap or capture longer)")
return
ips = sorted(new)
print("\n--- Wireshark DISPLAY filter (keep only SAND) ---")
print(" " + " or ".join("ip.addr==%s" % ip for ip in ips))
print("\n--- Wireshark CAPTURE filter (BPF, EXCLUDE noise next time) ---")
print(" not (" + " or ".join("host %s" % ip for ip in sorted(noise)) + ")")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
# Strictly data-derived: footprints (CompartmentsDatabase cells + blueprint rotation/origin),
# doors/hatches (blueprint Connections), single-tile guns. NO firing arcs: the facing/aim is
# NOT present in the data (no forward/aim/angle field), so it is intentionally omitted.
import json, collections, os, re
import math
from PIL import Image, ImageDraw, ImageFont
ROOT=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
bp=json.load(open(ROOT+"/extracted/host_trampler_blueprint.json"))
db={c["entityId"]:c for c in json.load(open(ROOT+"/extracted/CompartmentsDatabase.json"))["compartments"]}
def rot(x,z,deg):
deg%=360; return {0:(x,z),90:(z,-x),180:(-x,-z),270:(-z,x)}[deg]
def cat(e):
m=re.match(r"walker_comp([A-Za-z]+)_",e or ""); return m.group(1) if m else "?"
CC={"Chassis":(90,90,100),"Reactor":(255,140,0),"Engine":(210,60,45),"Weapon":(170,40,60),
"Armor":(70,110,165),"Cargo":(210,170,60),"Crew":(70,165,85),"CaptainCrew":(30,110,55),
"Deck":(205,200,185),"Corridor":(160,205,228),"Special":(150,80,190),"Crafting":(150,100,60),
"Steering":(40,165,165),"Structure":(150,150,160),"Balcony":(190,170,140)}
LET={k:k[0] for k in CC}; LET.update({"CaptainCrew":"K","Corridor":"+","Crew":"c","Crafting":"F","Steering":"T","Chassis":"#","Weapon":"W"})
parts=[bp["Chassis"]]+bp["Compartments"]
def is_turret(e): return e.startswith("walker_compWeapon_TurretSlot")
def is_ram(e): return "BattleRam" in e
floors=collections.defaultdict(dict); labels=collections.defaultdict(list); turrets=[]; cargo_n=0
for p in parts:
e=db.get(p["EpbId"]);
if not e: continue
eid=p["EpbId"]; ox,oy,oz=p["CellCoordinate"]["x"],p["CellCoordinate"]["y"],p["CellCoordinate"]["z"]; deg=int(p.get("Rotation",0)); c=cat(eid)
if is_turret(eid):
floors[oy][(ox,oz)]=(c,True); labels[oy].append((ox,oz,"W"))
base=135 if "Corner" in eid else 90 # rot0: corner=SW, straight=S (your calibration)
turrets.append((oy,ox,oz,(base-deg)%360)); continue
cells=[]
for cl in e["cells"]:
rx,rz=rot(cl["position"]["x"],cl["position"]["z"],deg); wy=oy+cl["position"]["y"]
floors[wy].setdefault((ox+rx,oz+rz),(c,bool(cl.get("volumeOccupied"))))
if cl.get("volumeOccupied") and wy==oy: cells.append((ox+rx,oz+rz))
if is_ram(eid):
for (x,z) in cells: floors[oy][(x,z)]=(c,True)
cx=sum(x for x,z in cells)/max(1,len(cells)); cz=sum(z for x,z in cells)/max(1,len(cells)); labels[oy].append((cx,cz,"R"))
elif c=="Cargo":
cargo_n+=1; labels[oy].append((ox,oz,"C%d"%cargo_n))
else:
labels[oy].append((ox,oz,LET.get(c,"?")))
doors=collections.defaultdict(list); hatches=collections.defaultdict(list)
for cn in bp["Connections"]:
g=cn["GridCoordinate"]; d=cn["ActualDirection"]; sl=cn["SlotType"]; st=cn["State"]
if sl=="DOOR" and st in ("DOOR","OPEN") and d["y"]==0: doors[g["y"]].append((g["x"],g["z"],d["x"],d["z"],st))
if sl=="HATCH" and st=="DOOR": hatches[g["y"]].append((g["x"],g["z"],d["y"]))
allc=[(x,z) for y in floors if -1<=y<=3 for (x,z) in floors[y]]
xmin=min(x for x,z in allc); xmax=max(x for x,z in allc); zmin=min(z for x,z in allc); zmax=max(z for x,z in allc)
W=xmax-xmin+1; H=zmax-zmin+1; CELL=46; PAD=14; GAP=22; TITLEH=24
order=[y for y in sorted(floors,reverse=True) if -1<=y<=3]
FN={3:"deck 3",2:"deck 2",1:"deck 1",0:"deck 0",-1:"base / hull"}
panelW=W*CELL; panelH=H*CELL; cols=len(order)
imgW=PAD+cols*(panelW+GAP); imgH=PAD+TITLEH*2+panelH+120
img=Image.new("RGBA",(imgW,imgH),(245,245,248,255)); ov=Image.new("RGBA",img.size,(0,0,0,0)); d=ImageDraw.Draw(img); od=ImageDraw.Draw(ov)
f=ImageFont.load_default(size=18); fs=ImageFont.load_default(size=12); ft=ImageFont.load_default(size=15)
d.text((PAD,4),"Host trampler - data-derived: footprints, doors/hatches, single-tile guns (W). Gun arc=rotation field (corner=45/straight=cardinal, +-90). N=up",fill=(20,20,30),font=ft)
for i,y in enumerate(order):
ox0=PAD+i*(panelW+GAP); oy0=PAD+TITLEH*2
d.text((ox0,oy0-18),FN.get(y,"y%d"%y),fill=(20,20,30),font=ft)
def px(x,z): return ox0+(x-xmin)*CELL, oy0+(z-zmin)*CELL
for zi in range(H):
for xi in range(W):
X0=ox0+xi*CELL; Y0=oy0+zi*CELL; d.rectangle([X0,Y0,X0+CELL,Y0+CELL],outline=(225,225,228))
for (x,z),(c,solid) in floors[y].items():
X0,Y0=px(x,z); col=CC.get(c,(120,120,120))
if solid: d.rectangle([X0+1,Y0+1,X0+CELL-1,Y0+CELL-1],fill=col,outline=(40,40,40))
else: d.rectangle([X0+4,Y0+4,X0+CELL-4,Y0+CELL-4],fill=tuple(min(255,v+70) for v in col),outline=col)
for (cx,cz,let) in labels[y]:
X0,Y0=px(cx,cz); d.text((X0+CELL/2-5*len(let),Y0+CELL/2-9),let,fill=(255,255,255),font=f)
for (x,z,dx,dz,st) in doors.get(y,[]):
X0,Y0=px(x,z); col=(60,200,90) if st=="OPEN" else (130,80,30)
if dx==1: d.line([X0+CELL,Y0+8,X0+CELL,Y0+CELL-8],fill=col,width=5)
if dx==-1: d.line([X0,Y0+8,X0,Y0+CELL-8],fill=col,width=5)
if dz==1: d.line([X0+8,Y0+CELL,X0+CELL-8,Y0+CELL],fill=col,width=5)
if dz==-1: d.line([X0+8,Y0,X0+CELL-8,Y0],fill=col,width=5)
for (x,z,dy) in hatches.get(y,[]):
X0,Y0=px(x,z); cx,cy=X0+CELL/2,Y0+CELL/2; d.ellipse([cx-8,cy-8,cx+8,cy+8],outline=(20,20,140),width=2)
d.text((cx-3,cy-7),"^" if dy==1 else "v",fill=(20,20,140),font=fs)
for (ty,tx,tz,face) in turrets:
if ty!=y: continue
X0,Y0=px(tx,tz); cx,cy=X0+CELL/2,Y0+CELL/2; R=CELL*0.75
od.pieslice([cx-R,cy-R,cx+R,cy+R],face-90,face+90,fill=(255,210,40,75),outline=(230,160,0,200))
img=Image.alpha_composite(img,ov); d=ImageDraw.Draw(img)
ly=PAD+TITLEH*2+panelH+10; d.text((PAD,ly-2),"LEGEND:",fill=(20,20,30),font=ft); ly+=18; lx=PAD
for k in CC:
d.rectangle([lx,ly,lx+20,ly+20],fill=CC[k],outline=(30,30,30)); d.text((lx+4,ly+4),LET.get(k,"?"),fill=(255,255,255),font=fs)
d.text((lx+26,ly+4),k,fill=(20,20,30),font=fs); lx+=26+len(k)*7+18
if lx>imgW-150: lx=PAD; ly+=26
ly+=8
d.line([PAD,ly,PAD+40,ly],fill=(130,80,30),width=5); d.text((PAD+48,ly-7),"door",fill=(20,20,30),font=fs)
d.line([PAD+100,ly,PAD+140,ly],fill=(60,200,90),width=5); d.text((PAD+148,ly-7),"open door",fill=(20,20,30),font=fs)
d.ellipse([PAD+230,ly-8,PAD+246,ly+8],outline=(20,20,140),width=2); d.text((PAD+254,ly-7),"hatch (^/v) W=gun R=ram yellow wedge=firing arc (+-90) faded=non-solid",fill=(20,20,30),font=fs)
img.convert("RGB").save(ROOT+"/extracted/host_trampler_nice.png"); print("saved with rotation arcs, turrets=%d"%len(turrets))

66
reverse/resolve_decomp.py Normal file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""Annotate ghidra/decomp.c: resolve absolute VAs (0x180........) to IL2CPP symbol names
(methods.tsv) and string literals (strings.tsv). Ghidra image base = 0x180000000.
methods.tsv : <rva-decimal>\t<symbol> (rva = scriptAddress - 0x1000)
strings.tsv : <scriptAddress-decimal>\t<str> (raw ScriptString.Address)
Usage:
venv/bin/python reverse/resolve_decomp.py # writes ghidra/decomp.annotated.c
venv/bin/python reverse/resolve_decomp.py <substr> # print only funcs whose name matches substr
"""
import re, sys, os
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
G = os.path.join(ROOT, "ghidra")
BASE = 0x180000000
def load_methods():
m = {}
for l in open(os.path.join(G, "methods.tsv")):
if "\t" in l:
rva, name = l.rstrip("\n").split("\t", 1)
m[int(rva)] = name
return m
def load_strings():
s = {}
p = os.path.join(G, "strings.tsv")
if os.path.exists(p):
for l in open(p):
if "\t" in l:
a, v = l.rstrip("\n").split("\t", 1)
s[int(a)] = v
return s
def main():
methods = load_methods()
strings = load_strings()
src = open(os.path.join(G, "decomp.c"), encoding="utf-8", errors="replace").read()
hexre = re.compile(r"0x(1[0-9a-fA-F]{8,9})")
def repl(m):
va = int(m.group(1), 16)
rva = va - BASE
if rva in methods:
return "%s/*%s*/" % (m.group(0), methods[rva])
# string literal: try raw rva, rva+0x1000 (scriptAddress), and the table conventions
for cand in (rva, rva + 0x1000, va, va - BASE + 0x1000):
if cand in strings:
return '%s/*"%s"*/' % (m.group(0), strings[cand][:60])
return m.group(0)
out = hexre.sub(repl, src)
outp = os.path.join(G, "decomp.annotated.c")
open(outp, "w", encoding="utf-8").write(out)
nres = out.count("/*")
print("wrote %s (%d annotations)" % (outp, nres))
if len(sys.argv) > 1:
sub = sys.argv[1]
blocks = re.split(r"(?=// ==== )", out)
for b in blocks:
if sub.lower() in b[:200].lower():
print(b)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""Generate the SAND trampler/walker blueprint hashes FROM SCRATCH (no harvesting needed).
RE'd from GameAssembly.dll (2026-06-15 build):
MD5Utility.ComputeHash(obj) = MD5( UTF8( JsonConvert.SerializeObject(obj) ) ), formatted as
uppercase hex with no separators (each byte -> ToString("X2")). [ghidra: MD5Utility$$ObjectToByteArray,
$$ComputeHash]
The three top-level WalkerBlueprintDto hashes are MD5 of the *compact* JSON (Newtonsoft default:
no whitespace, PascalCase keys in C# declaration order, nulls included, enums as STRING names,
CellCoordinate as {x,y,z}) of:
CompartmentsHash = ComputeHash(blueprint.Compartments) [List<CompartmentBlueprintDto>] VERIFIED
ConnectionsHash = ComputeHash(blueprint.Connections) [List<ConnectionBlueprintDto>] VERIFIED <- "hash #3"
DefinitionsHash = ComputeHash(<the CompartmentDefinitionDto list for the compartments>) PROVISIONAL
(hashes the actual definitions, not blueprint-internal fields; verify live
against a fresh GetTrampler + GetCompartmentDefinitions once unfrozen.)
To match Newtonsoft byte-for-byte, build the dicts with keys already in declaration order and
serialize with json.dumps(obj, separators=(",",":"), ensure_ascii=False). Do NOT sort keys.
"""
import json, hashlib
def md5_hash(obj) -> str:
"""MD5Utility.ComputeHash(obj): MD5 of compact-JSON(obj), uppercase hex."""
blob = json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
return hashlib.md5(blob.encode("utf-8")).hexdigest().upper()
def compartments_hash(compartments: list) -> str:
"""#1 CompartmentsHash = MD5(JSON(Compartments list)). VERIFIED."""
return md5_hash(compartments)
def connections_hash(connections: list) -> str:
"""#3 ConnectionsHash = MD5(JSON(Connections list)). VERIFIED.
Each connection must serialize as (declaration order):
{"Id": int, "EpbId": str|None, "ActualDirection": {"x":,"y":,"z":},
"GridCoordinate": {"x":,"y":,"z":}, "SlotType": "<name>", "State": "<name>"}
"""
return md5_hash(connections)
def definitions_hash(definitions: list) -> str:
"""#2 DefinitionsHash = MD5(JSON(<CompartmentDefinitionDto list>)). PROVISIONAL — verify live."""
return md5_hash(definitions)
def apply_hashes(blueprint: dict, definitions: list = None) -> dict:
"""Set the three top-level hashes on a blueprint dict in place. Returns it.
definitions: ordered CompartmentDefinitionDto list for #2 (optional until verified)."""
blueprint["CompartmentsHash"] = compartments_hash(blueprint["Compartments"])
blueprint["ConnectionsHash"] = connections_hash(blueprint["Connections"])
if definitions is not None:
blueprint["DefinitionsHash"] = definitions_hash(definitions)
return blueprint
def _selftest():
import os
p = os.path.join(os.path.dirname(__file__), "..", "extracted",
"playfab_titledata_TramplerBlueprint3.json")
b = json.load(open(p))
c1 = compartments_hash(b["Compartments"])
c3 = connections_hash(b["Connections"])
assert c1 == b["CompartmentsHash"], "CompartmentsHash %s != %s" % (c1, b["CompartmentsHash"])
assert c3 == b["ConnectionsHash"], "ConnectionsHash %s != %s" % (c3, b["ConnectionsHash"])
print("selftest OK:")
print(" CompartmentsHash #1:", c1, "== stored")
print(" ConnectionsHash #3:", c3, "== stored <- generatable from scratch")
print(" DefinitionsHash #2: provisional (verify live vs current GetTrampler).")
if __name__ == "__main__":
_selftest()

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""Convert a master-server WalkerDto (e.g. ExpeditionDto.Trampler, from GetExpedition /
GetExpeditionWalker) into a loadable WalkerBlueprintDto, recomputing the 3 top-level hashes.
WHY a transform is needed: the WS channel serializes enums as INTEGERS and omits null EpbId on
connections; the blueprint/hash ("storage") form uses enum NAME strings and includes EpbId:null.
ObjectToByteArray hashes the storage form (verified in trampler_hashes.py). So WS-form -> storage-form
before hashing.
Enum maps (from il2cpp/dump.cs):
ConnectionSlotType: DOOR=0 HATCH=1 STRUCTURE=2 BALCONY=3 DECK=4
ConnectionState : DEFAULT=0 DOOR=1 OPEN=2 (MasterserverDtos.ConnectionState, 35406)
ConnectionsCount : FULL=0 PARTIAL=1 ERROR=2
Self-verifies offline via round-trip on the known sample (storage->WS->storage is identity and
reproduces the stored CompartmentsHash/ConnectionsHash).
"""
import json, os, sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from trampler_hashes import compartments_hash, connections_hash, definitions_hash
SLOT = ["DOOR", "HATCH", "STRUCTURE", "BALCONY", "DECK"] # ConnectionSlotType
STATE = ["DEFAULT", "DOOR", "OPEN"] # ConnectionState
COUNT = ["FULL", "PARTIAL", "ERROR"] # ConnectionsCount
SLOT_I = {n: i for i, n in enumerate(SLOT)}
STATE_I = {n: i for i, n in enumerate(STATE)}
COUNT_I = {n: i for i, n in enumerate(COUNT)}
def _od(*pairs):
"""build a dict in explicit (declaration) order."""
d = {}
for k, v in pairs:
d[k] = v
return d
def _decor_i2s(decor):
"""DecorationsInfo: socket enums int->name (storage form)."""
if decor is None:
return None
socks = []
for s in decor.get("Sockets", []):
vals = []
for kv in s.get("Value", []):
v = kv.get("Value") or {}
vals.append(_od(("Key", SLOT[kv["Key"]]),
("Value", _od(("state", STATE[v["state"]]), ("count", COUNT[v["count"]])))))
socks.append(_od(("Key", s["Key"]), ("Value", vals)))
return _od(("Sockets", socks))
def _decor_s2i(decor):
"""inverse, for round-trip self-test only."""
if decor is None:
return None
socks = []
for s in decor.get("Sockets", []):
vals = []
for kv in s.get("Value", []):
v = kv.get("Value") or {}
vals.append(_od(("Key", SLOT_I[kv["Key"]]),
("Value", _od(("state", STATE_I[v["state"]]), ("count", COUNT_I[v["count"]])))))
socks.append(_od(("Key", s["Key"]), ("Value", vals)))
return _od(("Sockets", socks))
def comp_i2s(bp):
"""one CompartmentBlueprintDto WS->storage (CompartmentBlueprintDto field order)."""
return _od(("Id", bp["Id"]), ("EpbId", bp.get("EpbId")), ("CellCoordinate", bp["CellCoordinate"]),
("DecorationsInfo", _decor_i2s(bp.get("DecorationsInfo"))), ("Rotation", bp["Rotation"]),
("CompartmentHash", bp.get("CompartmentHash")), ("DefinitionHash", bp.get("DefinitionHash")))
def comp_s2i(bp):
return _od(("Id", bp["Id"]), ("EpbId", bp.get("EpbId")), ("CellCoordinate", bp["CellCoordinate"]),
("DecorationsInfo", _decor_s2i(bp.get("DecorationsInfo"))), ("Rotation", bp["Rotation"]),
("CompartmentHash", bp.get("CompartmentHash")), ("DefinitionHash", bp.get("DefinitionHash")))
def conn_i2s(bp):
"""one ConnectionBlueprintDto WS->storage: add EpbId:null, enums int->name (declaration order)."""
return _od(("Id", bp["Id"]), ("EpbId", bp.get("EpbId")), ("ActualDirection", bp["ActualDirection"]),
("GridCoordinate", bp["GridCoordinate"]), ("SlotType", SLOT[bp["SlotType"]]),
("State", STATE[bp["State"]]))
def conn_s2i(c):
"""inverse: drop EpbId, enums name->int (round-trip self-test only)."""
return _od(("Id", c["Id"]), ("ActualDirection", c["ActualDirection"]),
("GridCoordinate", c["GridCoordinate"]), ("SlotType", SLOT_I[c["SlotType"]]),
("State", STATE_I[c["State"]]))
def walkerdto_to_blueprint(walker, definitions=None, version=1):
"""ExpeditionDto.Trampler / WalkerDto -> WalkerBlueprintDto with recomputed hashes."""
chassis = comp_i2s(walker["Chassis"]["Blueprint"])
comps = [comp_i2s(c["Blueprint"]) for c in walker["Compartments"]]
conns = [conn_i2s(c["Blueprint"]) for c in walker["Connections"]]
bp = _od(("Id", walker.get("BlueprintId")), ("UniqueId", walker.get("BlueprintUniqueId")),
("Version", version), ("Chassis", chassis), ("Compartments", comps), ("Connections", conns),
("CompartmentsHash", compartments_hash(comps)), ("DefinitionsHash", ""),
("ConnectionsHash", connections_hash(conns)))
if definitions is not None:
bp["DefinitionsHash"] = definitions_hash(definitions)
return bp
def _verify_roundtrip():
"""storage -> WS -> storage must be identity (and reproduce stored hashes) on the sample."""
p = os.path.join(os.path.dirname(__file__), "..", "extracted", "playfab_titledata_TramplerBlueprint3.json")
s = json.load(open(p))
comps_ws = [comp_s2i(c) for c in s["Compartments"]]
conns_ws = [conn_s2i(c) for c in s["Connections"]]
comps_back = [comp_i2s(c) for c in comps_ws]
conns_back = [conn_i2s(c) for c in conns_ws]
cj = lambda o: json.dumps(o, separators=(",", ":"))
assert cj(comps_back) == cj(s["Compartments"]), "compartment round-trip not identity"
assert cj(conns_back) == cj(s["Connections"]), "connection round-trip not identity"
assert compartments_hash(comps_back) == s["CompartmentsHash"], "CompartmentsHash mismatch"
assert connections_hash(conns_back) == s["ConnectionsHash"], "ConnectionsHash mismatch"
print("round-trip OK: storage->WS->storage is byte-identical; #1 & #3 hashes reproduce.")
if __name__ == "__main__":
_verify_roundtrip()
exp = os.path.join(os.path.dirname(__file__), "..", "extracted", "master_GetExpedition.json")
if os.path.exists(exp):
tr = json.load(open(exp)).get("Trampler")
if tr:
bp = walkerdto_to_blueprint(tr)
out = os.path.join(os.path.dirname(__file__), "..", "extracted", "host_trampler_blueprint.json")
json.dump(bp, open(out, "w"), indent=1, ensure_ascii=False)
print("converted host trampler -> %s" % out)
print(" chassis=%s comps=%d conns=%d" % (bp["Chassis"]["EpbId"], len(bp["Compartments"]), len(bp["Connections"])))
print(" CompartmentsHash=%s" % bp["CompartmentsHash"])
print(" ConnectionsHash =%s" % bp["ConnectionsHash"])
print(" DefinitionsHash =%s (provisional - needs live verify)" % (bp["DefinitionsHash"] or "<empty>"))