From d83b6ef459865d7141e5b8cfc3dd292cd1bd6147 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Tue, 20 Nov 2018 21:46:18 -0500 Subject: [PATCH] mplex stream close reset done --- stream_muxer/mplex/mplex.py | 1 + stream_muxer/mplex/mplex_stream.py | 63 +++++++++++++++++------------- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/stream_muxer/mplex/mplex.py b/stream_muxer/mplex/mplex.py index 02e6c3d..516dce2 100644 --- a/stream_muxer/mplex/mplex.py +++ b/stream_muxer/mplex/mplex.py @@ -19,6 +19,7 @@ class Mplex(IMuxedConn): self.buffers = {} self.streams = {} self.stream_queue = asyncio.Queue() + self.conn_lock = asyncio.Lock() # The initiator need not read upon construction time. # It should read when the user decides that it wants to read from the constructed stream. diff --git a/stream_muxer/mplex/mplex_stream.py b/stream_muxer/mplex/mplex_stream.py index ebf77e2..f310a34 100644 --- a/stream_muxer/mplex/mplex_stream.py +++ b/stream_muxer/mplex/mplex_stream.py @@ -1,3 +1,4 @@ +import asyncio from .constants import HEADER_TAGS from ..muxed_stream_interface import IMuxedStream @@ -7,22 +8,21 @@ class MplexStream(IMuxedStream): reference: https://github.com/libp2p/go-mplex/blob/master/stream.go """ - def __init__(self, stream_id, initiator, muxed_conn): + def __init__(self, stream_id, initiator, mplex_conn): """ create new MuxedStream in muxer :param stream_id: stream stream id :param initiator: boolean if this is an initiator - :param muxed_conn: muxed connection of this muxed_stream + :param mplex_conn: muxed connection of this muxed_stream """ self.stream_id = stream_id self.initiator = initiator - self.muxed_conn = muxed_conn - + self.mplex_conn = mplex_conn self.read_deadline = None self.write_deadline = None - self.local_closed = False self.remote_closed = False + self.stream_lock = asyncio.Lock() def get_flag(self, action): """ @@ -40,50 +40,57 @@ class MplexStream(IMuxedStream): read messages associated with stream from buffer til end of file :return: bytes of input """ - return await self.muxed_conn.read_buffer(self.stream_id) + return await self.mplex_conn.read_buffer(self.stream_id) async def write(self, data): """ write to stream :return: number of bytes written """ - return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) + return await self.mplex_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) async def close(self): """ - close stream + Closing a stream closes it for writing and closes the remote end for reading + but allows writing in the other direction. :return: true if successful """ + #TODO error handling with timeout + await self.mplex_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) - if self.local_closed and self.remote_closed: - return True + remote_lock = "" + async with self.stream_lock: + if self.local_closed: + return True + self.local_closed = True + remote_lock = self.remote_closed - await self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) - self.muxed_conn.streams.pop(self.stream_id) - - self.local_closed = True - self.remote_closed = True + if remote_lock: + async with self.mplex_conn.conn_lock: + self.mplex_conn.streams.pop(self.stream_id) return True - def reset(self): + async def reset(self): """ closes both ends of the stream tells this remote side to hang up :return: true if successful """ - # TODO behavior not fully understood - pass - # if self.local_closed and self.remote_closed: - # return True - # - # self.muxed_conn.send_message(self.get_flag("RESET"), None, self.id) - # self.muxed_conn.streams.pop(self.id, None) - # - # self.local_closed = True - # self.remote_closed = True - # - # return True + async with self.stream_lock: + if self.remote_closed and self.local_closed: + return True + + if not self.remote_closed: + await self.mplex_conn.send_message(self.get_flag("RESET"), None, self.stream_id) + + self.local_closed = True + self.remote_closed = True + + async with self.mplex_conn.conn_lock: + self.mplex_conn.streams.pop(self.stream_id, None) + + return True # TODO deadline not in use def set_deadline(self, ttl):