Merge pull request #72 from zixuanzh/stream-id

Improved stream IDs
This commit is contained in:
Robert Zajac 2018-11-25 15:29:57 -05:00 committed by GitHub
commit 4439229632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 9 deletions

View File

@ -59,9 +59,7 @@ class Swarm(INetwork):
# Use muxed conn to open stream, which returns # Use muxed conn to open stream, which returns
# a muxed stream # a muxed stream
# TODO: use better stream IDs muxed_stream = await muxed_conn.open_stream(protocol_id, peer_id, multiaddr)
stream_id = (uuid.uuid4().int & (1<<64)-1) >> 3
muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, multiaddr)
# Create a net stream # Create a net stream
net_stream = NetStream(muxed_stream) net_stream = NetStream(muxed_stream)

View File

@ -16,16 +16,24 @@ class Mplex(IMuxedConn):
""" """
self.raw_conn = conn self.raw_conn = conn
self.initiator = initiator self.initiator = initiator
# Mapping from stream ID -> buffer of messages for that stream
self.buffers = {} self.buffers = {}
self.streams = {}
self.stream_queue = asyncio.Queue() self.stream_queue = asyncio.Queue()
self.conn_lock = asyncio.Lock() self.conn_lock = asyncio.Lock()
self._next_id = 0
# The initiator need not read upon construction time. # The initiator need not read upon construction time.
# It should read when the user decides that it wants to read from the constructed stream. # It should read when the user decides that it wants to read from the constructed stream.
if not initiator: if not initiator:
asyncio.ensure_future(self.handle_incoming()) asyncio.ensure_future(self.handle_incoming())
def _next_stream_id(self):
next_id = self._next_id
self._next_id += 1
return next_id
def close(self): def close(self):
""" """
close the stream muxer and underlying raw connection close the stream muxer and underlying raw connection
@ -49,7 +57,7 @@ class Mplex(IMuxedConn):
self.buffers[stream_id] = bytearray() self.buffers[stream_id] = bytearray()
return data return data
def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): async def open_stream(self, protocol_id, peer_id, multi_addr):
""" """
creates a new muxed_stream creates a new muxed_stream
:param protocol_id: protocol_id of stream :param protocol_id: protocol_id of stream
@ -58,8 +66,9 @@ class Mplex(IMuxedConn):
:param multi_addr: multi_addr that stream connects to :param multi_addr: multi_addr that stream connects to
:return: a new stream :return: a new stream
""" """
stream_id = self._next_stream_id()
stream = MplexStream(stream_id, multi_addr, self) stream = MplexStream(stream_id, multi_addr, self)
self.streams[stream_id] = stream self.buffers[stream_id] = bytearray()
return stream return stream
async def accept_stream(self): async def accept_stream(self):

View File

@ -68,7 +68,7 @@ class MplexStream(IMuxedStream):
if remote_lock: if remote_lock:
async with self.mplex_conn.conn_lock: async with self.mplex_conn.conn_lock:
self.mplex_conn.streams.pop(self.stream_id) self.mplex_conn.buffers.pop(self.stream_id)
return True return True
@ -91,7 +91,7 @@ class MplexStream(IMuxedStream):
self.remote_closed = True self.remote_closed = True
async with self.mplex_conn.conn_lock: async with self.mplex_conn.conn_lock:
self.mplex_conn.streams.pop(self.stream_id, None) self.mplex_conn.buffers.pop(self.stream_id, None)
return True return True

View File

@ -23,7 +23,7 @@ class IMuxedConn(ABC):
pass pass
@abstractmethod @abstractmethod
def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): def open_stream(self, protocol_id, peer_id, multi_addr):
""" """
creates a new muxed_stream creates a new muxed_stream
:param protocol_id: protocol_id of stream :param protocol_id: protocol_id of stream