|
|
|
@ -20,6 +20,7 @@ import pytest
|
|
|
|
|
import uuid
|
|
|
|
|
import sys
|
|
|
|
|
import os
|
|
|
|
|
from aiohttp._ws_impl import WSMsgType
|
|
|
|
|
from tests.utils import asyncio_patch, AsyncioMagicMock
|
|
|
|
|
|
|
|
|
|
from gns3server.ubridge.ubridge_error import UbridgeNamespaceError
|
|
|
|
@ -904,3 +905,27 @@ def test_fix_permission(vm, loop):
|
|
|
|
|
loop.run_until_complete(vm._fix_permissions())
|
|
|
|
|
mock_exec.assert_called_with('docker', 'exec', 'e90e34656842', '/gns3/bin/busybox', 'sh', '-c', '(/gns3/bin/busybox find "/etc" -depth -print0 | /gns3/bin/busybox xargs -0 /gns3/bin/busybox stat -c \'%a:%u:%g:%n\' > "/etc/.gns3_perms") && /gns3/bin/busybox chmod -R u+rX "/etc" && /gns3/bin/busybox chown {}:{} -R "/etc"'.format(os.getuid(), os.getgid()))
|
|
|
|
|
assert process.wait.called
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_read_console_output_with_binary_mode(vm, loop):
|
|
|
|
|
class InputStreamMock(object):
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.sent = False
|
|
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
|
def receive(self):
|
|
|
|
|
if not self.sent:
|
|
|
|
|
self.sent = True
|
|
|
|
|
return MagicMock(tp=WSMsgType.BINARY, data=b"test")
|
|
|
|
|
else:
|
|
|
|
|
return MagicMock(tp=WSMsgType.CLOSE)
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
input_stream = InputStreamMock()
|
|
|
|
|
output_stream = MagicMock()
|
|
|
|
|
|
|
|
|
|
with asyncio_patch('gns3server.compute.docker.docker_vm.DockerVM.stop'):
|
|
|
|
|
loop.run_until_complete(asyncio.async(vm._read_console_output(input_stream, output_stream)))
|
|
|
|
|
output_stream.feed_data.assert_called_once_with(b"test")
|
|
|
|
|