|
|
|
@ -16,6 +16,10 @@ _session_handlers = {} # session id -> generator
|
|
|
|
|
_workflow_genfuncs = {} # wire type -> (generator function, args)
|
|
|
|
|
_opened_sessions = set() # session ids
|
|
|
|
|
|
|
|
|
|
# TODO: get rid of this, use callbacks instead
|
|
|
|
|
report_writer = write_report_stream()
|
|
|
|
|
report_writer.send(None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_session_id():
|
|
|
|
|
while True:
|
|
|
|
@ -54,30 +58,17 @@ def register_session(session_id, handler):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def setup():
|
|
|
|
|
report_writer = write_report_stream()
|
|
|
|
|
report_writer.send(None)
|
|
|
|
|
|
|
|
|
|
open_session_handler = _handle_open_session(report_writer)
|
|
|
|
|
open_session_handler.send(None)
|
|
|
|
|
|
|
|
|
|
close_session_handler = _handle_close_session(report_writer)
|
|
|
|
|
close_session_handler.send(None)
|
|
|
|
|
|
|
|
|
|
fallback_session_handler = _handle_unknown_session()
|
|
|
|
|
fallback_session_handler.send(None)
|
|
|
|
|
|
|
|
|
|
session_dispatcher = dispatch_reports_by_session(
|
|
|
|
|
_session_handlers,
|
|
|
|
|
open_session_handler,
|
|
|
|
|
close_session_handler,
|
|
|
|
|
fallback_session_handler)
|
|
|
|
|
_handle_open_session,
|
|
|
|
|
_handle_close_session,
|
|
|
|
|
_handle_unknown_session)
|
|
|
|
|
session_dispatcher.send(None)
|
|
|
|
|
|
|
|
|
|
schedule_task(read_report_stream(session_dispatcher))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def read_message(session_id, *exp_types):
|
|
|
|
|
log.info(__name__, 'reading message, one of %s', exp_types)
|
|
|
|
|
log.info(__name__, 'reading message of types %s', exp_types)
|
|
|
|
|
future = Future()
|
|
|
|
|
wire_decoder = decode_wire_stream(
|
|
|
|
|
_dispatch_and_build_protobuf, session_id, exp_types, future)
|
|
|
|
@ -90,9 +81,7 @@ async def write_message(session_id, pbuf_message):
|
|
|
|
|
log.info(__name__, 'writing message %s', pbuf_message)
|
|
|
|
|
msg_data = await pbuf_message.dumps()
|
|
|
|
|
msg_type = pbuf_message.message_type.wire_type
|
|
|
|
|
writer = write_report_stream()
|
|
|
|
|
writer.send(None)
|
|
|
|
|
encode_wire_message(msg_type, msg_data, session_id, writer)
|
|
|
|
|
encode_wire_message(msg_type, msg_data, session_id, report_writer)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def reply_message(session_id, pbuf_message, *exp_types):
|
|
|
|
@ -151,26 +140,21 @@ def protobuf_handler(msg_type, data_len, session_id, callback, *args):
|
|
|
|
|
return pbuf_type.load(builder)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_open_session(write_target):
|
|
|
|
|
while True:
|
|
|
|
|
yield
|
|
|
|
|
session_id = open_session()
|
|
|
|
|
wire_decoder = decode_wire_stream(_handle_registered_type, session_id)
|
|
|
|
|
wire_decoder.send(None)
|
|
|
|
|
register_session(session_id, wire_decoder)
|
|
|
|
|
encode_session_open_message(session_id, write_target)
|
|
|
|
|
def _handle_open_session():
|
|
|
|
|
session_id = open_session()
|
|
|
|
|
wire_decoder = decode_wire_stream(_handle_registered_type, session_id)
|
|
|
|
|
wire_decoder.send(None)
|
|
|
|
|
register_session(session_id, wire_decoder)
|
|
|
|
|
encode_session_open_message(session_id, report_writer)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_close_session(write_target):
|
|
|
|
|
while True:
|
|
|
|
|
session_id = yield
|
|
|
|
|
close_session(session_id)
|
|
|
|
|
encode_session_close_message(session_id, write_target)
|
|
|
|
|
def _handle_close_session(session_id):
|
|
|
|
|
close_session(session_id)
|
|
|
|
|
encode_session_close_message(session_id, report_writer)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _handle_unknown_session():
|
|
|
|
|
while True:
|
|
|
|
|
yield # TODO
|
|
|
|
|
def _handle_unknown_session(session_id, report_data):
|
|
|
|
|
pass # TODO
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _dispatch_and_build_protobuf(msg_type, data_len, session_id, exp_types, future):
|
|
|
|
|