refactor: group scripts into walker/ wikigen/ bundle/

Organize the 16 loose scripts by concern:
  walker/  -- .wbt save tooling (sand, build_wbt, walker_hashes,
              harvest_hashes, recover_key)
  wikigen/ -- MediaWiki page generators (make_*_wiki, render_wiki)
  bundle/  -- Unity/Odin asset extraction (unitybundle, odin_read,
              extract_*, loot_probe, dump_loot_bytes)

The only cross-script imports (build_wbt->walker_hashes,
extract_loot->odin_read) live within the same folder, so each
script's dir on sys.path[0] keeps them resolving with no code
changes. All data paths are absolute, so the moves don't affect
I/O. Named the code dir wikigen/ to avoid colliding with the
generated wiki/ output dir; ignore the regenerable wiki_site/ render.
This commit is contained in:
DownloadPizza
2026-06-11 14:49:33 +02:00
parent 2e886f31f0
commit a44e4db1c3
17 changed files with 3 additions and 0 deletions

160
walker/build_wbt.py Normal file
View File

@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""Build / edit SAND .wbt walker saves offline.
Pipeline (both directions verified byte-exact on all 5 local walkers):
load: gunzip -> XOR -> Newtonsoft-BSON decode
save: BSON encode -> XOR -> gzip
pymongo's bson.encode reproduces Newtonsoft.Bson byte-for-byte for these docs,
so a decoded->encoded round-trip is identity.
What "arbitrary" means here:
* You can freely EDIT a loaded walker: rename, move/rotate/add/remove parts and
connections, swap texture, etc. pack() recomputes the two OFFLINE hashes
(CompartmentsHash, ConnectionsHash) so the file stays self-consistent.
* DefinitionsHash (aggregate) and per-part DefinitionHash are SERVER-SOURCED.
pack() copies DefinitionsHash from the source doc and reuses each part's
DefinitionHash. That is correct as long as the *set of part definitions*
doesn't change. If you introduce a part whose EpbId we've never harvested a
DefinitionHash for, pack() raises -- we can't invent that value offline.
Known DefinitionHashes live in extracted/definition_hashes_known.json
(currently 18/126 parts; build a part in-game once to harvest more).
CLI:
build_wbt.py repack <wbt> sanity: load->pack->reload == identity
build_wbt.py rename <wbt> <first> <second> [-o out.wbt] set name indices (0-31)
build_wbt.py pack <wbt> -o out.wbt recompute hashes, write a fresh .wbt
build_wbt.py get-icon <wbt> [-o out.png] extract the walker icon to a PNG (upright)
build_wbt.py set-icon <wbt> <png> [-o out] paint a PNG as the icon (auto-resize+flip)
"""
import sys, gzip, json, os, argparse, pathlib
from bson import decode, encode
import walker_hashes as wh
KEY = bytes.fromhex('70dd1f2a0b4a'); CHUNK = 0xA000
KNOWN = pathlib.Path('/home/downloadpizza/sand_tools'
'/extracted/definition_hashes_known.json')
def _xor(b: bytes) -> bytes:
return bytes(b[i] ^ KEY[(i % CHUNK) % 6] for i in range(len(b)))
def load(path) -> dict:
"""gunzip -> XOR -> BSON -> dict (full container, not just doc['walker'])."""
return decode(_xor(gzip.decompress(pathlib.Path(path).read_bytes())))
def _known_defhashes() -> dict:
if not KNOWN.exists():
return {}
d = json.load(open(KNOWN))
out = {}
for epb, e in d['parts'].items():
if e.get('DefinitionHash'):
out[epb] = e['DefinitionHash']
return out
def pack(doc: dict, *, recompute=True, strict=True) -> bytes:
"""Recompute offline hashes on doc['walker'] and return loadable .wbt bytes.
recompute: refresh CompartmentsHash/ConnectionsHash from the parts/connections.
strict: error if any placed part lacks a known DefinitionHash (i.e. its
server-sourced value isn't recorded), since the file would carry a
DefinitionHash the server can't validate.
"""
w = doc['walker']
if strict:
known = _known_defhashes()
missing = []
for c in [w['Chassis']] + w['Compartments']:
epb = c['EpbId']
have = c.get('DefinitionHash')
if not have and epb not in known:
missing.append(epb)
if missing:
raise ValueError(
"no known DefinitionHash for EpbId(s): " + ", ".join(sorted(set(missing)))
+ "\n -> build that part in-game once and re-run harvest_hashes.py, "
"or pass strict=False to bypass.")
# backfill any part missing its per-part DefinitionHash from the known table
for c in [w['Chassis']] + w['Compartments']:
if not c.get('DefinitionHash') and c['EpbId'] in known:
c['DefinitionHash'] = known[c['EpbId']]
if recompute:
w['CompartmentsHash'] = wh.compartments_hash(w['Compartments'])
w['ConnectionsHash'] = wh.connections_hash(w['Connections'])
# DefinitionsHash (aggregate over CompartmentDefinitionDto) is server-sourced;
# left as-is -- valid while the definition set is unchanged.
return gzip.compress(_xor(encode(doc)))
def save(doc: dict, path, **kw):
pathlib.Path(path).write_bytes(pack(doc, **kw))
# ---- CLI ----
def cmd_repack(a):
doc = load(a.input)
blob = pack(doc, strict=False)
back = load_bytes(blob)
same = back == doc
print(f"load->pack->reload identity: {'OK' if same else 'MISMATCH'}")
if not same:
for k in doc:
if doc[k] != back.get(k):
print(" differs:", k)
def load_bytes(blob: bytes) -> dict:
return decode(_xor(gzip.decompress(blob)))
def cmd_rename(a):
doc = load(a.input)
doc['firstNameIndex'] = a.first
doc['secondNameIndex'] = a.second
out = a.output or a.input
save(doc, out, strict=False)
print(f"set firstNameIndex={a.first} secondNameIndex={a.second} -> {out}")
def cmd_pack(a):
doc = load(a.input)
save(doc, a.output, strict=not a.no_strict)
print(f"packed -> {a.output} "
f"(CompartmentsHash={doc['walker']['CompartmentsHash'][:8]} "
f"ConnectionsHash={doc['walker']['ConnectionsHash'][:8]})")
# textureRawData is RGBA8888 stored bottom-up (Unity convention); flip on the way
# in/out so PNGs are upright both in files and in-engine. Icon isn't hashed.
def cmd_get_icon(a):
from PIL import Image
doc = load(a.input)
sz = doc['textureSize']
img = Image.frombytes('RGBA', (sz, sz), bytes(doc['textureRawData']))
img = img.transpose(Image.FLIP_TOP_BOTTOM)
out = a.output or (pathlib.Path(a.input).stem + '.png')
img.save(out)
print(f"wrote {out} ({sz}x{sz} RGBA)")
def cmd_set_icon(a):
from PIL import Image
doc = load(a.input)
sz = doc['textureSize']
img = Image.open(a.png).convert('RGBA')
if img.size != (sz, sz):
img = img.resize((sz, sz), Image.LANCZOS)
img = img.transpose(Image.FLIP_TOP_BOTTOM) # -> bottom-up for in-engine upright
doc['textureRawData'] = img.tobytes()
out = a.output or a.input
save(doc, out, strict=False) # icon doesn't affect any hash
print(f"set icon {a.png} -> {out}")
def main():
ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
sp = ap.add_subparsers(dest='cmd', required=True)
p = sp.add_parser('repack'); p.add_argument('input'); p.set_defaults(fn=cmd_repack)
p = sp.add_parser('rename'); p.add_argument('input'); p.add_argument('first', type=int)
p.add_argument('second', type=int); p.add_argument('-o', '--output'); p.set_defaults(fn=cmd_rename)
p = sp.add_parser('pack'); p.add_argument('input'); p.add_argument('-o', '--output', required=True)
p.add_argument('--no-strict', action='store_true'); p.set_defaults(fn=cmd_pack)
p = sp.add_parser('get-icon'); p.add_argument('input'); p.add_argument('-o', '--output')
p.set_defaults(fn=cmd_get_icon)
p = sp.add_parser('set-icon'); p.add_argument('input'); p.add_argument('png')
p.add_argument('-o', '--output'); p.set_defaults(fn=cmd_set_icon)
a = ap.parse_args(); a.fn(a)
if __name__ == '__main__':
main()

107
walker/harvest_hashes.py Normal file
View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""Harvest per-part hashes from SAND walker saves (100% offline, no injection/proxy).
Every compartment placed in the in-game editor writes its DefinitionHash (and
CompartmentHash) into the .wbt save. This scans all saves, decodes them, and
merges every (EpbId -> {DefinitionHash, CompartmentHash}) into a JSON table,
deduping and keeping a history of any older hash values seen per part.
Re-run it any time after building/saving new walkers to grow the table.
Usage:
harvest_hashes.py # scan the live Walkers dir + snapshots, update table
harvest_hashes.py <dir> ... # scan extra dirs of .wbt too
"""
import sys, gzip, json, glob, os, hashlib
KEY = bytes.fromhex('70dd1f2a0b4a'); CHUNK = 0xA000
WBT_DIR = '/home/downloadpizza/sand_tools/Walkers' # symlink -> live game saves (*.wbt)
EXTRACTED = '/home/downloadpizza/sand_tools/extracted'
OUT = os.path.join(EXTRACTED, 'definition_hashes_known.json')
COMPDB = os.path.join(EXTRACTED, 'CompartmentsDatabase.json')
try:
from bson import decode
except ImportError:
sys.exit("need pymongo: ~/sand_tools/venv/bin/pip install pymongo")
def decode_wbt(path):
raw = gzip.decompress(open(path, 'rb').read())
dec = bytes(raw[i] ^ KEY[(i % CHUNK) % 6] for i in range(len(raw)))
return decode(dec)
def main():
dirs = [WBT_DIR, os.path.join(os.path.expanduser('~'), 'sand_tools', 'snapshots')] + sys.argv[1:]
files = []
for d in dirs:
files += glob.glob(os.path.join(d, '*.wbt'))
# snapshots are decoded already (.dec) -> handle separately
dec_files = []
for d in dirs:
dec_files += glob.glob(os.path.join(d, '*.dec'))
# rebuild fresh from saves+snapshots (the authoritative source); snapshots persist
# history even if live .wbt files are later deleted.
table = {}
def note(epb, comp, deff):
e = table.setdefault(epb, {'DefinitionHash': None, 'DefinitionHash_history': [],
'CompartmentHash': None, 'CompartmentHash_history': []})
for field, hist, val in [('DefinitionHash', 'DefinitionHash_history', deff),
('CompartmentHash', 'CompartmentHash_history', comp)]:
if not val:
continue
if e[field] and e[field] != val and e[field] not in e[hist]:
e[hist].append(e[field])
e[field] = val
scanned = 0
def walkers_from(doc):
w = doc['walker']
return [w['Chassis']] + w['Compartments']
for f in sorted(set(files)):
try:
doc = decode_wbt(f); scanned += 1
except Exception as ex:
print(f"skip {os.path.basename(f)}: {ex}"); continue
for c in walkers_from(doc):
note(c['EpbId'], c.get('CompartmentHash'), c.get('DefinitionHash'))
for f in sorted(set(dec_files)):
try:
doc = decode(open(f, 'rb').read()); scanned += 1
except Exception:
continue
try:
for c in walkers_from(doc):
note(c['EpbId'], c.get('CompartmentHash'), c.get('DefinitionHash'))
except Exception:
pass
# cross-check CompartmentHash against CompartmentsDatabase (authoritative current value)
total_parts = None
if os.path.exists(COMPDB):
db = json.load(open(COMPDB))
total_parts = len(db['compartments'])
calc = {c['entityId']: hashlib.md5(json.dumps(c, separators=(',', ':')).encode()).hexdigest().upper()
for c in db['compartments']}
for epb, e in table.items():
if epb in calc:
e['CompartmentHash_computed'] = calc[epb]
out = {
'_note': ("Per-part hashes harvested from walker saves. CompartmentHash is also computed "
"offline from CompartmentsDatabase (CompartmentHash_computed, authoritative). "
"DefinitionHash is server-sourced and only known for parts that appear in a save — "
"build a part in-game and re-run this to capture it."),
'definition_known': sum(1 for e in table.values() if e.get('DefinitionHash')),
'total_parts_in_db': total_parts,
'parts': dict(sorted(table.items())),
}
os.makedirs(os.path.dirname(OUT), exist_ok=True)
json.dump(out, open(OUT, 'w'), indent=2)
print(f"scanned {scanned} saves; {out['definition_known']} parts with DefinitionHash"
+ (f" / {total_parts} total" if total_parts else "") + f"\nwrote {OUT}")
if __name__ == '__main__':
main()

68
walker/recover_key.py Normal file
View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""Recover the SAND .wbt XOR key after a game update changes it — no RE needed.
Known-plaintext crib: the walker icon (textureRawData) is RGBA8888 and its empty
space is the solid background pixel BG = 02 05 0D 00 (RGBA 2,5,13,0). The BSON
header before the pixel data is a CONSTANT size, so pixel byte 0 always lands at
decoded offset 0x2A:
4 (int32 BSON doc length)
+ 17 (textureSize field: 0x10 + "textureSize\0"(12) + int32(4))
+ 21 (textureRawData header: 0x05 + "textureRawData\0"(15) + int32 len(4) + subtype(1))
= 42 = 0x2A
Pixel (0,0) is the bottom-left of the image (Unity bottom-up), which is background
on a normal walker, so a long run of BG sits right at offset 0x2A — inside the
first 0xA000 cipher chunk, where the keystream residue is just i % keylen.
Recovery: for each i in [0x2A, 0xA000),
key[i % keylen] = enc[i] XOR BG[(i - 0x2A) % 4]
Majority-vote each residue (non-background pixels are the minority and wash out),
try keylen 6 first (current), then 1..16, and accept the key whose full decode
parses as BSON. Independent of whatever the new key bytes are.
Usage:
recover_key.py <wbt> [<wbt> ...] # print recovered key(s)
"""
import sys, gzip, struct
from collections import Counter
BG = bytes((0x02, 0x05, 0x0D, 0x00)) # background pixel, RGBA (2,5,13,0)
PIXOFF = 0x2A # fixed decoded offset of pixel byte 0
CHUNK = 0xA000
def recover(path, keylens=(6,) + tuple(range(1, 17))):
raw = gzip.decompress(open(path, 'rb').read())
end = min(CHUNK, len(raw))
tried = set()
for L in keylens:
if L in tried:
continue
tried.add(L)
votes = [Counter() for _ in range(L)]
for i in range(PIXOFF, end):
kb = raw[i] ^ BG[(i - PIXOFF) % 4]
votes[i % L][kb] += 1
if any(not v for v in votes):
continue
key = bytes(v.most_common(1)[0][0] for v in votes)
if _verifies(raw, key):
return key, L
return None, None
def _verifies(raw, key):
L = len(key)
dec = bytes(raw[i] ^ key[(i % CHUNK) % L] for i in range(len(raw)))
# cheap structural check: BSON doc length == total, and known field names present
if len(dec) < 8:
return False
if struct.unpack_from('<i', dec, 0)[0] != len(dec):
return False
return dec[5:16] == b'textureSize' and b'textureRawData' in dec[:64] and b'walker' in dec
if __name__ == '__main__':
files = sys.argv[1:] or sys.exit("usage: recover_key.py <wbt> ...")
for f in files:
key, L = recover(f)
if key:
print(f"{f.split('/')[-1][:8]} KEY = {key.hex().upper()} (keylen {L})")
else:
print(f"{f.split('/')[-1][:8]} FAILED (no BG run / unexpected layout)")

208
walker/sand.py Executable file
View File

@@ -0,0 +1,208 @@
#!/usr/bin/env python3
"""SAND .wbt save-file toolkit.
Envelope (verified from GameAssembly.dll, XorCryptography.Encrypt @ RVA 0xBE9B40):
Save: BSON(Newtonsoft) -> XOR encrypt -> gzip compress
Load: gunzip -> XOR decrypt -> BSON parse
Inner cipher is XOR with a 6-byte key, applied PER 0xA000-byte chunk with the
key index RESET to 0 at every chunk boundary (the stream is processed in
buffer.Length = 0xA000 reads, and the keystream restarts each read):
decoded[i] = raw[i] XOR KEY[(i % 0xA000) % 6]
KEY = 70 DD 1F 2A 0B 4A (recovered from known BSON plaintext "textureSize"
header; the .cctor allocates a 6-element byte[] and InitializeArray-copies it).
Decoded payload is a single Newtonsoft BSON document
(WalkerBlueprintContainerSerializableProxy): textureSize, textureRawData,
walker{...}, format, iconVersion, firstNameIndex, secondNameIndex,
creationTime, name, isBackup. Name parts are 0-based indices into the Unity
Localization tables WalkerFirstName / WalkerSecondName.
Subcommands:
decode <wbt> -> write decoded bytes
snap [--all | <wbt>...] -> decode + save numbered snapshot per file
diff <before.dec> <after.dec> -> human-readable diff
check <wbt> -> one-shot: snap + diff against latest prior snap
watch <wbt> -> poll the file forever; run check on every mtime change
"""
import sys, gzip, argparse, pathlib, re, time
KEY = bytes.fromhex('70dd1f2a0b4a')
CHUNK = 0xA000
WBT_DIR = pathlib.Path('/home/downloadpizza/sand_tools/Walkers') # symlink -> live game saves
SNAP_DIR = pathlib.Path.home() / 'sand_tools' / 'snapshots'
# Known fields to suppress in diffs (per-file). Each entry: (offset, length, note)
KNOWN_NOISE = {
# file_uuid_prefix : list of (offset, length, label)
'*': [
(0x000000, 2, 'leading 2-byte per-save field'),
],
'215949d3': [
(0x1040f3, 4, 'per-save nonce/timestamp'),
],
}
def decode_bytes(raw: bytes) -> bytes:
out = bytearray(len(raw))
for i in range(len(raw)):
out[i] = raw[i] ^ KEY[(i % CHUNK) % 6]
return bytes(out)
def encode_bytes(dec: bytes) -> bytes:
# XOR is symmetric; same transform re-encrypts a (modified) decoded payload.
return decode_bytes(dec)
def decode_file(path: pathlib.Path) -> bytes:
return decode_bytes(gzip.decompress(path.read_bytes()))
def stem8(path: pathlib.Path) -> str:
return path.stem[:8]
def next_snap_path(file_stem: str) -> pathlib.Path:
SNAP_DIR.mkdir(parents=True, exist_ok=True)
existing = sorted(SNAP_DIR.glob(f'{file_stem}_v*.dec'))
n = 1
if existing:
nums = [int(re.search(r'_v(\d+)\.dec$', p.name).group(1)) for p in existing]
n = max(nums) + 1
return SNAP_DIR / f'{file_stem}_v{n:03d}.dec'
def latest_snap(file_stem: str) -> pathlib.Path | None:
snaps = sorted(SNAP_DIR.glob(f'{file_stem}_v*.dec'))
return snaps[-1] if snaps else None
def cmd_decode(args):
data = decode_file(pathlib.Path(args.input))
out = pathlib.Path(args.output) if args.output else None
if out:
out.write_bytes(data); print(f"wrote {out} ({len(data)} bytes)", file=sys.stderr)
else:
sys.stdout.buffer.write(data)
def cmd_snap(args):
if args.all:
files = sorted(WBT_DIR.glob('*.wbt'))
else:
files = [pathlib.Path(p) for p in args.files]
for f in files:
data = decode_file(f)
out = next_snap_path(stem8(f))
out.write_bytes(data)
print(f"snap: {f.name[:8]}... -> {out.name} ({len(data)} bytes)")
def diff_regions(a: bytes, b: bytes, merge_gap=8):
common = min(len(a), len(b))
regions = []
state = None; start = 0
for i in range(common):
eq = a[i] == b[i]
cur = 'eq' if eq else 'df'
if state is None: state = cur; start = i
elif cur != state: regions.append((state, start, i)); state = cur; start = i
regions.append((state, start, common))
merged = []
for r in regions:
if merged and r[0]=='eq' and (r[2]-r[1])<merge_gap and merged[-1][0]=='df':
merged[-1] = ('df', merged[-1][1], r[2])
elif merged and merged[-1][0]==r[0]:
merged[-1] = (r[0], merged[-1][1], r[2])
else:
merged.append(list(r))
return [tuple(r) for r in merged if r[0]=='df'], len(a), len(b)
def filter_noise(diffs, file_stem):
noise = KNOWN_NOISE.get('*', []) + KNOWN_NOISE.get(file_stem, [])
def overlaps(s, e):
return any(s < no+nn and e > no for no, nn, _ in noise)
kept = [d for d in diffs if not overlaps(d[1], d[2])]
suppressed = len(diffs) - len(kept)
return kept, suppressed
def render_diffs(a, b, diffs, ctx=12):
for t, s, e in diffs:
n = e - s
pre = a[max(0,s-ctx):s].hex()
post = a[e:e+ctx].hex()
print(f"@ {s:#010x} .. {e:#010x} ({n} bytes)")
print(f" -before: {pre} [{a[s:e].hex()}] {post}")
print(f" +after : {pre} [{b[s:e].hex()}] {post}")
if n <= 4:
bv = int.from_bytes(a[s:e], 'little')
av = int.from_bytes(b[s:e], 'little')
print(f" LE int : {bv} -> {av}{av-bv:+d})")
print()
sys.stdout.flush()
def cmd_diff(args):
a = pathlib.Path(args.before).read_bytes()
b = pathlib.Path(args.after).read_bytes()
diffs, la, lb = diff_regions(a, b)
file_stem = re.match(r'([0-9a-f]{8})', pathlib.Path(args.before).name)
stem = file_stem.group(1) if file_stem else ''
if not args.no_filter:
diffs, suppressed = filter_noise(diffs, stem)
else:
suppressed = 0
print(f"sizes: before={la} after={lb} Δ={lb-la:+d}")
print(f"{len(diffs)} differing region(s)" + (f" ({suppressed} known-noise suppressed)" if suppressed else ""))
print()
render_diffs(a, b, diffs)
def cmd_check(args):
path = pathlib.Path(args.input)
stem = stem8(path)
prev = latest_snap(stem)
data = decode_file(path)
out = next_snap_path(stem)
out.write_bytes(data)
print(f"snap: {path.name[:8]}... -> {out.name}")
if not prev:
print("(no prior snapshot to diff against)")
return
a = prev.read_bytes()
b = data
diffs, la, lb = diff_regions(a, b)
if not args.no_filter:
diffs, suppressed = filter_noise(diffs, stem)
else:
suppressed = 0
print(f"diff vs {prev.name}: sizes {la}/{lb} Δ={lb-la:+d}")
print(f"{len(diffs)} region(s)" + (f" ({suppressed} known-noise suppressed)" if suppressed else ""))
print()
render_diffs(a, b, diffs)
def cmd_watch(args):
path = pathlib.Path(args.input)
print(f"watching {path.name} (poll every {args.interval}s, Ctrl-C to stop)", flush=True)
last_mtime = path.stat().st_mtime
while True:
try:
time.sleep(args.interval)
except KeyboardInterrupt:
print("stopped."); return
try:
cur = path.stat().st_mtime
except FileNotFoundError:
continue
if cur != last_mtime:
last_mtime = cur
print(f"\n[{time.strftime('%H:%M:%S')}] change detected", flush=True)
cmd_check(args)
def main():
ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
sp = ap.add_subparsers(dest='cmd', required=True)
p = sp.add_parser('decode'); p.add_argument('input'); p.add_argument('-o', '--output'); p.set_defaults(fn=cmd_decode)
p = sp.add_parser('snap'); g = p.add_mutually_exclusive_group(required=True)
g.add_argument('--all', action='store_true'); g.add_argument('files', nargs='*', default=[])
p.set_defaults(fn=cmd_snap)
p = sp.add_parser('diff'); p.add_argument('before'); p.add_argument('after'); p.add_argument('--no-filter', action='store_true'); p.set_defaults(fn=cmd_diff)
p = sp.add_parser('check'); p.add_argument('input'); p.add_argument('--no-filter', action='store_true'); p.set_defaults(fn=cmd_check)
p = sp.add_parser('watch'); p.add_argument('input'); p.add_argument('--no-filter', action='store_true'); p.add_argument('--interval', type=float, default=1.0); p.set_defaults(fn=cmd_watch)
args = ap.parse_args()
args.fn(args)
if __name__ == '__main__':
main()

87
walker/walker_hashes.py Normal file
View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""Reproduce SAND walker hashes offline.
All walker hashes = MD5(UTF8(JsonConvert.SerializeObject(obj))).hexUPPER, where the
default JsonConvert settings use a global StringEnumConverter (enums -> NAME strings),
NullValueHandling.Include, compact formatting, member declaration order.
Verified against all local walkers:
CompartmentHash (per part) = md5_json(placement from CompartmentsDatabase) [compartment_hashes.py]
CompartmentsHash (walker) = md5_json( Compartments list ) <- here
ConnectionsHash (walker) = md5_json( Connections list ) <- here
DefinitionsHash (walker) = md5_json( Compartments.Select(->CompartmentDefinitionDto) )
^ needs the server-sourced CompartmentDefinitionDto objects;
not reproducible offline. Copy from source walker (it only
changes if the set/order of part definitions changes).
DefinitionHash (per part) = md5_json(CompartmentDefinitionDto) -> also server-sourced.
Enum name tables (from dump.cs):
ConnectionSlotType: 0 DOOR,1 HATCH,2 STRUCTURE,3 BALCONY,4 DECK
ConnectionState(MasterserverDtos): 0 DEFAULT,1 DOOR,2 OPEN
ConnectionsCount: 0 FULL,1 PARTIAL,2 ERROR
DecorationsInfo nesting is serialized as KeyValuePair ARRAYS (not JSON objects):
Sockets: Dict<CellCoordinate, Dict<ConnectionSlotType, {state,count}>>
-> {"Sockets":[{"Key":{x,y,z},"Value":[{"Key":"<slot>","Value":{"state":"<st>","count":"<cnt>"}}]}]}
"""
import json, hashlib
SLOT = {0: 'DOOR', 1: 'HATCH', 2: 'STRUCTURE', 3: 'BALCONY', 4: 'DECK'}
STATE = {0: 'DEFAULT', 1: 'DOOR', 2: 'OPEN'}
COUNT = {0: 'FULL', 1: 'PARTIAL', 2: 'ERROR'}
def _md5(s): return hashlib.md5(s.encode()).hexdigest().upper()
def _coord(v): return '{"x":%d,"y":%d,"z":%d}' % (v['x'], v['y'], v['z'])
def _dbl(x): return f'{int(x)}.0' if float(x) == int(x) else repr(float(x))
def _stinfo(d): return '{"state":"%s","count":"%s"}' % (STATE[d['state']], COUNT[d['count']])
def _deco(d):
if d is None:
return 'null'
socks = ','.join(
'{"Key":%s,"Value":[%s]}' % (
_coord(kv['Key']),
','.join('{"Key":"%s","Value":%s}' % (SLOT[iv['Key']], _stinfo(iv['Value']))
for iv in kv['Value']))
for kv in d['Sockets'])
return '{"Sockets":[%s]}' % socks
def compartment_json(c):
p = [('Id', str(c['Id'])), ('EpbId', json.dumps(c['EpbId'])),
('CellCoordinate', _coord(c['CellCoordinate'])),
('DecorationsInfo', _deco(c['DecorationsInfo'])),
('Rotation', _dbl(c['Rotation'])),
('CompartmentHash', json.dumps(c['CompartmentHash'])),
('DefinitionHash', json.dumps(c['DefinitionHash']))]
return '{' + ','.join(f'"{k}":{v}' for k, v in p) + '}'
def connection_json(c):
p = [('Id', str(c['Id'])),
('EpbId', json.dumps(c['EpbId']) if c['EpbId'] is not None else 'null'),
('ActualDirection', _coord(c['ActualDirection'])),
('GridCoordinate', _coord(c['GridCoordinate'])),
('SlotType', '"%s"' % SLOT[c['SlotType']]),
('State', '"%s"' % STATE[c['State']])]
return '{' + ','.join(f'"{k}":{v}' for k, v in p) + '}'
def compartments_hash(compartments):
"""CompartmentsHash from walker['Compartments'] (the BSON-decoded list)."""
return _md5('[' + ','.join(compartment_json(c) for c in compartments) + ']')
def connections_hash(connections):
"""ConnectionsHash from walker['Connections']."""
return _md5('[' + ','.join(connection_json(c) for c in connections) + ']')
if __name__ == '__main__':
import sys, gzip, glob
from bson import decode
KEY = bytes.fromhex('70dd1f2a0b4a'); CHUNK = 0xA000
files = sys.argv[1:] or glob.glob(
'/home/downloadpizza/sand_tools/Walkers/*.wbt')
for f in sorted(files):
raw = gzip.decompress(open(f, 'rb').read())
w = decode(bytes(raw[i] ^ KEY[(i % CHUNK) % 6] for i in range(len(raw))))['walker']
okC = compartments_hash(w['Compartments']) == w['CompartmentsHash']
okN = connections_hash(w['Connections']) == w['ConnectionsHash']
print(f"{f.split('/')[-1][:8]} CompartmentsHash={'OK' if okC else 'FAIL'} "
f"ConnectionsHash={'OK' if okN else 'FAIL'}")