diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 4b16020..949b929 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,3 +1,7 @@ +from libp2p.peer.id import ( + ID, +) + from .pb import rpc_pb2 from .pubsub_router_interface import IPubsubRouter @@ -42,7 +46,7 @@ class FloodSub(IPubsubRouter): :param rpc: rpc message """ - async def publish(self, sender_peer_id, rpc_message): + async def publish(self, from_peer: ID, pubsub_message: rpc_pb2.Message) -> None: """ Invoked to forward a new message that has been validated. This is where the "flooding" part of floodsub happens diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index f80ed97..7afdc2a 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,16 +1,62 @@ # pylint: disable=no-name-in-module import asyncio +import time +from typing import ( + Any, + Dict, + List, + Sequence, + Tuple, +) from lru import LRU +from libp2p.host.host_interface import ( + IHost, +) +from libp2p.peer.id import ( + ID, +) +from libp2p.network.stream.net_stream_interface import ( + INetStream, +) + from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee +from .pubsub_router_interface import ( + IPubsubRouter, +) -class Pubsub(): +def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: + # NOTE: `string(from, seqno)` in Go + return (msg.seqno, msg.from_id) + + +class Pubsub: # pylint: disable=too-many-instance-attributes, no-member - def __init__(self, host, router, my_id, cache_size=None): + host: IHost + my_id: ID + router: IPubsubRouter + peer_queue: asyncio.Queue + protocols: Sequence[str] + incoming_msgs_from_peers: asyncio.Queue() + outgoing_messages: asyncio.Queue() + seen_messages: LRU + my_topics: Dict[str, asyncio.Queue] + peer_topics: Dict[str, List[ID]] + # FIXME: Should be changed to `Dict[ID, INetStream]` + peers: Dict[str, INetStream] + # NOTE: Be sure it is increased atomically everytime. + counter: int # uint64 + + def __init__( + self, + host: IHost, + router: IPubsubRouter, + my_id: ID, + cache_size: int = None) -> None: """ Construct a new Pubsub object, which is responsible for handling all Pubsub-related messages and relaying messages as appropriate to the @@ -57,10 +103,12 @@ class Pubsub(): # Create peers map, which maps peer_id (as string) to stream (to a given peer) self.peers = {} + self.counter = time.time_ns() + # Call handle peer to keep waiting for updates to peer queue asyncio.ensure_future(self.handle_peer_queue()) - def get_hello_packet(self): + def get_hello_packet(self) -> bytes: """ Generate subscription message with all topics we are subscribed to only send hello packet if we have subscribed topics @@ -73,7 +121,7 @@ class Pubsub(): return packet.SerializeToString() - async def continuously_read_stream(self, stream): + async def continuously_read_stream(self, stream: INetStream) -> None: """ Read from input stream in an infinite loop. Process messages from other nodes @@ -120,7 +168,7 @@ class Pubsub(): # Force context switch await asyncio.sleep(0) - async def stream_handler(self, stream): + async def stream_handler(self, stream: INetStream) -> None: """ Stream handler for pubsub. Gets invoked whenever a new stream is created on one of the supported pubsub protocols. @@ -139,7 +187,7 @@ class Pubsub(): # Pass stream off to stream reader asyncio.ensure_future(self.continuously_read_stream(stream)) - async def handle_peer_queue(self): + async def handle_peer_queue(self) -> None: """ Continuously read from peer queue and each time a new peer is found, open a stream to the peer using a supported pubsub protocol @@ -170,7 +218,8 @@ class Pubsub(): # Force context switch await asyncio.sleep(0) - def handle_subscription(self, origin_id, sub_message): + # FIXME: `sub_message` can be further type hinted with mypy_protobuf + def handle_subscription(self, origin_id: ID, sub_message: Any) -> None: """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as @@ -189,7 +238,9 @@ class Pubsub(): if origin_id in self.peer_topics[sub_message.topicid]: self.peer_topics[sub_message.topicid].remove(origin_id) - async def handle_talk(self, publish_message): + # FIXME(mhchia): Change the function name? + # FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf + async def handle_talk(self, publish_message: Any) -> None: """ Put incoming message from a peer onto my blocking queue :param talk: RPC.Message format @@ -203,7 +254,7 @@ class Pubsub(): # for each topic await self.my_topics[topic].put(publish_message) - async def subscribe(self, topic_id): + async def subscribe(self, topic_id: str) -> asyncio.Queue: """ Subscribe ourself to a topic :param topic_id: topic_id to subscribe to @@ -232,7 +283,7 @@ class Pubsub(): # Return the asyncio queue for messages on this topic return self.my_topics[topic_id] - async def unsubscribe(self, topic_id): + async def unsubscribe(self, topic_id: str) -> None: """ Unsubscribe ourself from a topic :param topic_id: topic_id to unsubscribe from @@ -257,15 +308,14 @@ class Pubsub(): # Tell router we are leaving this topic await self.router.leave(topic_id) - async def message_all_peers(self, rpc_msg): + # FIXME: `rpc_msg` can be further type hinted with mypy_protobuf + async def message_all_peers(self, rpc_msg: Any) -> None: """ Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast """ # Broadcast message - for peer in self.peers: - stream = self.peers[peer] - + for _, stream in self.peers.items(): # Write message to stream await stream.write(rpc_msg)