diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 126e4c2..8b10eb4 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -25,7 +25,7 @@ class FloodSub(IPubsubRouter): pubsub: Pubsub def __init__(self, protocols: Sequence[str]) -> None: - self.protocols = protocols + self.protocols = list(protocols) self.pubsub = None def get_protocols(self) -> List[str]: @@ -42,19 +42,21 @@ class FloodSub(IPubsubRouter): """ self.pubsub = pubsub - def add_peer(self, peer_id: ID, protocol_id: str): + # FIXME: Should be changed to type 'peer.ID' + def add_peer(self, peer_id: str, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add """ - def remove_peer(self, peer_id: ID): + def remove_peer(self, peer_id: ID) -> None: """ Notifies the router that a peer has been disconnected :param peer_id: id of peer to remove """ - async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID): + # FIXME: Should be changed to type 'peer.ID' + async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed @@ -90,7 +92,7 @@ class FloodSub(IPubsubRouter): # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 await stream.write(rpc_msg.SerializeToString()) - async def join(self, topic: str): + async def join(self, topic: str) -> None: """ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the @@ -98,7 +100,7 @@ class FloodSub(IPubsubRouter): :param topic: topic to join """ - async def leave(self, topic: str): + async def leave(self, topic: str) -> None: """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement. diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 0c9bde9..c10dc76 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -27,7 +27,7 @@ class GossipSub(IPubsubRouter): # pylint: disable=too-many-instance-attributes # pylint: disable=too-many-public-methods - protocols: Sequence[str] + protocols: List[str] pubsub: Pubsub degree: int @@ -62,8 +62,8 @@ class GossipSub(IPubsubRouter): gossip_history: int=5, heartbeat_interval: int=120) -> None: # pylint: disable=too-many-arguments - self.protocols: List[str] = protocols - self.pubsub: Pubsub = None + self.protocols = list(protocols) + self.pubsub = None # Store target degree, upper degree bound, and lower degree bound self.degree = degree @@ -71,7 +71,7 @@ class GossipSub(IPubsubRouter): self.degree_high = degree_high # Store time to live (for topics in fanout) - self.time_to_live: int = time_to_live + self.time_to_live = time_to_live # Create topic --> list of peers mappings self.mesh = {} @@ -91,7 +91,7 @@ class GossipSub(IPubsubRouter): # Interface functions - def get_protocols(self) -> List: + def get_protocols(self) -> List[str]: """ :return: the list of protocols supported by the router """ @@ -109,7 +109,8 @@ class GossipSub(IPubsubRouter): # TODO: Start after delay asyncio.ensure_future(self.heartbeat()) - def add_peer(self, peer_id: ID, protocol_id: str): + # FIXME: Shoudl be changed to type 'peer.ID' + def add_peer(self, peer_id: str, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add @@ -133,7 +134,7 @@ class GossipSub(IPubsubRouter): self.peers_to_protocol.remove(peer_id_str) # FIXME: type of `sender_peer_id` should be changed to `ID` - async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: str): + async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: str) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed @@ -300,7 +301,7 @@ class GossipSub(IPubsubRouter): peers: List[str], msg_sender: str, origin_id: str, - serialized_packet: bytes): + serialized_packet: bytes) -> None: for peer_id_in_topic in peers: # Forward to all peers that are not the # message sender and are not the message origin @@ -358,7 +359,7 @@ class GossipSub(IPubsubRouter): if num_mesh_peers_in_topic > self.degree_high: # Select |mesh[topic]| - D peers from mesh[topic] # FIXME: Should be changed to `List[ID]` - selected_peers: List[str] = GossipSub.select_from_minus( + selected_peers = GossipSub.select_from_minus( num_mesh_peers_in_topic - self.degree, self.mesh[topic], [], @@ -378,7 +379,7 @@ class GossipSub(IPubsubRouter): if self.time_since_last_publish[topic] > self.time_to_live: # Remove topic from fanout del self.fanout[topic] - self.time_since_last_publish.remove(topic) + del self.time_since_last_publish[topic] else: num_fanout_peers_in_topic = len(self.fanout[topic]) @@ -393,7 +394,7 @@ class GossipSub(IPubsubRouter): # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) - async def gossip_heartbeat(self): + async def gossip_heartbeat(self) -> None: # pylint: disable=too-many-nested-blocks for topic in self.mesh: msg_ids = self.mcache.window(topic) @@ -412,14 +413,14 @@ class GossipSub(IPubsubRouter): # TODO: this line is a monster, can hopefully be simplified if (topic not in self.mesh or (peer not in self.mesh[topic]))\ and (topic not in self.fanout or (peer not in self.fanout[topic])): - msg_ids: List[str] = [str(msg) for msg in msg_ids] - await self.emit_ihave(topic, msg_ids, peer) + msg_id_strs = [str(msg_id) for msg_id in msg_ids] + await self.emit_ihave(topic, msg_id_strs, peer) # TODO: Refactor and Dedup. This section is the roughly the same as the above. # Do the same for fanout, for all topics not already hit in mesh for topic in self.fanout: if topic not in self.mesh: - msg_ids: List[str] = self.mcache.window(topic) + msg_ids = self.mcache.window(topic) if msg_ids: # TODO: Make more efficient, possibly using a generator? # Get all pubsub peers in topic and only add if they are gossipsub peers also @@ -433,8 +434,8 @@ class GossipSub(IPubsubRouter): for peer in peers_to_emit_ihave_to: if peer not in self.mesh[topic] and peer not in self.fanout[topic]: - msg_ids: List[str] = [str(msg) for msg in msg_ids] - await self.emit_ihave(topic, msg_ids, peer) + msg_id_strs = [str(msg) for msg in msg_ids] + await self.emit_ihave(topic, msg_id_strs, peer) self.mcache.shift() @@ -453,7 +454,7 @@ class GossipSub(IPubsubRouter): selection_pool: List[Any] = [x for x in pool if x not in minus] else: # Don't create a new selection_pool if we are not subbing anything - selection_pool: List[Any] = pool + selection_pool = list(pool) # If num_to_select > size(selection_pool), then return selection_pool (which has the most # possible elements s.t. the number of elements is less than num_to_select) @@ -518,7 +519,7 @@ class GossipSub(IPubsubRouter): # FIXME: Update type of message ID msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs] - msgs_to_forward: List = [] + msgs_to_forward: List[rpc_pb2.Message] = [] for msg_id_iwant in msg_ids: # Check if the wanted message ID is present in mcache msg: rpc_pb2.Message = self.mcache.get(msg_id_iwant) diff --git a/libp2p/pubsub/mcache.py b/libp2p/pubsub/mcache.py index 42d954c..ef140c5 100644 --- a/libp2p/pubsub/mcache.py +++ b/libp2p/pubsub/mcache.py @@ -2,38 +2,41 @@ from typing import ( Dict, List, Optional, + Sequence, Tuple, ) from .pb import rpc_pb2 +class CacheEntry: + # pylint: disable=too-few-public-methods + + mid: Tuple[bytes, bytes] + topics: List[str] + + """ + A logical representation of an entry in the mcache's _history_. + """ + def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None: + """ + Constructor. + :param mid: (seqno, from_id) of the msg + :param topics: list of topics this message was sent on + """ + self.mid = mid + self.topics = list(topics) + + class MessageCache: - class CacheEntry: - # pylint: disable=too-few-public-methods - - mid: Tuple[bytes, bytes] - topics: List[str] - - """ - A logical representation of an entry in the mcache's _history_. - """ - def __init__(self, mid: Tuple[bytes, bytes], topics: List[str]) -> None: - """ - Constructor. - :param mid: (seqno, from_id) of the msg - :param topics: list of topics this message was sent on - """ - self.mid = mid - self.topics = topics window_size: int history_size: int msgs: Dict[Tuple[bytes, bytes], rpc_pb2.Message] - history = List[List[CacheEntry]] + history: List[List[CacheEntry]] def __init__(self, window_size: int, history_size: int) -> None: """ @@ -50,10 +53,10 @@ class MessageCache: # max length of history_size. each item is a list of CacheEntry. # messages lost upon shift(). - self.history = [] - - for _ in range(history_size): - self.history.append([]) + self.history = [ + [] + for _ in range(history_size) + ] def put(self, msg: rpc_pb2.Message) -> None: """ @@ -63,10 +66,7 @@ class MessageCache: mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id) self.msgs[mid] = msg - if not self.history[0]: - self.history[0] = [] - - self.history[0].append(self.CacheEntry(mid, msg.topicIDs)) + self.history[0].append(CacheEntry(mid, msg.topicIDs)) def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]: """ diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7643bef..f63fa6a 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -22,8 +22,8 @@ from libp2p.host.host_interface import ( from libp2p.peer.id import ( ID, ) -from libp2p.network.stream.net_stream_interface import ( - INetStream, +from libp2p.network.stream.net_stream import ( + NetStream, ) @@ -109,7 +109,7 @@ class Pubsub: self.peer_topics = {} # Create peers map, which maps peer_id (as string) to stream (to a given peer) - # FIXME: Should be changed to `Dict[ID, INetStream]` + # FIXME: Should be changed to `Dict[ID, NetStream]` self.peers = {} self.counter = time.time_ns() @@ -130,7 +130,7 @@ class Pubsub: return packet.SerializeToString() - async def continuously_read_stream(self, stream: INetStream) -> None: + async def continuously_read_stream(self, stream: NetStream) -> None: """ Read from input stream in an infinite loop. Process messages from other nodes @@ -168,7 +168,7 @@ class Pubsub: # Force context switch await asyncio.sleep(0) - async def stream_handler(self, stream: INetStream) -> None: + async def stream_handler(self, stream: NetStream) -> None: """ Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols. @@ -196,12 +196,13 @@ class Pubsub: """ while True: - peer_id: ID = await self.peer_queue.get() + # FIXME: Should be changed to type 'ID' + peer_id: str = await self.peer_queue.get() # Open a stream to peer on existing connection # (we know connection exists since that's the only way # an element gets added to peer_queue) - stream: INetStream = await self.host.new_stream(peer_id, self.protocols) + stream: NetStream = await self.host.new_stream(peer_id, self.protocols) # Add Peer # Map peer to stream diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 1fa3b41..2798bb1 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -1,12 +1,12 @@ +import asyncio from typing import ( - List, Sequence, ) from multiaddr import Multiaddr -from libp2p.network.connection.raw_connection import ( - RawConnection, +from libp2p.stream_muxer.mplex.mplex import ( + Mplex, ) from libp2p.network.notifee_interface import ( INotifee, @@ -17,21 +17,20 @@ from libp2p.network.network_interface import ( from libp2p.network.stream.net_stream_interface import ( INetStream, ) -from libp2p.peer.id import ( - ID, -) + class PubsubNotifee(INotifee): # pylint: disable=too-many-instance-attributes, cell-var-from-loop - initiator_peers_queue: List[ID] + # FIXME: Should be changed to type 'peer.ID' + initiator_peers_queue: asyncio.Queue[str] - def __init__(self, initiator_peers_queue: Sequence[ID]) -> None: + def __init__(self, initiator_peers_queue: asyncio.Queue[str]) -> None: """ :param initiator_peers_queue: queue to add new peers to so that pubsub can process new peers after we connect to them """ - self.initiator_peers_queue = List(initiator_peers_queue) + self.initiator_peers_queue = initiator_peers_queue async def opened_stream(self, network: INetwork, stream: INetStream) -> None: pass @@ -39,7 +38,7 @@ class PubsubNotifee(INotifee): async def closed_stream(self, network: INetwork, stream: INetStream) -> None: pass - async def connected(self, network: INetwork, conn: RawConnection) -> None: + async def connected(self, network: INetwork, conn: Mplex) -> None: """ Add peer_id to initiator_peers_queue, so that this peer_id can be used to create a stream and we only want to have one pubsub stream with each peer. @@ -52,7 +51,7 @@ class PubsubNotifee(INotifee): if conn.initiator: await self.initiator_peers_queue.put(conn.peer_id) - async def disconnected(self, network: INetwork, conn: RawConnection) -> None: + async def disconnected(self, network: INetwork, conn: Mplex) -> None: pass async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None: diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index e0787e2..14ad2f0 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -25,8 +25,9 @@ class IPubsubRouter(ABC): :param pubsub: pubsub instance to attach to """ + # FIXME: Should be changed to type 'peer.ID' @abstractmethod - def add_peer(self, peer_id: ID, protocol_id: str) -> None: + def add_peer(self, peer_id: str, protocol_id: str) -> None: """ Notifies the router that a new peer has been connected :param peer_id: id of peer to add @@ -39,8 +40,9 @@ class IPubsubRouter(ABC): :param peer_id: id of peer to remove """ + # FIXME: Should be changed to type 'peer.ID' @abstractmethod - def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None: + async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None: """ Invoked to process control messages in the RPC envelope. It is invoked after subscriptions and payload messages have been processed @@ -50,8 +52,9 @@ class IPubsubRouter(ABC): :param rpc: rpc message """ + # FIXME: Should be changed to type 'peer.ID' @abstractmethod - async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message): + async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """ Invoked to forward a new message that has been validated :param msg_forwarder: peer_id of message sender @@ -59,7 +62,7 @@ class IPubsubRouter(ABC): """ @abstractmethod - def join(self, topic: str) -> None: + async def join(self, topic: str) -> None: """ Join notifies the router that we want to receive and forward messages in a topic. It is invoked after the @@ -68,7 +71,7 @@ class IPubsubRouter(ABC): """ @abstractmethod - def leave(self, topic: str) -> None: + async def leave(self, topic: str) -> None: """ Leave notifies the router that we are no longer interested in a topic. It is invoked after the unsubscription announcement.