1
0
mirror of https://github.com/GNS3/gns3-server synced 2025-02-17 18:42:00 +00:00

Non blocking project exportation.

This commit is contained in:
grossmj 2019-02-26 15:55:07 +07:00
parent 135d56371d
commit a8990c9e89
5 changed files with 459 additions and 21 deletions

View File

@ -22,7 +22,6 @@ import asyncio
import aiohttp import aiohttp
import zipfile import zipfile
import tempfile import tempfile
import zipstream
from datetime import datetime from datetime import datetime
@ -30,7 +29,7 @@ import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def export_project(project, temporary_dir, include_images=False, keep_compute_id=False, allow_all_nodes=False, reset_mac_addresses=False): async def export_project(zstream, project, temporary_dir, include_images=False, keep_compute_id=False, allow_all_nodes=False, reset_mac_addresses=False):
""" """
Export a project to a zip file. Export a project to a zip file.
@ -53,8 +52,6 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp
# Make sure we save the project # Make sure we save the project
project.dump() project.dump()
zstream = zipstream.ZipFile(allowZip64=True)
if not os.path.exists(project._path): if not os.path.exists(project._path):
raise aiohttp.web.HTTPNotFound(text="Project could not be found at '{}'".format(project._path)) raise aiohttp.web.HTTPNotFound(text="Project could not be found at '{}'".format(project._path))
@ -80,7 +77,7 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp
if file.endswith(".gns3"): if file.endswith(".gns3"):
continue continue
_patch_mtime(path) _patch_mtime(path)
zstream.write(path, os.path.relpath(path, project._path), compress_type=zipfile.ZIP_DEFLATED) zstream.write(path, os.path.relpath(path, project._path))
# Export files from remote computes # Export files from remote computes
downloaded_files = set() downloaded_files = set()
@ -103,11 +100,9 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp
response.close() response.close()
f.close() f.close()
_patch_mtime(temp_path) _patch_mtime(temp_path)
zstream.write(temp_path, arcname=compute_file["path"], compress_type=zipfile.ZIP_DEFLATED) zstream.write(temp_path, arcname=compute_file["path"])
downloaded_files.add(compute_file['path']) downloaded_files.add(compute_file['path'])
return zstream
def _patch_mtime(path): def _patch_mtime(path):
""" """
@ -232,6 +227,7 @@ async def _patch_project_file(project, path, zstream, include_images, keep_compu
zstream.writestr("project.gns3", json.dumps(topology).encode()) zstream.writestr("project.gns3", json.dumps(topology).encode())
return images return images
def _export_local_image(image, zstream): def _export_local_image(image, zstream):
""" """
Exports a local image to the zip file. Exports a local image to the zip file.

View File

@ -24,6 +24,7 @@ import shutil
import asyncio import asyncio
import aiohttp import aiohttp
import tempfile import tempfile
import zipfile
from uuid import UUID, uuid4 from uuid import UUID, uuid4
@ -38,6 +39,7 @@ from ..utils.path import check_path_allowed, get_default_project_directory
from ..utils.asyncio.pool import Pool from ..utils.asyncio.pool import Pool
from ..utils.asyncio import locking from ..utils.asyncio import locking
from ..utils.asyncio import wait_run_in_executor from ..utils.asyncio import wait_run_in_executor
from ..utils.asyncio import aiozipstream
from .export_project import export_project from .export_project import export_project
from .import_project import import_project from .import_project import import_project
@ -976,9 +978,10 @@ class Project:
assert self._status != "closed" assert self._status != "closed"
try: try:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
zipstream = await export_project(self, tmpdir, keep_compute_id=True, allow_all_nodes=True, reset_mac_addresses=True) with aiozipstream.ZipFile(compression=zipfile.ZIP_STORED) as zstream:
project_path = os.path.join(tmpdir, "project.gns3p") zipstream = await export_project(zstream, self, tmpdir, keep_compute_id=True, allow_all_nodes=True, reset_mac_addresses=True)
await wait_run_in_executor(self._create_duplicate_project_file, project_path, zipstream) project_path = os.path.join(tmpdir, "project.gns3p")
await wait_run_in_executor(self._create_duplicate_project_file, project_path, zipstream)
with open(project_path, "rb") as f: with open(project_path, "rb") as f:
project = await import_project(self._controller, str(uuid.uuid4()), f, location=location, name=name, keep_compute_id=True) project = await import_project(self._controller, str(uuid.uuid4()), f, location=location, name=name, keep_compute_id=True)
except (ValueError, OSError, UnicodeEncodeError) as e: except (ValueError, OSError, UnicodeEncodeError) as e:

View File

@ -20,11 +20,14 @@ import sys
import aiohttp import aiohttp
import asyncio import asyncio
import tempfile import tempfile
import zipfile
import time
from gns3server.web.route import Route from gns3server.web.route import Route
from gns3server.controller import Controller from gns3server.controller import Controller
from gns3server.controller.import_project import import_project from gns3server.controller.import_project import import_project
from gns3server.controller.export_project import export_project from gns3server.controller.export_project import export_project
from gns3server.utils.asyncio import aiozipstream
from gns3server.config import Config from gns3server.config import Config
@ -301,19 +304,24 @@ class ProjectHandler:
controller = Controller.instance() controller = Controller.instance()
project = await controller.get_loaded_project(request.match_info["project_id"]) project = await controller.get_loaded_project(request.match_info["project_id"])
try: try:
begin = time.time()
with tempfile.TemporaryDirectory() as tmp_dir: with tempfile.TemporaryDirectory() as tmp_dir:
stream = await export_project(project, tmp_dir, include_images=bool(int(request.query.get("include_images", "0")))) with aiozipstream.ZipFile(compression=zipfile.ZIP_DEFLATED) as zstream:
# We need to do that now because export could failed and raise an HTTP error await export_project(zstream, project, tmp_dir, include_images=bool(int(request.query.get("include_images", "0"))))
# that why response start need to be the later possible
response.content_type = 'application/gns3project'
response.headers['CONTENT-DISPOSITION'] = 'attachment; filename="{}.gns3project"'.format(project.name)
response.enable_chunked_encoding()
await response.prepare(request)
for data in stream: # We need to do that now because export could failed and raise an HTTP error
await response.write(data) # that why response start need to be the later possible
response.content_type = 'application/octet-stream'
response.headers['CONTENT-DISPOSITION'] = 'attachment; filename="{}.gns3project"'.format(project.name)
response.enable_chunked_encoding()
await response.prepare(request)
async for chunk in zstream:
await response.write(chunk)
log.info("Project '{}' exported in {:.4f} seconds".format(project.id, time.time() - begin))
#await response.write_eof() #FIXME: shound't be needed anymore #await response.write_eof() #FIXME: shound't be needed anymore
# Will be raise if you have no space left or permission issue on your temporary directory # Will be raise if you have no space left or permission issue on your temporary directory
# RuntimeError: something was wrong during the zip process # RuntimeError: something was wrong during the zip process

View File

@ -0,0 +1,430 @@
#!/usr/bin/env python
#
# Copyright (C) 2019 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Iterable ZIP archive generator.
Derived directly from zipfile.py and the zipstream project
https://github.com/allanlei/python-zipstream
"""
import os
import sys
import stat
import struct
import time
import zipfile
import asyncio
import aiofiles
from concurrent import futures
from async_generator import async_generator, yield_
from zipfile import (structCentralDir, structEndArchive64, structEndArchive, structEndArchive64Locator,
stringCentralDir, stringEndArchive64, stringEndArchive, stringEndArchive64Locator)
stringDataDescriptor = b'PK\x07\x08' # magic number for data descriptor
def _get_compressor(compress_type):
"""
Return the compressor.
"""
if compress_type == zipfile.ZIP_DEFLATED:
from zipfile import zlib
return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
elif compress_type == zipfile.ZIP_BZIP2:
from zipfile import bz2
return bz2.BZ2Compressor()
elif compress_type == zipfile.ZIP_LZMA:
from zipfile import LZMACompressor
return LZMACompressor()
else:
return None
class PointerIO(object):
def __init__(self, mode='wb'):
if mode not in ('wb', ):
raise RuntimeError('zipstream.ZipFile() requires mode "wb"')
self.data_pointer = 0
self.__mode = mode
self.__closed = False
@property
def mode(self):
return self.__mode
@property
def closed(self):
return self.__closed
def close(self):
self.__closed = True
def flush(self):
pass
def next(self):
raise NotImplementedError()
def tell(self):
return self.data_pointer
def truncate(size=None):
raise NotImplementedError()
def write(self, data):
if self.closed:
raise ValueError('I/O operation on closed file')
if isinstance(data, str):
data = data.encode('utf-8')
if not isinstance(data, bytes):
raise TypeError('expected bytes')
self.data_pointer += len(data)
return data
class ZipInfo(zipfile.ZipInfo):
def __init__(self, *args, **kwargs):
zipfile.ZipInfo.__init__(self, *args, **kwargs)
def DataDescriptor(self):
"""
crc-32 4 bytes
compressed size 4 bytes
uncompressed size 4 bytes
"""
if self.compress_size > zipfile.ZIP64_LIMIT or self.file_size > zipfile.ZIP64_LIMIT:
fmt = b'<4sLQQ'
else:
fmt = b'<4sLLL'
return struct.pack(fmt, stringDataDescriptor, self.CRC, self.compress_size, self.file_size)
class ZipFile(zipfile.ZipFile):
def __init__(self, fileobj=None, mode='w', compression=zipfile.ZIP_STORED, allowZip64=True, chunksize=32768):
"""Open the ZIP file with mode write "w"."""
if mode not in ('w', ):
raise RuntimeError('aiozipstream.ZipFile() requires mode "w"')
if fileobj is None:
fileobj = PointerIO()
self._comment = b''
zipfile.ZipFile.__init__(self, fileobj, mode=mode, compression=compression, allowZip64=allowZip64)
self._chunksize = chunksize
self.paths_to_write = []
def __aiter__(self):
return self._stream()
@property
def comment(self):
"""
The comment text associated with the ZIP file.
"""
return self._comment
@comment.setter
def comment(self, comment):
"""
Add a comment text associated with the ZIP file.
"""
if not isinstance(comment, bytes):
raise TypeError("comment: expected bytes, got %s" % type(comment))
# check for valid comment length
if len(comment) >= zipfile.ZIP_MAX_COMMENT:
if self.debug:
print('Archive comment is too long; truncating to %d bytes' % zipfile.ZIP_MAX_COMMENT)
comment = comment[:zipfile.ZIP_MAX_COMMENT]
self._comment = comment
self._didModify = True
@async_generator
async def data_generator(self, path):
async with aiofiles.open(path, "rb") as f:
while True:
part = await f.read(self._chunksize)
if not part:
break
await yield_(part)
return
async def _run_in_executor(self, task, *args, **kwargs):
"""
Run synchronous task in separate thread and await for result.
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(futures.ThreadPoolExecutor(max_workers=1), task, *args, **kwargs)
@async_generator
async def _stream(self):
for kwargs in self.paths_to_write:
async for chunk in self._write(**kwargs):
await yield_(chunk)
for chunk in self._close():
await yield_(chunk)
def write(self, filename, arcname=None, compress_type=None):
"""
Write a file to the archive under the name `arcname`.
"""
kwargs = {'filename': filename, 'arcname': arcname, 'compress_type': compress_type}
self.paths_to_write.append(kwargs)
def write_iter(self, arcname, iterable, compress_type=None):
"""
Write the bytes iterable `iterable` to the archive under the name `arcname`.
"""
kwargs = {'arcname': arcname, 'iterable': iterable, 'compress_type': compress_type}
self.paths_to_write.append(kwargs)
def writestr(self, arcname, data, compress_type=None):
"""
Writes a str into ZipFile by wrapping data as a generator
"""
def _iterable():
yield data
return self.write_iter(arcname, _iterable(), compress_type=compress_type)
@async_generator
async def _write(self, filename=None, iterable=None, arcname=None, compress_type=None):
"""
Put the bytes from filename into the archive under the name `arcname`.
"""
if not self.fp:
raise RuntimeError(
"Attempt to write to ZIP archive that was already closed")
if (filename is None and iterable is None) or (filename is not None and iterable is not None):
raise ValueError("either (exclusively) filename or iterable shall be not None")
if filename:
st = os.stat(filename)
isdir = stat.S_ISDIR(st.st_mode)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
else:
st, isdir, date_time = None, False, time.localtime()[0:6]
# Create ZipInfo instance to store file information
if arcname is None:
arcname = filename
arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
while arcname[0] in (os.sep, os.altsep):
arcname = arcname[1:]
if isdir:
arcname += '/'
zinfo = ZipInfo(arcname, date_time)
if st:
zinfo.external_attr = (st[0] & 0xFFFF) << 16 # Unix attributes
else:
zinfo.external_attr = 0o600 << 16 # ?rw-------
if compress_type is None:
zinfo.compress_type = self.compression
else:
zinfo.compress_type = compress_type
if st:
zinfo.file_size = st[6]
else:
zinfo.file_size = 0
zinfo.flag_bits = 0x00
zinfo.flag_bits |= 0x08 # ZIP flag bits, bit 3 indicates presence of data descriptor
zinfo.header_offset = self.fp.tell() # Start of header bytes
if zinfo.compress_type == zipfile.ZIP_LZMA:
# Compressed data includes an end-of-stream (EOS) marker
zinfo.flag_bits |= 0x02
self._writecheck(zinfo)
self._didModify = True
if isdir:
zinfo.file_size = 0
zinfo.compress_size = 0
zinfo.CRC = 0
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
await yield_(self.fp.write(zinfo.FileHeader(False)))
return
cmpr = _get_compressor(zinfo.compress_type)
# Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
# Compressed size can be larger than uncompressed size
zip64 = self._allowZip64 and zinfo.file_size * 1.05 > zipfile.ZIP64_LIMIT
await yield_(self.fp.write(zinfo.FileHeader(zip64)))
file_size = 0
if filename:
async for buf in self.data_generator(filename):
file_size = file_size + len(buf)
CRC = zipfile.crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = await self._run_in_executor(cmpr.compress, buf)
compress_size = compress_size + len(buf)
await yield_(self.fp.write(buf))
else: # we have an iterable
for buf in iterable:
file_size = file_size + len(buf)
CRC = zipfile.crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = await self._run_in_executor(cmpr.compress, buf)
compress_size = compress_size + len(buf)
await yield_(self.fp.write(buf))
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
await yield_(self.fp.write(buf))
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
if not zip64 and self._allowZip64:
if file_size > zipfile.ZIP64_LIMIT:
raise RuntimeError('File size has increased during compressing')
if compress_size > zipfile.ZIP64_LIMIT:
raise RuntimeError('Compressed size larger than uncompressed size')
await yield_(self.fp.write(zinfo.DataDescriptor()))
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
def _close(self):
"""
Close the file, and for mode "w" write the ending records.
"""
if self.fp is None:
return
try:
if self.mode in ('w', 'a') and self._didModify: # write ending records
count = 0
pos1 = self.fp.tell()
for zinfo in self.filelist: # write central directory
count = count + 1
dt = zinfo.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
extra = []
if zinfo.file_size > zipfile.ZIP64_LIMIT or zinfo.compress_size > zipfile.ZIP64_LIMIT:
extra.append(zinfo.file_size)
extra.append(zinfo.compress_size)
file_size = 0xffffffff
compress_size = 0xffffffff
else:
file_size = zinfo.file_size
compress_size = zinfo.compress_size
if zinfo.header_offset > zipfile.ZIP64_LIMIT:
extra.append(zinfo.header_offset)
header_offset = 0xffffffff
else:
header_offset = zinfo.header_offset
extra_data = zinfo.extra
min_version = 0
if extra:
# Append a ZIP64 field to the extra's
extra_data = struct.pack(
b'<HH' + b'Q'*len(extra),
1, 8*len(extra), *extra) + extra_data
min_version = zipfile.ZIP64_VERSION
if zinfo.compress_type == zipfile.ZIP_BZIP2:
min_version = max(zipfile.BZIP2_VERSION, min_version)
elif zinfo.compress_type == zipfile.ZIP_LZMA:
min_version = max(zipfile.LZMA_VERSION, min_version)
extract_version = max(min_version, zinfo.extract_version)
create_version = max(min_version, zinfo.create_version)
try:
filename, flag_bits = zinfo._encodeFilenameFlags()
centdir = struct.pack(structCentralDir,
stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset)
except DeprecationWarning:
print((structCentralDir, stringCentralDir, create_version,
zinfo.create_system, extract_version, zinfo.reserved,
zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
zinfo.CRC, compress_size, file_size,
len(zinfo.filename), len(extra_data), len(zinfo.comment),
0, zinfo.internal_attr, zinfo.external_attr,
header_offset), file=sys.stderr)
raise
yield self.fp.write(centdir)
yield self.fp.write(filename)
yield self.fp.write(extra_data)
yield self.fp.write(zinfo.comment)
pos2 = self.fp.tell()
# Write end-of-zip-archive record
centDirCount = count
centDirSize = pos2 - pos1
centDirOffset = pos1
if (centDirCount >= zipfile.ZIP_FILECOUNT_LIMIT or
centDirOffset > zipfile.ZIP64_LIMIT or
centDirSize > zipfile.ZIP64_LIMIT):
# Need to write the ZIP64 end-of-archive records
zip64endrec = struct.pack(
structEndArchive64, stringEndArchive64,
44, 45, 45, 0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset)
yield self.fp.write(zip64endrec)
zip64locrec = struct.pack(
structEndArchive64Locator,
stringEndArchive64Locator, 0, pos2, 1)
yield self.fp.write(zip64locrec)
centDirCount = min(centDirCount, 0xFFFF)
centDirSize = min(centDirSize, 0xFFFFFFFF)
centDirOffset = min(centDirOffset, 0xFFFFFFFF)
endrec = struct.pack(structEndArchive, stringEndArchive,
0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset, len(self._comment))
yield self.fp.write(endrec)
yield self.fp.write(self._comment)
self.fp.flush()
finally:
fp = self.fp
self.fp = None
if not self._filePassed:
fp.close()

View File

@ -1,10 +1,11 @@
jsonschema>=2.4.0 jsonschema>=2.4.0
aiohttp==3.5.4 aiohttp==3.5.4
aiohttp-cors==0.7.0 aiohttp-cors==0.7.0
aiofiles==0.4.0
async_generator>=1.10
Jinja2>=2.7.3 Jinja2>=2.7.3
raven>=5.23.0 raven>=5.23.0
psutil>=3.0.0 psutil>=3.0.0
zipstream>=1.1.4
prompt-toolkit==1.0.15 prompt-toolkit==1.0.15
async-timeout==3.0.1 async-timeout==3.0.1
distro>=1.3.0 distro>=1.3.0