Add typing and notes in pubsub

This commit is contained in:
mhchia 2019-07-24 14:54:30 +08:00
parent 1e78c21eca
commit d6c19e71a6
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
2 changed files with 69 additions and 15 deletions

View File

@ -1,3 +1,7 @@
from libp2p.peer.id import (
ID,
)
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_router_interface import IPubsubRouter from .pubsub_router_interface import IPubsubRouter
@ -42,7 +46,7 @@ class FloodSub(IPubsubRouter):
:param rpc: rpc message :param rpc: rpc message
""" """
async def publish(self, sender_peer_id, rpc_message): async def publish(self, from_peer: ID, pubsub_message: rpc_pb2.Message) -> None:
""" """
Invoked to forward a new message that has been validated. Invoked to forward a new message that has been validated.
This is where the "flooding" part of floodsub happens This is where the "flooding" part of floodsub happens

View File

@ -1,16 +1,62 @@
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
import asyncio import asyncio
import time
from typing import (
Any,
Dict,
List,
Sequence,
Tuple,
)
from lru import LRU from lru import LRU
from libp2p.host.host_interface import (
IHost,
)
from libp2p.peer.id import (
ID,
)
from libp2p.network.stream.net_stream_interface import (
INetStream,
)
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .pubsub_router_interface import (
IPubsubRouter,
)
class Pubsub(): def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]:
# NOTE: `string(from, seqno)` in Go
return (msg.seqno, msg.from_id)
class Pubsub:
# pylint: disable=too-many-instance-attributes, no-member # pylint: disable=too-many-instance-attributes, no-member
def __init__(self, host, router, my_id, cache_size=None): host: IHost
my_id: ID
router: IPubsubRouter
peer_queue: asyncio.Queue
protocols: Sequence[str]
incoming_msgs_from_peers: asyncio.Queue()
outgoing_messages: asyncio.Queue()
seen_messages: LRU
my_topics: Dict[str, asyncio.Queue]
peer_topics: Dict[str, List[ID]]
# FIXME: Should be changed to `Dict[ID, INetStream]`
peers: Dict[str, INetStream]
# NOTE: Be sure it is increased atomically everytime.
counter: int # uint64
def __init__(
self,
host: IHost,
router: IPubsubRouter,
my_id: ID,
cache_size: int = None) -> None:
""" """
Construct a new Pubsub object, which is responsible for handling all Construct a new Pubsub object, which is responsible for handling all
Pubsub-related messages and relaying messages as appropriate to the Pubsub-related messages and relaying messages as appropriate to the
@ -57,10 +103,12 @@ class Pubsub():
# Create peers map, which maps peer_id (as string) to stream (to a given peer) # Create peers map, which maps peer_id (as string) to stream (to a given peer)
self.peers = {} self.peers = {}
self.counter = time.time_ns()
# Call handle peer to keep waiting for updates to peer queue # Call handle peer to keep waiting for updates to peer queue
asyncio.ensure_future(self.handle_peer_queue()) asyncio.ensure_future(self.handle_peer_queue())
def get_hello_packet(self): def get_hello_packet(self) -> bytes:
""" """
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics only send hello packet if we have subscribed topics
@ -73,7 +121,7 @@ class Pubsub():
return packet.SerializeToString() return packet.SerializeToString()
async def continuously_read_stream(self, stream): async def continuously_read_stream(self, stream: INetStream) -> None:
""" """
Read from input stream in an infinite loop. Process Read from input stream in an infinite loop. Process
messages from other nodes messages from other nodes
@ -120,7 +168,7 @@ class Pubsub():
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
async def stream_handler(self, stream): async def stream_handler(self, stream: INetStream) -> None:
""" """
Stream handler for pubsub. Gets invoked whenever a new stream is created Stream handler for pubsub. Gets invoked whenever a new stream is created
on one of the supported pubsub protocols. on one of the supported pubsub protocols.
@ -139,7 +187,7 @@ class Pubsub():
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continuously_read_stream(stream)) asyncio.ensure_future(self.continuously_read_stream(stream))
async def handle_peer_queue(self): async def handle_peer_queue(self) -> None:
""" """
Continuously read from peer queue and each time a new peer is found, Continuously read from peer queue and each time a new peer is found,
open a stream to the peer using a supported pubsub protocol open a stream to the peer using a supported pubsub protocol
@ -170,7 +218,8 @@ class Pubsub():
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
def handle_subscription(self, origin_id, sub_message): # FIXME: `sub_message` can be further type hinted with mypy_protobuf
def handle_subscription(self, origin_id: ID, sub_message: Any) -> None:
""" """
Handle an incoming subscription message from a peer. Update internal Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as mapping to mark the peer as subscribed or unsubscribed to topics as
@ -189,7 +238,9 @@ class Pubsub():
if origin_id in self.peer_topics[sub_message.topicid]: if origin_id in self.peer_topics[sub_message.topicid]:
self.peer_topics[sub_message.topicid].remove(origin_id) self.peer_topics[sub_message.topicid].remove(origin_id)
async def handle_talk(self, publish_message): # FIXME(mhchia): Change the function name?
# FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf
async def handle_talk(self, publish_message: Any) -> None:
""" """
Put incoming message from a peer onto my blocking queue Put incoming message from a peer onto my blocking queue
:param talk: RPC.Message format :param talk: RPC.Message format
@ -203,7 +254,7 @@ class Pubsub():
# for each topic # for each topic
await self.my_topics[topic].put(publish_message) await self.my_topics[topic].put(publish_message)
async def subscribe(self, topic_id): async def subscribe(self, topic_id: str) -> asyncio.Queue:
""" """
Subscribe ourself to a topic Subscribe ourself to a topic
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
@ -232,7 +283,7 @@ class Pubsub():
# Return the asyncio queue for messages on this topic # Return the asyncio queue for messages on this topic
return self.my_topics[topic_id] return self.my_topics[topic_id]
async def unsubscribe(self, topic_id): async def unsubscribe(self, topic_id: str) -> None:
""" """
Unsubscribe ourself from a topic Unsubscribe ourself from a topic
:param topic_id: topic_id to unsubscribe from :param topic_id: topic_id to unsubscribe from
@ -257,15 +308,14 @@ class Pubsub():
# Tell router we are leaving this topic # Tell router we are leaving this topic
await self.router.leave(topic_id) await self.router.leave(topic_id)
async def message_all_peers(self, rpc_msg): # FIXME: `rpc_msg` can be further type hinted with mypy_protobuf
async def message_all_peers(self, rpc_msg: Any) -> None:
""" """
Broadcast a message to peers Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast :param raw_msg: raw contents of the message to broadcast
""" """
# Broadcast message # Broadcast message
for peer in self.peers: for _, stream in self.peers.items():
stream = self.peers[peer]
# Write message to stream # Write message to stream
await stream.write(rpc_msg) await stream.write(rpc_msg)