mirror of
https://github.com/hashcat/hashcat.git
synced 2025-07-19 13:08:19 +00:00

Improve unit test for -m 8300. In optimized mode, allow longer passwords, domain names, and salts. In both optimized and pure modes, ensure the domain name does not exceed 63 characters. Fix SNMPv3 unit test to produce passwords of at least 8 characters, as required by RFC 3414. Fix file permissions in tools/ folder.
433 lines
15 KiB
Python
Executable File
433 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
# For extracting APFS hashes to be cracked by hashcat modes 18300 ($fvde$2$) or 16700 ($fvde$1$).
|
||
# Usage: `python3 apfs2hashcat.py <apfs_image_file> -o <_apfs_container_offset>`
|
||
# The argument -o is optional. The script will attempt to read the partition table to find the location of APFS container(s). In the case that the partition table is missing or you want to specify a particular APFS container, use -o to provide the offset to the start of the container.
|
||
|
||
import argparse
|
||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||
|
||
KNOWN_RECOVERY_HASHES = ['64C0C6EB-0000-AA11-AA11-00306543ECAC', 'D92A1CEC-18B6-D64E-BD8D-50F361C27507']
|
||
TAG_DICT = {'unk_80' : {'tag' : b'\x80', 'expected_len' : 1},
|
||
'uuid' : {'tag' : b'\x81', 'expected_len' : 0x10},
|
||
'unk_82' : {'tag' : b'\x82'},
|
||
'wrapped_kek' : {'tag' : b'\x83', 'expected_len' : 0x28},
|
||
'iterations' : {'tag' : b'\x84'},
|
||
'salt' : {'tag' : b'\x85', 'expected_len' : 0x10}}
|
||
HEX_APFS_CONTAINER_GUID = '7C3457EF-0000-11AA-AA11-00306543ECAC'
|
||
AES_XTS_SECTOR_SIZE = 512
|
||
EFI_PARTITION_HEADER = b'EFI PART'
|
||
|
||
def uint_to_int(b):
|
||
return int(b[::-1].hex(), 16)
|
||
|
||
|
||
def findall(p, s):
|
||
i = s.find(p)
|
||
while i != -1:
|
||
yield i
|
||
i = s.find(p, i+1)
|
||
|
||
|
||
def hex_to_guid(hex_str):
|
||
|
||
guid_parts = [0] * 5
|
||
guid_parts[0] = hex_str[0:8]
|
||
guid_parts[1] = hex_str[8:12]
|
||
guid_parts[2] = hex_str[12:16]
|
||
guid_parts[3] = hex_str[16:20]
|
||
guid_parts[4] = hex_str[20:]
|
||
|
||
guid = ''.join([guid_parts[0][i:i+2] for i in range(0, len(guid_parts[0]), 2)][::-1])
|
||
guid += '-'
|
||
guid += ''.join([guid_parts[1][i:i+2] for i in range(0, len(guid_parts[1]), 2)][::-1])
|
||
guid += '-'
|
||
guid += ''.join([guid_parts[2][i:i+2] for i in range(0, len(guid_parts[2]), 2)][::-1])
|
||
guid += '-'
|
||
guid += guid_parts[3]
|
||
guid += '-'
|
||
guid += guid_parts[4]
|
||
|
||
return guid.upper()
|
||
|
||
|
||
def parse_partition_entry(partition_entry):
|
||
type_GUID = partition_entry[0:0x10]
|
||
part_GUID = partition_entry[0x10:0x20]
|
||
start_LBA = partition_entry[0x20:0x28]
|
||
# end_LBA = partition_entry[0x28:0x30]
|
||
return part_GUID, type_GUID, start_LBA
|
||
|
||
|
||
# get main_start by multiplying apfs partition start lba by block size
|
||
def parse_partition_table(fp):
|
||
|
||
# determine whether sector size is 0x200 or 0x1000
|
||
sector_size = 0x0
|
||
|
||
# look for EFI PART at start of sector 1
|
||
fp.seek(0x200)
|
||
signature = fp.read(0x8)
|
||
if signature == EFI_PARTITION_HEADER:
|
||
sector_size = 0x200
|
||
|
||
else:
|
||
fp.seek(0x1000)
|
||
signature = fp.read(0x8)
|
||
if signature == EFI_PARTITION_HEADER:
|
||
sector_size = 0x1000
|
||
|
||
print("[+] Identified sector size:", sector_size)
|
||
|
||
if not sector_size:
|
||
print(f"[!] Invalid sector size {sector_size} (not 512 or 4096 bytes). Exiting.")
|
||
|
||
fp.seek(2 * sector_size) # go to sector 2
|
||
partitions = []
|
||
partition_entry = b'1'
|
||
while any(partition_entry):
|
||
partition_entry = fp.read(0x80)
|
||
if any(partition_entry):
|
||
partitions.append(partition_entry)
|
||
|
||
partition_dict = {}
|
||
for p in partitions:
|
||
part_GUID, type_GUID, start = parse_partition_entry(p)
|
||
starting_pos = uint_to_int(start) * sector_size
|
||
partition_dict[part_GUID.hex()] = {'start':starting_pos, 'partition_type':type_GUID.hex()}
|
||
|
||
return partition_dict
|
||
|
||
|
||
def AES_XTS_decrypt_sector(uuid, tweak, ct):
|
||
|
||
decryptor = Cipher(
|
||
algorithms.AES(key=uuid+uuid),
|
||
modes.XTS(tweak=tweak),
|
||
).decryptor()
|
||
pt = decryptor.update(ct) + decryptor.finalize()
|
||
|
||
return pt
|
||
|
||
|
||
def AES_decrypt(data, start_offset, block_size, uuid):
|
||
cs_factor = block_size // 0x200 # = 8 for block_size=4096
|
||
uno = start_offset * cs_factor
|
||
pt = b''
|
||
for offset in range(0, block_size, AES_XTS_SECTOR_SIZE):
|
||
ct = data[offset:offset + AES_XTS_SECTOR_SIZE]
|
||
tweak = hex(uno)[2:].zfill(32) # 32 so that the key is the correct length (16 bytes)
|
||
tweak = bytearray.fromhex(tweak)[::-1]
|
||
pt += AES_XTS_decrypt_sector(uuid, tweak, ct)
|
||
uno += 1
|
||
|
||
return pt
|
||
|
||
|
||
def TLV(full_kek_blob, tag, starting_index):
|
||
# expected tag should follow if this is the correct TLV)
|
||
if full_kek_blob[starting_index:starting_index+1] != TAG_DICT[tag]['tag']:
|
||
return -1, starting_index
|
||
# check for expected len for further confirmation
|
||
length = uint_to_int(full_kek_blob[starting_index+1:starting_index+2])
|
||
expected_len = TAG_DICT[tag].get('expected_len') # use .get() since not all tags have an expected len
|
||
if expected_len:
|
||
if length != expected_len:
|
||
return -1, starting_index
|
||
next_starting_index = starting_index+2+length
|
||
value = full_kek_blob[starting_index+2:next_starting_index]
|
||
|
||
return value, next_starting_index
|
||
|
||
|
||
def TLV_iterate(starting_index, pt, hash_set, volume_uuid):
|
||
for tag in TAG_DICT:
|
||
value, starting_index = TLV(pt, tag, starting_index)
|
||
|
||
# i.e. if fails length check
|
||
if value == -1:
|
||
return starting_index + 1, hash_set
|
||
TAG_DICT[tag]['value'] = value
|
||
|
||
aes_type = TAG_DICT['unk_82']['value']
|
||
wrapped_kek = TAG_DICT['wrapped_kek']['value']
|
||
iterations = TAG_DICT['iterations']['value']
|
||
salt = TAG_DICT['salt']['value']
|
||
|
||
aes_type = uint_to_int(aes_type[0:4])
|
||
|
||
# FVDE - AES128
|
||
if aes_type == 2:
|
||
aes_hash_value = 1
|
||
wrapped_kek = wrapped_kek[:0x18] # shorter kek value, this removes zeros
|
||
|
||
# APFS - AES256
|
||
elif aes_type == 16 or aes_type == 0:
|
||
aes_hash_value = 2
|
||
|
||
else:
|
||
print("[!] AES type not recognised, continuing...")
|
||
return
|
||
|
||
password_hash = f"$fvde${aes_hash_value}${len(salt)}${salt.hex()}${int(iterations.hex(),16)}${wrapped_kek.hex()}"
|
||
hash_set.add(password_hash)
|
||
print(f"\nFound password hash: {password_hash} (vol uuid: {volume_uuid.hex()})")
|
||
|
||
kek_uuid = hex_to_guid(TAG_DICT['uuid']['value'].hex())
|
||
if kek_uuid in KNOWN_RECOVERY_HASHES:
|
||
print(f"[!] Warning! Recognised UUID {kek_uuid}... possible recovery hash\n")
|
||
|
||
return starting_index, hash_set
|
||
|
||
|
||
def parse_block(block):
|
||
nx_xid = uint_to_int(block[16:24])
|
||
obj_type = uint_to_int(block[24:26])
|
||
magic = block[0x20:0x24]
|
||
|
||
return nx_xid, obj_type, magic
|
||
|
||
|
||
def parse_apsb_block(block):
|
||
obj_type = uint_to_int(block[24:26])
|
||
magic = block[0x20:0x24]
|
||
uuid = block[240:256]
|
||
encryption = uint_to_int(block[264:272])
|
||
name = block[704:960]
|
||
|
||
return obj_type, magic, uuid, encryption, name
|
||
|
||
|
||
def parse_keybag_entry(uuid, pt):
|
||
uuid_iterator = findall(uuid, pt)
|
||
for starting_pos in uuid_iterator:
|
||
ke_uuid, ke_tag, ke_keylen = pt[starting_pos:starting_pos+16], uint_to_int(pt[starting_pos + 16:starting_pos + 18]), uint_to_int(pt[starting_pos + 18:starting_pos + 20])
|
||
padding = pt[starting_pos + 20:starting_pos + 24]
|
||
keydata = pt[starting_pos + 24: starting_pos + 24 + ke_keylen]
|
||
|
||
# only tag 3 is needed for constructing the hash
|
||
if ke_tag == 3:
|
||
assert padding == b'\x00\x00\x00\x00'
|
||
volume_unlock_record = keydata
|
||
return volume_unlock_record
|
||
|
||
return None
|
||
|
||
|
||
def get_fs_oids(csb_body):
|
||
max_file_systems = uint_to_int(csb_body[0x94:0x98])
|
||
fs_oids = set()
|
||
for fs_entry in range(max_file_systems):
|
||
oid_start = 0x98 + 8 * fs_entry
|
||
fs_oid = uint_to_int(csb_body[oid_start:oid_start + 8])
|
||
if not fs_oid:
|
||
continue
|
||
fs_oids.add(fs_oid)
|
||
|
||
return fs_oids
|
||
|
||
|
||
def parse_csb(csb):
|
||
csb_body = csb[0x20:0x568]
|
||
|
||
header = csb_body[:4] # 'NXSB'
|
||
assert header == b'NXSB'
|
||
block_size = uint_to_int(csb_body[4:8]) # default is 4096
|
||
uuid = csb_body[0x28:0x38] # used as key for unwrapping
|
||
omap_oid = uint_to_int(csb_body[0x80:0x88]) # omap_oid to locate the omap to find volume offsets
|
||
fs_oids = get_fs_oids(csb_body)
|
||
|
||
# locate container's keybag using nx_keylocker field
|
||
keylocker_paddr = uint_to_int(csb_body[0x4f0:0x4f8])
|
||
|
||
# block info for iterating to find most recent csb
|
||
xp_desc_blocks = uint_to_int(csb_body[0x48:0x4b])
|
||
xp_desc_base = uint_to_int(csb_body[0x50:0x54])
|
||
|
||
return block_size, uuid, keylocker_paddr, omap_oid, fs_oids, xp_desc_base, xp_desc_blocks
|
||
|
||
|
||
def get_offset_from_oid(oid, apfs_start, block_size):
|
||
return apfs_start + oid * block_size
|
||
|
||
|
||
def parse_tree(tree, fs_oids, block_size):
|
||
|
||
volume_addresses = []
|
||
|
||
# get key data from TOC:
|
||
table_space_offset = uint_to_int(tree[0x28:0x2a])
|
||
table_space_len = uint_to_int(tree[0x2a:0x2c])
|
||
start_of_key_area = table_space_offset + table_space_len + 0x38 # 0x38 = header + entries
|
||
|
||
# b-tree structure is header (0x20 bytes) -> ToC -> keys -> free space -> values -> btree_info (0x28 bytes)
|
||
end_of_value_area = block_size - 0x28
|
||
|
||
tree_data = tree[0x38:]
|
||
for m in range(len(fs_oids)):
|
||
data_start = m * 4
|
||
key_offset = uint_to_int(tree_data[data_start:data_start + 2]) # key offset is from the start of the key area downwards
|
||
data_offset = uint_to_int(tree_data[data_start + 2:data_start + 4]) # data offset is from the end of the data area upwards
|
||
|
||
# get to key area
|
||
key_start = key_offset + start_of_key_area
|
||
key_oid = uint_to_int(tree[key_start:key_start + 0x8])
|
||
|
||
if key_oid not in fs_oids:
|
||
print(f"Found key_oid {key_oid} in omap but not present in fs map. Skipping this volume")
|
||
|
||
else:
|
||
val_end = end_of_value_area - data_offset
|
||
data_paddr = uint_to_int(tree[val_end + 0x8:val_end + 0x10])
|
||
volume_addresses.append(data_paddr)
|
||
|
||
return volume_addresses
|
||
|
||
|
||
def get_volumes(fp, block_size, apfs_start, tree, fs_oids):
|
||
volume_addresses = parse_tree(tree, fs_oids, block_size)
|
||
volumes_dict = dict()
|
||
for v in volume_addresses:
|
||
fp.seek(apfs_start + block_size * v)
|
||
block_start = fp.read(block_size)
|
||
obj_type, magic, uuid, encryption, name = parse_apsb_block(block_start)
|
||
if obj_type == 13 and magic == b'APSB':
|
||
volumes_dict[uuid] = {'start':v, 'name':name}
|
||
print()
|
||
print("[+] The following volumes are present:")
|
||
for u in volumes_dict:
|
||
print(f"{u.hex()} ({volumes_dict[u]['name'].decode()}) at {hex(volumes_dict[u]['start'] * block_size + apfs_start)}")
|
||
|
||
return volumes_dict
|
||
|
||
|
||
def decrypt_volume_keybag(fp, volume_keybag_addr, block_size, apfs_struct_start, volume_uuid):
|
||
volume_keybag_addr = volume_keybag_addr[:4].hex().zfill(8)
|
||
volume_keybag_addr = bytearray.fromhex(volume_keybag_addr)[::-1]
|
||
volume_keybag_addr = int(volume_keybag_addr.hex(),16)
|
||
|
||
offset = block_size * volume_keybag_addr + apfs_struct_start
|
||
fp.seek(offset)
|
||
encrypted_keybag = fp.read(block_size)
|
||
pt = AES_decrypt(encrypted_keybag, volume_keybag_addr, block_size, volume_uuid)
|
||
|
||
return pt
|
||
|
||
|
||
def get_apfs_containers(fp):
|
||
partition_dict = parse_partition_table(fp)
|
||
apfs_containers = []
|
||
for d in partition_dict:
|
||
if hex_to_guid(partition_dict[d]['partition_type']) == HEX_APFS_CONTAINER_GUID:
|
||
apfs_containers.append(partition_dict[d]['start'])
|
||
|
||
return apfs_containers
|
||
|
||
|
||
def get_tree(fp, omap_oid, apfs_struct_start, block_size):
|
||
omap_offset = get_offset_from_oid(omap_oid, apfs_struct_start, block_size)
|
||
fp.seek(omap_offset + 0x30) # location for tree_oid
|
||
tree_oid = fp.read(0x10)
|
||
tree_oid = uint_to_int(tree_oid)
|
||
tree_offset = get_offset_from_oid(tree_oid, apfs_struct_start, block_size)
|
||
|
||
fp.seek(tree_offset)
|
||
tree = fp.read(0x1000)
|
||
|
||
return tree
|
||
|
||
|
||
def get_container_keybag(fp, apfs_struct_start, block_size, keylocker_paddr):
|
||
# calculate offset to read from
|
||
offs = block_size * keylocker_paddr + apfs_struct_start
|
||
fp.seek(offs)
|
||
data = fp.read(block_size)
|
||
|
||
return data
|
||
|
||
|
||
def find_valid_csb(fp, block_size, xp_desc_base, xp_desc_blocks, apfs_start):
|
||
max_xid = 0
|
||
max_xid_paddr = 0
|
||
|
||
for paddr in range(xp_desc_base, xp_desc_base + xp_desc_blocks):
|
||
offs = block_size * paddr + apfs_start
|
||
fp.seek(offs + 0x10)
|
||
csb_xid = uint_to_int(fp.read(0x8))
|
||
if csb_xid >= max_xid:
|
||
max_xid = csb_xid
|
||
max_xid_paddr = paddr
|
||
|
||
print(f"[+] Found valid csb with xid {max_xid} at {hex(max_xid_paddr)}")
|
||
return max_xid_paddr
|
||
|
||
|
||
def main():
|
||
|
||
p = argparse.ArgumentParser()
|
||
p.add_argument('filename')
|
||
p.add_argument('-o', '--offset', help='[OPTIONAL] offset for APFS volume - may be necessary if partition table is not present')
|
||
args = p.parse_args()
|
||
|
||
filename = args.filename
|
||
with open(filename, 'rb') as fp:
|
||
|
||
if args.offset:
|
||
apfs_offset = int(args.offset)
|
||
apfs_containers = [apfs_offset]
|
||
|
||
else:
|
||
apfs_containers = get_apfs_containers(fp)
|
||
|
||
if apfs_containers == []:
|
||
print("[!] APFS volume GUID not found, exiting.")
|
||
exit()
|
||
|
||
for apfs_struct_start in apfs_containers:
|
||
print(f"[+] APFS container starts at {hex(apfs_struct_start)}")
|
||
fp.seek(apfs_struct_start)
|
||
csb = fp.read(0x568)
|
||
|
||
# read the first csb for initial info - then use this to iterate through all csbs and find the most recent one
|
||
block_size, uuid, keylocker_paddr, omap_oid, fs_oids, xp_desc_base, xp_desc_blocks = parse_csb(csb)
|
||
valid_csb_paddr = find_valid_csb(fp, block_size, xp_desc_base, xp_desc_blocks, apfs_struct_start)
|
||
|
||
fp.seek(valid_csb_paddr * block_size + apfs_struct_start)
|
||
valid_csb = fp.read(block_size)
|
||
block_size, uuid, keylocker_paddr, omap_oid, fs_oids, xp_desc_base, xp_desc_blocks = parse_csb(valid_csb)
|
||
|
||
encrypted_keybag = get_container_keybag(fp, apfs_struct_start, block_size, keylocker_paddr)
|
||
# Unwrap container keybag using AES-XTS with container UUID as key
|
||
starting_pt = AES_decrypt(encrypted_keybag, keylocker_paddr, block_size, uuid)
|
||
|
||
# find all volumes to iterate through
|
||
tree = get_tree(fp, omap_oid, apfs_struct_start, block_size)
|
||
|
||
volumes_dict = get_volumes(fp, block_size, apfs_struct_start, tree, fs_oids)
|
||
|
||
hash_set = set()
|
||
for volume_uuid in volumes_dict:
|
||
|
||
# find entry in container's keybag matching volume UUID and has KB_TAG_VOLUME_UNLOCK_RECORDS = 3. Its keydata is location of volume keybag.
|
||
volume_keybag_addr = parse_keybag_entry(volume_uuid, starting_pt)
|
||
|
||
# continue if encrypted keybag not found
|
||
if not volume_keybag_addr:
|
||
continue
|
||
|
||
# unwrap volume keybag using volume uuid AES-XTS
|
||
pt = decrypt_volume_keybag(fp, volume_keybag_addr, block_size, apfs_struct_start, volume_uuid)
|
||
|
||
# parse TLV for 80 first
|
||
index_iterator = findall(TAG_DICT['unk_80']['tag'], pt)
|
||
for starting_index in index_iterator:
|
||
starting_index, hash_set = TLV_iterate(starting_index, pt, hash_set, volume_uuid)
|
||
|
||
print()
|
||
print("[+] All hashes found.")
|
||
|
||
return
|
||
|
||
if __name__ == "__main__":
|
||
main()
|