mirror of
https://github.com/GNS3/gns3-server
synced 2024-11-24 09:18:08 +00:00
Apply pep8 fix
This commit is contained in:
parent
f01a6dcaaf
commit
da240f21ba
@ -323,7 +323,7 @@ class DynamipsVMHandler:
|
||||
pcap_file_path = os.path.join(vm.project.capture_working_directory(), request.json["capture_file_name"])
|
||||
|
||||
if sys.platform.startswith('win'):
|
||||
#FIXME: Dynamips (Cygwin actually) doesn't like non ascii paths on Windows
|
||||
# FIXME: Dynamips (Cygwin actually) doesn't like non ascii paths on Windows
|
||||
try:
|
||||
pcap_file_path.encode('ascii')
|
||||
except UnicodeEncodeError:
|
||||
|
@ -57,4 +57,3 @@ class FileHandler:
|
||||
raise aiohttp.web.HTTPNotFound()
|
||||
except OSError as e:
|
||||
raise aiohttp.web.HTTPConflict(text=str(e))
|
||||
|
||||
|
@ -16,7 +16,9 @@
|
||||
|
||||
from ..web.route import Route
|
||||
|
||||
|
||||
class IndexHandler:
|
||||
|
||||
@classmethod
|
||||
@Route.get(
|
||||
r"/",
|
||||
|
@ -44,6 +44,7 @@ class Container(BaseVM):
|
||||
:param manager: Manager instance
|
||||
:param image: Docker image
|
||||
"""
|
||||
|
||||
def __init__(self, name, vm_id, project, manager, image, startcmd=None):
|
||||
self._name = name
|
||||
self._id = vm_id
|
||||
|
@ -276,7 +276,7 @@ class DynamipsHypervisor:
|
||||
while True:
|
||||
try:
|
||||
try:
|
||||
#line = yield from self._reader.readline() # this can lead to ValueError: Line is too long
|
||||
# line = yield from self._reader.readline() # this can lead to ValueError: Line is too long
|
||||
chunk = yield from self._reader.read(1024) # match to Dynamips' buffer size
|
||||
except asyncio.CancelledError:
|
||||
# task has been canceled but continue to read
|
||||
|
@ -193,7 +193,7 @@ class EthernetSwitch(Device):
|
||||
elif settings["type"] == "dot1q":
|
||||
yield from self.set_dot1q_port(port_number, settings["vlan"])
|
||||
elif settings["type"] == "qinq":
|
||||
yield from self.set_qinq_port(port_number, settings["vlan"], settings["ethertype"] )
|
||||
yield from self.set_qinq_port(port_number, settings["vlan"], settings["ethertype"])
|
||||
|
||||
@asyncio.coroutine
|
||||
def set_access_port(self, port_number, vlan_id):
|
||||
|
@ -48,8 +48,8 @@ def uncompress_LZC(data):
|
||||
LZC_NUM_BITS_MIN = 9
|
||||
LZC_NUM_BITS_MAX = 16
|
||||
|
||||
in_data = bytearray(data)
|
||||
in_len = len(in_data)
|
||||
in_data = bytearray(data)
|
||||
in_len = len(in_data)
|
||||
out_data = bytearray()
|
||||
|
||||
if in_len == 0:
|
||||
@ -59,26 +59,26 @@ def uncompress_LZC(data):
|
||||
if in_data[0] != 0x1F or in_data[1] != 0x9D:
|
||||
raise ValueError('invalid header')
|
||||
|
||||
maxbits = in_data[2] & 0x1F
|
||||
numItems = 1 << maxbits
|
||||
maxbits = in_data[2] & 0x1F
|
||||
numItems = 1 << maxbits
|
||||
blockMode = (in_data[2] & 0x80) != 0
|
||||
if maxbits < LZC_NUM_BITS_MIN or maxbits > LZC_NUM_BITS_MAX:
|
||||
raise ValueError('not supported')
|
||||
|
||||
parents = [0] * numItems
|
||||
parents = [0] * numItems
|
||||
suffixes = [0] * numItems
|
||||
|
||||
in_pos = 3
|
||||
in_pos = 3
|
||||
numBits = LZC_NUM_BITS_MIN
|
||||
head = 256
|
||||
head = 256
|
||||
if blockMode:
|
||||
head += 1
|
||||
|
||||
needPrev = 0
|
||||
bitPos = 0
|
||||
needPrev = 0
|
||||
bitPos = 0
|
||||
numBufBits = 0
|
||||
|
||||
parents[256] = 0
|
||||
parents[256] = 0
|
||||
suffixes[256] = 0
|
||||
|
||||
buf_extend = bytearray([0] * 3)
|
||||
@ -87,7 +87,7 @@ def uncompress_LZC(data):
|
||||
# fill buffer, when empty
|
||||
if numBufBits == bitPos:
|
||||
buf_len = min(in_len - in_pos, numBits)
|
||||
buf = in_data[in_pos:in_pos+buf_len] + buf_extend
|
||||
buf = in_data[in_pos:in_pos + buf_len] + buf_extend
|
||||
numBufBits = buf_len << 3
|
||||
bitPos = 0
|
||||
in_pos += buf_len
|
||||
@ -142,12 +142,12 @@ def uncompress_LZC(data):
|
||||
|
||||
# extract 16 bit unsigned int from data
|
||||
def get_uint16(data, off):
|
||||
return data[off] << 8 | data[off+1]
|
||||
return data[off] << 8 | data[off + 1]
|
||||
|
||||
|
||||
# extract 32 bit unsigned int from data
|
||||
def get_uint32(data, off):
|
||||
return data[off] << 24 | data[off+1] << 16 | data[off+2] << 8 | data[off+3]
|
||||
return data[off] << 24 | data[off + 1] << 16 | data[off + 2] << 8 | data[off + 3]
|
||||
|
||||
|
||||
# export IOU NVRAM
|
||||
@ -165,7 +165,7 @@ def nvram_export(nvram):
|
||||
offset += 36
|
||||
if len(nvram) < offset + length:
|
||||
raise ValueError('invalid length')
|
||||
startup = nvram[offset:offset+length]
|
||||
startup = nvram[offset:offset + length]
|
||||
|
||||
# compressed startup config
|
||||
if format == 2:
|
||||
@ -176,7 +176,7 @@ def nvram_export(nvram):
|
||||
|
||||
offset += length
|
||||
# alignment to multiple of 4
|
||||
offset = (offset+3) & ~3
|
||||
offset = (offset + 3) & ~3
|
||||
# check for additonal offset of 4
|
||||
if len(nvram) >= offset + 8 and \
|
||||
get_uint16(nvram, offset + 4) == 0xFEDC and \
|
||||
@ -189,7 +189,7 @@ def nvram_export(nvram):
|
||||
length = get_uint32(nvram, offset + 12)
|
||||
offset += 16
|
||||
if len(nvram) >= offset + length:
|
||||
private = nvram[offset:offset+length]
|
||||
private = nvram[offset:offset + length]
|
||||
|
||||
return (startup, private)
|
||||
|
||||
|
@ -41,26 +41,26 @@ import sys
|
||||
|
||||
# extract 16 bit unsigned int from data
|
||||
def get_uint16(data, off):
|
||||
return data[off] << 8 | data[off+1]
|
||||
return data[off] << 8 | data[off + 1]
|
||||
|
||||
|
||||
# extract 32 bit unsigned int from data
|
||||
def get_uint32(data, off):
|
||||
return data[off] << 24 | data[off+1] << 16 | data[off+2] << 8 | data[off+3]
|
||||
return data[off] << 24 | data[off + 1] << 16 | data[off + 2] << 8 | data[off + 3]
|
||||
|
||||
|
||||
# insert 16 bit unsigned int into data
|
||||
def put_uint16(data, off, value):
|
||||
data[off] = (value >> 8) & 0xff
|
||||
data[off+1] = value & 0xff
|
||||
data[off] = (value >> 8) & 0xff
|
||||
data[off + 1] = value & 0xff
|
||||
|
||||
|
||||
# insert 32 bit unsigned int into data
|
||||
def put_uint32(data, off, value):
|
||||
data[off] = (value >> 24) & 0xff
|
||||
data[off+1] = (value >> 16) & 0xff
|
||||
data[off+2] = (value >> 8) & 0xff
|
||||
data[off+3] = value & 0xff
|
||||
data[off] = (value >> 24) & 0xff
|
||||
data[off + 1] = (value >> 16) & 0xff
|
||||
data[off + 2] = (value >> 8) & 0xff
|
||||
data[off + 3] = value & 0xff
|
||||
|
||||
|
||||
# calculate padding
|
||||
@ -77,7 +77,7 @@ def checksum(data, start, end):
|
||||
|
||||
chk = 0
|
||||
idx = start
|
||||
while idx < end-1:
|
||||
while idx < end - 1:
|
||||
chk += get_uint16(data, idx)
|
||||
idx += 2
|
||||
if idx < end:
|
||||
@ -93,21 +93,21 @@ def checksum(data, start, end):
|
||||
# import IOU NVRAM
|
||||
def nvram_import(nvram, startup, private, size):
|
||||
BASE_ADDRESS = 0x10000000
|
||||
DEFAULT_IOS = 0x0F04 # IOS 15.4
|
||||
DEFAULT_IOS = 0x0F04 # IOS 15.4
|
||||
|
||||
# check size parameter
|
||||
if size is not None and (size < 8 or size > 1024):
|
||||
raise ValueError('invalid size')
|
||||
|
||||
# create new nvram if nvram is empty or has wrong size
|
||||
if nvram is None or (size is not None and len(nvram) != size*1024):
|
||||
nvram = bytearray([0] * (size*1024))
|
||||
if nvram is None or (size is not None and len(nvram) != size * 1024):
|
||||
nvram = bytearray([0] * (size * 1024))
|
||||
else:
|
||||
nvram = bytearray(nvram)
|
||||
|
||||
# check nvram size
|
||||
nvram_len = len(nvram)
|
||||
if nvram_len < 8*1024 or nvram_len > 1024*1024 or nvram_len % 1024 != 0:
|
||||
if nvram_len < 8 * 1024 or nvram_len > 1024 * 1024 or nvram_len % 1024 != 0:
|
||||
raise ValueError('invalid NVRAM length')
|
||||
nvram_len = nvram_len // 2
|
||||
|
||||
@ -127,7 +127,7 @@ def nvram_import(nvram, startup, private, size):
|
||||
raise ValueError('unknown nvram format')
|
||||
|
||||
# calculate max. config size
|
||||
max_config = nvram_len - 2*1024 # reserve 2k for files
|
||||
max_config = nvram_len - 2 * 1024 # reserve 2k for files
|
||||
idx = max_config
|
||||
empty_sector = bytearray([0] * 1024)
|
||||
while True:
|
||||
@ -135,11 +135,11 @@ def nvram_import(nvram, startup, private, size):
|
||||
if idx < config_len:
|
||||
break
|
||||
# if valid file header:
|
||||
if get_uint16(nvram, idx+0) == 0xDCBA and \
|
||||
get_uint16(nvram, idx+4) < 8 and \
|
||||
get_uint16(nvram, idx+6) <= 992:
|
||||
if get_uint16(nvram, idx + 0) == 0xDCBA and \
|
||||
get_uint16(nvram, idx + 4) < 8 and \
|
||||
get_uint16(nvram, idx + 6) <= 992:
|
||||
max_config = idx
|
||||
elif nvram[idx:idx+1024] != empty_sector:
|
||||
elif nvram[idx:idx + 1024] != empty_sector:
|
||||
break
|
||||
|
||||
# import startup config
|
||||
@ -151,11 +151,11 @@ def nvram_import(nvram, startup, private, size):
|
||||
ios = DEFAULT_IOS
|
||||
startup.extend([ord('\n')] * ((4 - len(startup) % 4) % 4))
|
||||
new_nvram = bytearray([0] * 36) # startup hdr
|
||||
put_uint16(new_nvram, 0, 0xABCD) # magic
|
||||
put_uint16(new_nvram, 2, 1) # raw data
|
||||
put_uint16(new_nvram, 6, ios) # IOS version
|
||||
put_uint32(new_nvram, 8, BASE_ADDRESS+36) # start address
|
||||
put_uint32(new_nvram, 12, BASE_ADDRESS+36 + len(startup)) # end address
|
||||
put_uint16(new_nvram, 0, 0xABCD) # magic
|
||||
put_uint16(new_nvram, 2, 1) # raw data
|
||||
put_uint16(new_nvram, 6, ios) # IOS version
|
||||
put_uint32(new_nvram, 8, BASE_ADDRESS + 36) # start address
|
||||
put_uint32(new_nvram, 12, BASE_ADDRESS + 36 + len(startup)) # end address
|
||||
put_uint32(new_nvram, 16, len(startup)) # length
|
||||
new_nvram.extend(startup)
|
||||
new_nvram.extend([0] * padding(len(new_nvram), ios))
|
||||
@ -167,11 +167,11 @@ def nvram_import(nvram, startup, private, size):
|
||||
private = bytearray(private)
|
||||
offset = len(new_nvram)
|
||||
new_nvram.extend([0] * 16) # private hdr
|
||||
put_uint16(new_nvram, 0 + offset, 0xFEDC) # magic
|
||||
put_uint16(new_nvram, 2 + offset, 1) # raw data
|
||||
put_uint32(new_nvram, 4 + offset,
|
||||
put_uint16(new_nvram, 0 + offset, 0xFEDC) # magic
|
||||
put_uint16(new_nvram, 2 + offset, 1) # raw data
|
||||
put_uint32(new_nvram, 4 + offset,
|
||||
BASE_ADDRESS + offset + 16) # start address
|
||||
put_uint32(new_nvram, 8 + offset,
|
||||
put_uint32(new_nvram, 8 + offset,
|
||||
BASE_ADDRESS + offset + 16 + len(private)) # end address
|
||||
put_uint32(new_nvram, 12 + offset, len(private)) # length
|
||||
new_nvram.extend(private)
|
||||
|
@ -557,7 +557,7 @@ class VMware(BaseManager):
|
||||
|
||||
inventory_path = self.get_vmware_inventory_path()
|
||||
if os.path.exists(inventory_path):
|
||||
#FIXME: inventory may exist if VMware workstation has not been fully uninstalled, therefore VMware player VMs are not searched
|
||||
# FIXME: inventory may exist if VMware workstation has not been fully uninstalled, therefore VMware player VMs are not searched
|
||||
return self._get_vms_from_inventory(inventory_path)
|
||||
else:
|
||||
# VMware player has no inventory file, let's search the default location for VMs.
|
||||
|
@ -21,6 +21,7 @@ import hashlib
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def md5sum(path):
|
||||
"""
|
||||
Return the md5sum of an image and cache it on disk
|
||||
|
@ -30,7 +30,7 @@ def _get_windows_interfaces_from_registry():
|
||||
|
||||
import winreg
|
||||
|
||||
#HKLM\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\Interfaces
|
||||
# HKLM\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\Interfaces
|
||||
interfaces = []
|
||||
try:
|
||||
hkey = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion\NetworkCards")
|
||||
|
@ -104,6 +104,7 @@ def test_upload_previous_checksum(server, tmpdir):
|
||||
checksum = f.read()
|
||||
assert checksum == "ae187e1febee2a150b64849c32d566ca"
|
||||
|
||||
|
||||
def test_upload_images_backup(server, tmpdir):
|
||||
Config.instance().set("Server", "images_path", str(tmpdir / 'images'))
|
||||
os.makedirs(str(tmpdir / 'images' / 'IOU'))
|
||||
|
@ -242,7 +242,6 @@ def test_set_qemu_path_environ(vm, tmpdir, fake_qemu_binary):
|
||||
assert vm.platform == "x86_64"
|
||||
|
||||
|
||||
|
||||
def test_set_qemu_path_windows(vm, tmpdir):
|
||||
|
||||
bin_path = os.path.join(os.environ["PATH"], "qemu-system-x86_64w.EXE")
|
||||
@ -255,7 +254,6 @@ def test_set_qemu_path_windows(vm, tmpdir):
|
||||
assert vm.platform == "x86_64"
|
||||
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("linux") is False, reason="Supported only on linux")
|
||||
def test_set_qemu_path_kvm_binary(vm, tmpdir, fake_qemu_binary):
|
||||
|
||||
|
@ -93,6 +93,7 @@ def parse_vmnet_range(start, end):
|
||||
"""
|
||||
|
||||
class Range(argparse.Action):
|
||||
|
||||
def __call__(self, parser, args, values, option_string=None):
|
||||
if len(values) != 2:
|
||||
raise argparse.ArgumentTypeError("vmnet range must consist of 2 numbers")
|
||||
|
Loading…
Reference in New Issue
Block a user