Python tooling for decoding walker saves and mining game data: sand.py / build_wbt.py / walker_hashes.py / harvest_hashes.py (.wbt codec + hashes), extract_*/loot_probe/odin_read/unitybundle (asset parsing), make_*_wiki + render_wiki (wiki generation), recover_key. Paths point at the local extracted/, wiki/, and Walkers symlink.
224 lines
8.5 KiB
Python
224 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Minimal reader for Sirenix Odin 'Binary' DataFormat (SerializedFormat=0).
|
|
|
|
Implements the BinaryDataReader entry-stream well enough to reconstruct the object
tree (named fields, nodes, arrays, primitives, strings, references). Produces a
generic Python structure; the caller maps it to loot-table semantics.

Reference: Sirenix.Serialization.BinaryDataReader / BinaryEntryType enum.
String wire format: [flag:1 byte (0=8bit,1=16bit)] [charCount:int32] [chars].
|
|
"""
|
|
import struct
|
|
|
|
# BinaryEntryType: the one-byte tag that introduces each entry in the stream.
# Most value kinds exist as a NAMED_* / UNNAMED_* pair.
INVALID = 0

# Node and array structure.
NAMED_START_REF = 1
UNNAMED_START_REF = 2
NAMED_START_STRUCT = 3
UNNAMED_START_STRUCT = 4
END_OF_NODE = 5
START_OF_ARRAY = 6
END_OF_ARRAY = 7
PRIMITIVE_ARRAY = 8

# References.
NAMED_INTERNAL_REF = 9
UNNAMED_INTERNAL_REF = 10
NAMED_EXT_REF_INDEX = 11
UNNAMED_EXT_REF_INDEX = 12
NAMED_EXT_REF_GUID = 13
UNNAMED_EXT_REF_GUID = 14

# Integers.
NAMED_SBYTE = 15
UNNAMED_SBYTE = 16
NAMED_BYTE = 17
UNNAMED_BYTE = 18
NAMED_SHORT = 19
UNNAMED_SHORT = 20
NAMED_USHORT = 21
UNNAMED_USHORT = 22
NAMED_INT = 23
UNNAMED_INT = 24
NAMED_UINT = 25
UNNAMED_UINT = 26
NAMED_LONG = 27
UNNAMED_LONG = 28
NAMED_ULONG = 29
UNNAMED_ULONG = 30

# Floating point and decimal.
NAMED_FLOAT = 31
UNNAMED_FLOAT = 32
NAMED_DOUBLE = 33
UNNAMED_DOUBLE = 34
NAMED_DECIMAL = 35
UNNAMED_DECIMAL = 36

# Text, GUIDs, booleans and null.
NAMED_CHAR = 37
UNNAMED_CHAR = 38
NAMED_STRING = 39
UNNAMED_STRING = 40
NAMED_GUID = 41
UNNAMED_GUID = 42
NAMED_BOOL = 43
UNNAMED_BOOL = 44
NAMED_NULL = 45
UNNAMED_NULL = 46

# Type bookkeeping and end-of-stream marker.
TYPE_NAME = 47
TYPE_ID = 48
END_OF_STREAM = 49

# External references addressed by string.
NAMED_EXT_REF_STRING = 50
UNNAMED_EXT_REF_STRING = 51

# Every tag whose entry carries a name string ahead of its payload.
NAMED = {
    NAMED_START_REF,
    NAMED_START_STRUCT,
    NAMED_INTERNAL_REF,
    NAMED_EXT_REF_INDEX,
    NAMED_EXT_REF_GUID,
    NAMED_SBYTE,
    NAMED_BYTE,
    NAMED_SHORT,
    NAMED_USHORT,
    NAMED_INT,
    NAMED_UINT,
    NAMED_LONG,
    NAMED_ULONG,
    NAMED_FLOAT,
    NAMED_DOUBLE,
    NAMED_DECIMAL,
    NAMED_CHAR,
    NAMED_STRING,
    NAMED_GUID,
    NAMED_BOOL,
    NAMED_NULL,
    NAMED_EXT_REF_STRING,
}
|
|
|
|
class Node:
    """Reference or struct node rebuilt from the entry stream.

    Stores the node's type name, its id, a dict of named fields and a list
    of unnamed items.
    """

    __slots__ = ("type", "id", "fields", "items")

    def __init__(self, type, id):
        self.type = type
        self.id = id
        self.fields = {}
        self.items = []

    def to_py(self):
        """Return the node as a plain dict.

        ``$type`` is present only when the type name is truthy, each named
        field is stored under its own key, and ``$items`` is present only
        when the node has unnamed items. The node id is not included.
        """
        out = {"$type": _short(self.type)} if self.type else {}
        for key, value in self.fields.items():
            out[key] = _topy(value)
        if self.items:
            out["$items"] = list(map(_topy, self.items))
        return out
|
|
|
|
def _short(t):
|
|
# trim 'Namespace.Class, Assembly' -> Class (keep generics short-ish)
|
|
base=t.split(',')[0]
|
|
return base
|
|
|
|
def _topy(v):
|
|
if isinstance(v,Node): return v.to_py()
|
|
if isinstance(v,list): return [_topy(x) for x in v]
|
|
return v
|
|
|
|
class Ref:
    """A reference entry: ``kind`` labels the reference, ``val`` is its payload."""

    def __init__(self, kind, val):
        self.kind = kind
        self.val = val

    def to_py(self):
        """Return the reference as ``{"$ref": val, "kind": kind}``."""
        out = {}
        out["$ref"] = self.val
        out["kind"] = self.kind
        return out
|
|
|
|
class Reader:
    """Sequential decoder over a bytes-like Odin binary entry stream.

    Attributes:
        d: the buffer being decoded.
        p: offset of the next unread byte.
        n: length of the buffer.
        types: maps a type id to the type-name string registered by a
            TYPE_NAME entry, so that later TYPE_ID entries can be resolved.
    """

    def __init__(self, data):
        self.d = data
        self.p = 0
        self.n = len(data)
        self.types = {}

    def eof(self):
        """Return True once the cursor is at or past the end of the buffer."""
        return self.p >= self.n

    # -- low-level cursor helpers -----------------------------------------

    def _unpack(self, fmt, size):
        """Decode one ``struct`` value at the cursor and advance by *size*."""
        value = struct.unpack_from(fmt, self.d, self.p)[0]
        self.p += size
        return value

    def _take(self, size):
        """Return the next *size* raw bytes and advance the cursor.

        A short buffer is not an error here: the slice is simply shorter and
        the cursor ends up past ``n``, which the ``eof()`` checks then see.
        """
        chunk = self.d[self.p:self.p + size]
        self.p += size
        return chunk

    # -- fixed-width little-endian primitives -----------------------------

    def u8(self):
        """Read one unsigned byte (IndexError if the buffer is exhausted)."""
        value = self.d[self.p]
        self.p += 1
        return value

    def i32(self):
        """Read a signed 32-bit integer."""
        return self._unpack('<i', 4)

    def u32(self):
        """Read an unsigned 32-bit integer."""
        return self._unpack('<I', 4)

    def i64(self):
        """Read a signed 64-bit integer."""
        return self._unpack('<q', 8)

    def u64(self):
        """Read an unsigned 64-bit integer."""
        return self._unpack('<Q', 8)

    def f32(self):
        """Read a 32-bit float."""
        return self._unpack('<f', 4)

    def f64(self):
        """Read a 64-bit float."""
        return self._unpack('<d', 8)

    def i16(self):
        """Read a signed 16-bit integer."""
        return self._unpack('<h', 2)

    def u16(self):
        """Read an unsigned 16-bit integer."""
        return self._unpack('<H', 2)

    def string(self):
        """Read a string: flag byte, int32 character count, then the chars.

        Flag 0 means one byte per character (decoded as latin1); any other
        flag means two bytes per character (decoded as UTF-16-LE).

        Raises:
            ValueError: if the character count is negative. A negative count
                would move the cursor backwards and re-read earlier bytes.
        """
        flag = self.u8()
        count = self.i32()
        if count < 0:
            raise ValueError(f"negative string length {count} at pos {self.p - 4}")
        if flag == 0:
            return self._take(count).decode('latin1')
        return self._take(count * 2).decode('utf-16-le')

    def peek(self):
        """Return the next byte without consuming it, or END_OF_STREAM at eof."""
        return self.d[self.p] if self.p < self.n else END_OF_STREAM

    # -- structural entries ------------------------------------------------

    def read_type_entry(self):
        """Read the type slot that follows a start-of-node tag.

        Returns the type-name string, or None when no type is given.

        * TYPE_NAME: int32 id plus name string; the pair is remembered in
          ``self.types``.
        * TYPE_ID: int32 id, resolved through ``self.types`` (None if the id
          was never registered).
        * UNNAMED_NULL: explicit "no type" marker. It belongs to the type
          slot and must be consumed, otherwise a reference node's id is read
          one byte early and everything after it is misaligned.
        * Anything else is left unconsumed and treated as "no type".
        """
        tag = self.peek()
        if tag == TYPE_NAME:
            self.p += 1
            tid = self.i32()
            name = self.string()
            self.types[tid] = name
            return name
        if tag == TYPE_ID:
            self.p += 1
            return self.types.get(self.i32())
        if tag == UNNAMED_NULL:
            self.p += 1
        return None

    def read_node(self, is_ref):
        """Read a node body up to its END_OF_NODE tag (or end of buffer).

        Args:
            is_ref: True for a reference node, whose type slot is followed
                by an int32 node id; struct nodes get ``id=None``.

        Returns:
            A Node whose named entries are in ``fields`` and whose unnamed
            entries are in ``items``.
        """
        type_name = self.read_type_entry()
        node_id = self.i32() if is_ref else None
        node = Node(type_name, node_id)
        while not self.eof():
            tag = self.u8()
            if tag == END_OF_NODE:
                break
            name = self.string() if tag in NAMED else None
            value = self.read_value(tag)
            if name is not None:
                node.fields[name] = value
            else:
                node.items.append(value)
        return node

    def read_array(self):
        """Read an array: int64 declared length, then entries to END_OF_ARRAY.

        Names on entries inside an array are consumed and dropped. The loop
        also stops once more than ``length + 8`` items have been collected,
        as a guard against running away on a stream with no terminator.
        """
        length = self.i64()
        items = []
        while not self.eof():
            tag = self.u8()
            if tag == END_OF_ARRAY:
                break
            if tag in NAMED:
                self.string()  # name is not kept for array elements
            items.append(self.read_value(tag))
            if len(items) > length + 8:
                break  # safety
        return items

    def read_value(self, b):
        """Decode the payload of an entry whose tag byte *b* is already consumed.

        For NAMED tags the caller must also have consumed the name string.

        Raises:
            ValueError: for a tag this reader does not handle, or for a
                primitive array with a negative count or element size.
        """
        if b in (NAMED_START_REF, UNNAMED_START_REF):
            return self.read_node(True)
        if b in (NAMED_START_STRUCT, UNNAMED_START_STRUCT):
            return self.read_node(False)
        if b == START_OF_ARRAY:
            return self.read_array()
        if b == PRIMITIVE_ARRAY:
            cnt = self.i32()
            bpe = self.i32()
            if cnt < 0 or bpe < 0:
                raise ValueError(
                    f"negative primitive array size {cnt}x{bpe} at pos {self.p - 8}")
            # Only the shape is reported; the raw element bytes are skipped.
            self.p += cnt * bpe
            return {"$primarray": cnt, "bytesPer": bpe}
        if b in (NAMED_INTERNAL_REF, UNNAMED_INTERNAL_REF):
            return Ref("internal", self.i32())
        if b in (NAMED_EXT_REF_INDEX, UNNAMED_EXT_REF_INDEX):
            return Ref("ext_index", self.i32())
        if b in (NAMED_EXT_REF_GUID, UNNAMED_EXT_REF_GUID):
            return Ref("ext_guid", self._take(16).hex())
        if b in (NAMED_EXT_REF_STRING, UNNAMED_EXT_REF_STRING):
            return Ref("ext_string", self.string())
        if b in (NAMED_SBYTE, UNNAMED_SBYTE):
            raw = self.u8()
            return raw - 256 if raw > 127 else raw
        if b in (NAMED_BYTE, UNNAMED_BYTE):
            return self.u8()
        if b in (NAMED_SHORT, UNNAMED_SHORT):
            return self.i16()
        if b in (NAMED_USHORT, UNNAMED_USHORT):
            return self.u16()
        if b in (NAMED_INT, UNNAMED_INT):
            return self.i32()
        if b in (NAMED_UINT, UNNAMED_UINT):
            return self.u32()
        if b in (NAMED_LONG, UNNAMED_LONG):
            return self.i64()
        if b in (NAMED_ULONG, UNNAMED_ULONG):
            return self.u64()
        if b in (NAMED_FLOAT, UNNAMED_FLOAT):
            return self.f32()
        if b in (NAMED_DOUBLE, UNNAMED_DOUBLE):
            return self.f64()
        if b in (NAMED_DECIMAL, UNNAMED_DECIMAL):
            # 16 raw bytes, kept as hex rather than decoded.
            return {"$decimal": self._take(16).hex()}
        if b in (NAMED_CHAR, UNNAMED_CHAR):
            return self._take(2).decode('utf-16-le')
        if b in (NAMED_STRING, UNNAMED_STRING):
            return self.string()
        if b in (NAMED_GUID, UNNAMED_GUID):
            return self._take(16).hex()
        if b in (NAMED_BOOL, UNNAMED_BOOL):
            return self.u8() != 0
        if b in (NAMED_NULL, UNNAMED_NULL):
            return None
        if b == END_OF_STREAM:
            return None
        raise ValueError(f"unknown entry byte {b} at pos {self.p-1}")

    def read_top(self):
        """Read entries at the document root until end-of-stream.

        Stops at END_OF_STREAM, INVALID or the end of the buffer; stray
        END_OF_NODE / END_OF_ARRAY tags at the root are skipped.

        Returns:
            ``(roots, items)``: a dict of named root entries and a list of
            unnamed ones.
        """
        roots = {}
        items = []
        while not self.eof():
            tag = self.u8()
            if tag in (END_OF_STREAM, INVALID):
                break
            if tag in (END_OF_NODE, END_OF_ARRAY):
                continue
            name = self.string() if tag in NAMED else None
            value = self.read_value(tag)
            if name is not None:
                roots[name] = value
            else:
                items.append(value)
        return roots, items
|
|
|
|
def parse(data):
    """Decode *data* and return the result as a dict.

    Keys of the returned dict:
        roots: named root entries, converted with ``_topy``.
        items: unnamed root entries, converted with ``_topy``.
        consumed: the reader's final cursor offset.
        total: length of *data*.
        types: the reader's type id -> type name table.
    """
    reader = Reader(data)
    named, unnamed = reader.read_top()
    result = {}
    result["roots"] = {key: _topy(value) for key, value in named.items()}
    result["items"] = list(map(_topy, unnamed))
    result["consumed"] = reader.p
    result["total"] = reader.n
    result["types"] = reader.types
    return result
|
|
|
|
if __name__ == "__main__":
    # Usage: script INPUT [OUTPUT.json]
    # Parses INPUT and prints a short summary. The full result is written to
    # OUTPUT.json when given; otherwise its first 4000 characters are printed.
    import json
    import sys

    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} INPUT [OUTPUT.json]")

    # Context managers make sure both files are closed (and the output
    # flushed) even if parsing or serialisation raises.
    with open(sys.argv[1], 'rb') as src:
        data = src.read()
    res = parse(data)
    print(f"consumed {res['consumed']}/{res['total']} bytes")
    print(f"{len(res['types'])} types registered")
    out = sys.argv[2] if len(sys.argv) > 2 else None
    if out:
        # ensure_ascii=False emits non-ASCII characters verbatim, so the
        # file encoding is pinned to UTF-8 instead of the platform default.
        with open(out, 'w', encoding='utf-8') as dst:
            json.dump(res, dst, indent=1, ensure_ascii=False)
        print("wrote", out)
    else:
        print(json.dumps(res, indent=1, ensure_ascii=False)[:4000])
|