Provide easy to test a shell by using stdin as an input

pull/1055/head
Julien Duponchelle 7 years ago
parent 5616ab0e9a
commit ca3f8d0b43
No known key found for this signature in database
GPG Key ID: CE8B29639E07F5E8

@ -136,6 +136,39 @@ def create_telnet_shell(shell, loop=None):
return AsyncioTelnetServer(reader=shell.writer, writer=shell.reader, binary=False, echo=False)
def create_stdin_shell(shell, loop=None):
"""
Run a shell application with a stdin frontend
:param application: An EmbedShell instance
:param loop: The event loop
:returns: Telnet server
"""
@asyncio.coroutine
def feed_stdin(loop, reader):
while True:
line = yield from loop.run_in_executor(None, sys.stdin.readline)
reader.feed_data(line.encode())
@asyncio.coroutine
def read_stdout(writer):
while True:
c = yield from writer.read(1)
print(c.decode(), end='')
sys.stdout.flush()
reader = asyncio.StreamReader()
writer = asyncio.StreamReader()
shell.reader = reader
shell.writer = writer
if loop is None:
loop = asyncio.get_event_loop()
reader_task = loop.create_task(feed_stdin(loop, reader))
writer_task = loop.create_task(read_stdout(writer))
shell_task = loop.create_task(shell.run())
return asyncio.gather(shell_task, writer_task, reader_task)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
@ -154,34 +187,14 @@ if __name__ == '__main__':
return 'world\n'
# Demo using telnet
server = create_telnet_shell(Demo())
coro = asyncio.start_server(server.run, '127.0.0.1', 4444, loop=loop)
s = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# server = create_telnet_shell(Demo())
# coro = asyncio.start_server(server.run, '127.0.0.1', 4444, loop=loop)
# s = loop.run_until_complete(coro)
# try:
# loop.run_forever()
# except KeyboardInterrupt:
# pass
# Demo using stdin
# @asyncio.coroutine
# def feed_stdin(loop, reader):
# while True:
# line = yield from loop.run_in_executor(None, sys.stdin.readline)
# reader.feed_data(line.encode())
#
# @asyncio.coroutine
# def read_stdout(writer):
# while True:
# c = yield from writer.read(1)
# print(c.decode(), end='')
# sys.stdout.flush()
#
# reader = asyncio.StreamReader()
# writer = asyncio.StreamReader()
# shell = Demo(reader, writer, loop=loop)
#
# reader_task = loop.create_task(feed_stdin(loop, reader))
# writer_task = loop.create_task(read_stdout(writer))
# shell_task = loop.create_task(shell.run())
# loop.run_until_complete(asyncio.gather(shell_task, writer_task, reader_task))
# loop.close()
loop.run_until_complete(create_stdin_shell(Demo()))
loop.close()

Loading…
Cancel
Save