|
|
|
@ -225,45 +225,43 @@ class InvalidImageError(Exception):
|
|
|
|
|
return self._message
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_valid_image_header(data: bytes, image_type: str, header_magic_len: int) -> None:
|
|
|
|
|
|
|
|
|
|
if image_type == "ios":
|
|
|
|
|
# file must start with the ELF magic number, be 32-bit, big endian and have an ELF version of 1
|
|
|
|
|
if data[:header_magic_len] != b'\x7fELF\x01\x02\x01':
|
|
|
|
|
raise InvalidImageError("Invalid IOS file detected")
|
|
|
|
|
elif image_type == "iou":
|
|
|
|
|
# file must start with the ELF magic number, be 32-bit or 64-bit, little endian and have an ELF version of 1
|
|
|
|
|
# (normal IOS images are big endian!)
|
|
|
|
|
if data[:header_magic_len] != b'\x7fELF\x01\x01\x01' and data[:7] != b'\x7fELF\x02\x01\x01':
|
|
|
|
|
raise InvalidImageError("Invalid IOU file detected")
|
|
|
|
|
elif image_type == "qemu":
|
|
|
|
|
if data[:header_magic_len] != b'QFI\xfb' and data[:header_magic_len] != b'KDMV':
|
|
|
|
|
raise InvalidImageError("Invalid Qemu file detected (must be qcow2 or VDMK format)")
|
|
|
|
|
def check_valid_image_header(data: bytes) -> str:
|
|
|
|
|
|
|
|
|
|
if data[:7] == b'\x7fELF\x01\x02\x01':
|
|
|
|
|
# for IOS images: file must start with the ELF magic number, be 32-bit, big endian and have an ELF version of 1
|
|
|
|
|
return "ios"
|
|
|
|
|
elif data[:7] == b'\x7fELF\x01\x01\x01' or data[:7] == b'\x7fELF\x02\x01\x01':
|
|
|
|
|
# for IOU images file must start with the ELF magic number, be 32-bit or 64-bit, little endian and
|
|
|
|
|
# have an ELF version of 1 (normal IOS images are big endian!)
|
|
|
|
|
return "iou"
|
|
|
|
|
elif data[:4] != b'QFI\xfb' or data[:4] != b'KDMV':
|
|
|
|
|
return "qemu"
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidImageError("Could not detect image type, please make sure it is a valid image")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def write_image(
|
|
|
|
|
image_name: str,
|
|
|
|
|
image_type: str,
|
|
|
|
|
path: str,
|
|
|
|
|
image_filename: str,
|
|
|
|
|
image_path: str,
|
|
|
|
|
stream: AsyncGenerator[bytes, None],
|
|
|
|
|
images_repo: ImagesRepository,
|
|
|
|
|
check_image_header=True
|
|
|
|
|
) -> models.Image:
|
|
|
|
|
|
|
|
|
|
log.info(f"Writing image file to '{path}'")
|
|
|
|
|
image_dir, image_name = os.path.split(image_filename)
|
|
|
|
|
log.info(f"Writing image file to '{image_path}'")
|
|
|
|
|
# Store the file under its final name only when the upload is completed
|
|
|
|
|
tmp_path = path + ".tmp"
|
|
|
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
|
|
|
tmp_path = image_path + ".tmp"
|
|
|
|
|
os.makedirs(os.path.dirname(image_path), exist_ok=True)
|
|
|
|
|
checksum = hashlib.md5()
|
|
|
|
|
header_magic_len = 7
|
|
|
|
|
if image_type == "qemu":
|
|
|
|
|
header_magic_len = 4
|
|
|
|
|
image_type = None
|
|
|
|
|
try:
|
|
|
|
|
async with aiofiles.open(tmp_path, "wb") as f:
|
|
|
|
|
async for chunk in stream:
|
|
|
|
|
if check_image_header and len(chunk) >= header_magic_len:
|
|
|
|
|
check_image_header = False
|
|
|
|
|
check_valid_image_header(chunk, image_type, header_magic_len)
|
|
|
|
|
image_type = check_valid_image_header(chunk)
|
|
|
|
|
await f.write(chunk)
|
|
|
|
|
checksum.update(chunk)
|
|
|
|
|
|
|
|
|
@ -273,12 +271,16 @@ async def write_image(
|
|
|
|
|
|
|
|
|
|
checksum = checksum.hexdigest()
|
|
|
|
|
duplicate_image = await images_repo.get_image_by_checksum(checksum)
|
|
|
|
|
if duplicate_image and os.path.dirname(duplicate_image.path) == os.path.dirname(path):
|
|
|
|
|
if duplicate_image and os.path.dirname(duplicate_image.path) == os.path.dirname(image_path):
|
|
|
|
|
raise InvalidImageError(f"Image {duplicate_image.filename} with "
|
|
|
|
|
f"same checksum already exists in the same directory")
|
|
|
|
|
except InvalidImageError:
|
|
|
|
|
os.remove(tmp_path)
|
|
|
|
|
raise
|
|
|
|
|
os.chmod(tmp_path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
|
|
|
|
|
shutil.move(tmp_path, path)
|
|
|
|
|
return await images_repo.add_image(image_name, image_type, image_size, path, checksum, checksum_algorithm="md5")
|
|
|
|
|
if not image_dir:
|
|
|
|
|
directory = default_images_directory(image_type)
|
|
|
|
|
os.makedirs(directory, exist_ok=True)
|
|
|
|
|
image_path = os.path.abspath(os.path.join(directory, image_filename))
|
|
|
|
|
shutil.move(tmp_path, image_path)
|
|
|
|
|
return await images_repo.add_image(image_name, image_type, image_size, image_path, checksum, checksum_algorithm="md5")
|
|
|
|
|