diff --git a/network/swarm.py b/network/swarm.py index 41ef9bc..dc1f67e 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -59,9 +59,7 @@ class Swarm(INetwork): # Use muxed conn to open stream, which returns # a muxed stream - # TODO: use better stream IDs - stream_id = (uuid.uuid4().int & (1<<64)-1) >> 3 - muxed_stream = muxed_conn.open_stream(protocol_id, stream_id, peer_id, multiaddr) + muxed_stream = await muxed_conn.open_stream(protocol_id, peer_id, multiaddr) # Create a net stream net_stream = NetStream(muxed_stream) diff --git a/stream_muxer/mplex/mplex.py b/stream_muxer/mplex/mplex.py index 516dce2..2d677d5 100644 --- a/stream_muxer/mplex/mplex.py +++ b/stream_muxer/mplex/mplex.py @@ -16,16 +16,24 @@ class Mplex(IMuxedConn): """ self.raw_conn = conn self.initiator = initiator + + # Mapping from stream ID -> buffer of messages for that stream self.buffers = {} - self.streams = {} + self.stream_queue = asyncio.Queue() self.conn_lock = asyncio.Lock() + self._next_id = 0 # The initiator need not read upon construction time. # It should read when the user decides that it wants to read from the constructed stream. if not initiator: asyncio.ensure_future(self.handle_incoming()) + def _next_stream_id(self): + next_id = self._next_id + self._next_id += 1 + return next_id + def close(self): """ close the stream muxer and underlying raw connection @@ -49,7 +57,7 @@ class Mplex(IMuxedConn): self.buffers[stream_id] = bytearray() return data - def open_stream(self, protocol_id, stream_id, peer_id, multi_addr): + async def open_stream(self, protocol_id, peer_id, multi_addr): """ creates a new muxed_stream :param protocol_id: protocol_id of stream @@ -58,8 +66,9 @@ class Mplex(IMuxedConn): :param multi_addr: multi_addr that stream connects to :return: a new stream """ + stream_id = self._next_stream_id() stream = MplexStream(stream_id, multi_addr, self) - self.streams[stream_id] = stream + self.buffers[stream_id] = bytearray() return stream async def accept_stream(self):