Fix type hints except pb msg in pubsub folder

NIC619 2019-07-27 11:27:47 +08:00
parent a0aa105867
commit b2f496d081
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17
6 changed files with 79 additions and 73 deletions

libp2p/pubsub/floodsub.py

@@ -25,7 +25,7 @@ class FloodSub(IPubsubRouter):
     pubsub: Pubsub

     def __init__(self, protocols: Sequence[str]) -> None:
-        self.protocols = protocols
+        self.protocols = list(protocols)
         self.pubsub = None

     def get_protocols(self) -> List[str]:
@@ -42,19 +42,21 @@ class FloodSub(IPubsubRouter):
         """
         self.pubsub = pubsub

-    def add_peer(self, peer_id: ID, protocol_id: str):
+    # FIXME: Should be changed to type 'peer.ID'
+    def add_peer(self, peer_id: str, protocol_id: str) -> None:
         """
         Notifies the router that a new peer has been connected
         :param peer_id: id of peer to add
         """

-    def remove_peer(self, peer_id: ID):
+    def remove_peer(self, peer_id: ID) -> None:
         """
         Notifies the router that a peer has been disconnected
         :param peer_id: id of peer to remove
         """

-    async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID):
+    # FIXME: Should be changed to type 'peer.ID'
+    async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None:
         """
         Invoked to process control messages in the RPC envelope.
         It is invoked after subscriptions and payload messages have been processed
@@ -90,7 +92,7 @@ class FloodSub(IPubsubRouter):
         # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107
         await stream.write(rpc_msg.SerializeToString())

-    async def join(self, topic: str):
+    async def join(self, topic: str) -> None:
         """
         Join notifies the router that we want to receive and
         forward messages in a topic. It is invoked after the
@@ -98,7 +100,7 @@ class FloodSub(IPubsubRouter):
         :param topic: topic to join
         """

-    async def leave(self, topic: str):
+    async def leave(self, topic: str) -> None:
         """
         Leave notifies the router that we are no longer interested in a topic.
         It is invoked after the unsubscription announcement.
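
The pattern above — a `Sequence[str]` parameter copied into a concrete `list` so the `List[str]` attribute annotation holds — recurs in GossipSub below. A minimal, self-contained sketch of why the copy is made (the `Router` class and protocol strings here are illustrative, not the library code):

    from typing import List, Sequence

    class Router:  # illustrative only, not the library class
        protocols: List[str]

        def __init__(self, protocols: Sequence[str]) -> None:
            # Copying the read-only Sequence into a concrete list keeps the
            # attribute consistent with its List[str] annotation and avoids
            # aliasing the caller's object.
            self.protocols = list(protocols)

    r = Router(("/proto/a/1.0.0", "/proto/b/1.0.0"))
    assert r.protocols == ["/proto/a/1.0.0", "/proto/b/1.0.0"]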

libp2p/pubsub/gossipsub.py

@@ -27,7 +27,7 @@ class GossipSub(IPubsubRouter):
     # pylint: disable=too-many-instance-attributes
     # pylint: disable=too-many-public-methods

-    protocols: Sequence[str]
+    protocols: List[str]
     pubsub: Pubsub

     degree: int
@@ -62,8 +62,8 @@ class GossipSub(IPubsubRouter):
                  gossip_history: int=5,
                  heartbeat_interval: int=120) -> None:
         # pylint: disable=too-many-arguments
-        self.protocols: List[str] = protocols
-        self.pubsub: Pubsub = None
+        self.protocols = list(protocols)
+        self.pubsub = None

         # Store target degree, upper degree bound, and lower degree bound
         self.degree = degree
@@ -71,7 +71,7 @@ class GossipSub(IPubsubRouter):
         self.degree_high = degree_high

         # Store time to live (for topics in fanout)
-        self.time_to_live: int = time_to_live
+        self.time_to_live = time_to_live

         # Create topic --> list of peers mappings
         self.mesh = {}
@@ -91,7 +91,7 @@ class GossipSub(IPubsubRouter):
     # Interface functions

-    def get_protocols(self) -> List:
+    def get_protocols(self) -> List[str]:
         """
         :return: the list of protocols supported by the router
         """
@@ -109,7 +109,8 @@ class GossipSub(IPubsubRouter):
         # TODO: Start after delay
         asyncio.ensure_future(self.heartbeat())

-    def add_peer(self, peer_id: ID, protocol_id: str):
+    # FIXME: Should be changed to type 'peer.ID'
+    def add_peer(self, peer_id: str, protocol_id: str) -> None:
         """
         Notifies the router that a new peer has been connected
         :param peer_id: id of peer to add
@@ -133,7 +134,7 @@ class GossipSub(IPubsubRouter):
             self.peers_to_protocol.remove(peer_id_str)

     # FIXME: type of `sender_peer_id` should be changed to `ID`
-    async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: str):
+    async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: str) -> None:
         """
         Invoked to process control messages in the RPC envelope.
         It is invoked after subscriptions and payload messages have been processed
@@ -300,7 +301,7 @@ class GossipSub(IPubsubRouter):
             peers: List[str],
             msg_sender: str,
             origin_id: str,
-            serialized_packet: bytes):
+            serialized_packet: bytes) -> None:
         for peer_id_in_topic in peers:
             # Forward to all peers that are not the
             # message sender and are not the message origin
@@ -358,7 +359,7 @@ class GossipSub(IPubsubRouter):
             if num_mesh_peers_in_topic > self.degree_high:
                 # Select |mesh[topic]| - D peers from mesh[topic]
                 # FIXME: Should be changed to `List[ID]`
-                selected_peers: List[str] = GossipSub.select_from_minus(
+                selected_peers = GossipSub.select_from_minus(
                     num_mesh_peers_in_topic - self.degree,
                     self.mesh[topic],
                     [],
@@ -378,7 +379,7 @@ class GossipSub(IPubsubRouter):
             if self.time_since_last_publish[topic] > self.time_to_live:
                 # Remove topic from fanout
                 del self.fanout[topic]
-                self.time_since_last_publish.remove(topic)
+                del self.time_since_last_publish[topic]
             else:
                 num_fanout_peers_in_topic = len(self.fanout[topic])
@@ -393,7 +394,7 @@ class GossipSub(IPubsubRouter):
                 # Add the peers to fanout[topic]
                 self.fanout[topic].extend(selected_peers)

-    async def gossip_heartbeat(self):
+    async def gossip_heartbeat(self) -> None:
         # pylint: disable=too-many-nested-blocks
         for topic in self.mesh:
             msg_ids = self.mcache.window(topic)
@@ -412,14 +413,14 @@ class GossipSub(IPubsubRouter):
                     # TODO: this line is a monster, can hopefully be simplified
                     if (topic not in self.mesh or (peer not in self.mesh[topic]))\
                             and (topic not in self.fanout or (peer not in self.fanout[topic])):
-                        msg_ids: List[str] = [str(msg) for msg in msg_ids]
-                        await self.emit_ihave(topic, msg_ids, peer)
+                        msg_id_strs = [str(msg_id) for msg_id in msg_ids]
+                        await self.emit_ihave(topic, msg_id_strs, peer)

         # TODO: Refactor and Dedup. This section is the roughly the same as the above.
         # Do the same for fanout, for all topics not already hit in mesh
         for topic in self.fanout:
             if topic not in self.mesh:
-                msg_ids: List[str] = self.mcache.window(topic)
+                msg_ids = self.mcache.window(topic)
                 if msg_ids:
                     # TODO: Make more efficient, possibly using a generator?
                     # Get all pubsub peers in topic and only add if they are gossipsub peers also
@@ -433,8 +434,8 @@ class GossipSub(IPubsubRouter):
                     for peer in peers_to_emit_ihave_to:
                         if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
-                            msg_ids: List[str] = [str(msg) for msg in msg_ids]
-                            await self.emit_ihave(topic, msg_ids, peer)
+                            msg_id_strs = [str(msg) for msg in msg_ids]
+                            await self.emit_ihave(topic, msg_id_strs, peer)

         self.mcache.shift()
@@ -453,7 +454,7 @@ class GossipSub(IPubsubRouter):
             selection_pool: List[Any] = [x for x in pool if x not in minus]
         else:
             # Don't create a new selection_pool if we are not subbing anything
-            selection_pool: List[Any] = pool
+            selection_pool = list(pool)

         # If num_to_select > size(selection_pool), then return selection_pool (which has the most
         # possible elements s.t. the number of elements is less than num_to_select)
@@ -518,7 +519,7 @@ class GossipSub(IPubsubRouter):
         # FIXME: Update type of message ID
         msg_ids: List[Any] = [literal_eval(msg) for msg in iwant_msg.messageIDs]

-        msgs_to_forward: List = []
+        msgs_to_forward: List[rpc_pb2.Message] = []
         for msg_id_iwant in msg_ids:
             # Check if the wanted message ID is present in mcache
             msg: rpc_pb2.Message = self.mcache.get(msg_id_iwant)
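
For context on the `selection_pool = list(pool)` change: `select_from_minus` picks peers from `pool` that are not in `minus`, and copying the pool avoids handing back (and later mutating) the caller's own list. A simplified re-statement of that shape — not the library's exact code; the sampling step at the end is an assumption:

    import random
    from typing import Any, List, Sequence

    def select_from_minus(num_to_select: int,
                          pool: Sequence[Any],
                          minus: Sequence[Any]) -> List[Any]:
        # Candidate pool is pool - minus; when minus is empty, copy pool
        # instead of aliasing it.
        if minus:
            selection_pool: List[Any] = [x for x in pool if x not in minus]
        else:
            selection_pool = list(pool)
        # If fewer candidates than requested, return them all.
        if num_to_select >= len(selection_pool):
            return selection_pool
        # Otherwise sample without replacement (assumed behaviour).
        return random.sample(selection_pool, num_to_select)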

libp2p/pubsub/mcache.py

@@ -2,38 +2,41 @@ from typing import (
     Dict,
     List,
     Optional,
+    Sequence,
     Tuple,
 )

 from .pb import rpc_pb2


+class CacheEntry:
+    # pylint: disable=too-few-public-methods
+
+    mid: Tuple[bytes, bytes]
+    topics: List[str]
+    """
+    A logical representation of an entry in the mcache's _history_.
+    """
+
+    def __init__(self, mid: Tuple[bytes, bytes], topics: Sequence[str]) -> None:
+        """
+        Constructor.
+        :param mid: (seqno, from_id) of the msg
+        :param topics: list of topics this message was sent on
+        """
+        self.mid = mid
+        self.topics = list(topics)
+
+
 class MessageCache:
-    class CacheEntry:
-        # pylint: disable=too-few-public-methods
-
-        mid: Tuple[bytes, bytes]
-        topics: List[str]
-        """
-        A logical representation of an entry in the mcache's _history_.
-        """
-
-        def __init__(self, mid: Tuple[bytes, bytes], topics: List[str]) -> None:
-            """
-            Constructor.
-            :param mid: (seqno, from_id) of the msg
-            :param topics: list of topics this message was sent on
-            """
-            self.mid = mid
-            self.topics = topics

     window_size: int
     history_size: int

     msgs: Dict[Tuple[bytes, bytes], rpc_pb2.Message]
-    history = List[List[CacheEntry]]
+    history: List[List[CacheEntry]]

     def __init__(self, window_size: int, history_size: int) -> None:
         """
@@ -50,10 +53,10 @@ class MessageCache:
         # max length of history_size. each item is a list of CacheEntry.
         # messages lost upon shift().
-        self.history = []
-
-        for _ in range(history_size):
-            self.history.append([])
+        self.history = [
+            []
+            for _ in range(history_size)
+        ]

     def put(self, msg: rpc_pb2.Message) -> None:
         """
@@ -63,10 +66,7 @@ class MessageCache:
         mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id)
         self.msgs[mid] = msg

-        if not self.history[0]:
-            self.history[0] = []
-        self.history[0].append(self.CacheEntry(mid, msg.topicIDs))
+        self.history[0].append(CacheEntry(mid, msg.topicIDs))

     def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]:
         """

libp2p/pubsub/pubsub.py

@@ -22,8 +22,8 @@ from libp2p.host.host_interface import (
 from libp2p.peer.id import (
     ID,
 )
-from libp2p.network.stream.net_stream_interface import (
-    INetStream,
+from libp2p.network.stream.net_stream import (
+    NetStream,
 )
@@ -109,7 +109,7 @@ class Pubsub:
         self.peer_topics = {}

         # Create peers map, which maps peer_id (as string) to stream (to a given peer)
-        # FIXME: Should be changed to `Dict[ID, INetStream]`
+        # FIXME: Should be changed to `Dict[ID, NetStream]`
         self.peers = {}

         self.counter = time.time_ns()
@@ -130,7 +130,7 @@ class Pubsub:
         return packet.SerializeToString()

-    async def continuously_read_stream(self, stream: INetStream) -> None:
+    async def continuously_read_stream(self, stream: NetStream) -> None:
         """
         Read from input stream in an infinite loop. Process
         messages from other nodes
@@ -168,7 +168,7 @@ class Pubsub:
             # Force context switch
             await asyncio.sleep(0)

-    async def stream_handler(self, stream: INetStream) -> None:
+    async def stream_handler(self, stream: NetStream) -> None:
         """
         Stream handler for pubsub. Gets invoked whenever a new stream is created
         on one of the supported pubsub protocols.
@@ -196,12 +196,13 @@ class Pubsub:
         """
         while True:
-            peer_id: ID = await self.peer_queue.get()
+            # FIXME: Should be changed to type 'ID'
+            peer_id: str = await self.peer_queue.get()

             # Open a stream to peer on existing connection
             # (we know connection exists since that's the only way
             # an element gets added to peer_queue)
-            stream: INetStream = await self.host.new_stream(peer_id, self.protocols)
+            stream: NetStream = await self.host.new_stream(peer_id, self.protocols)

             # Add Peer
             # Map peer to stream

libp2p/pubsub/pubsub_notifee.py

@@ -1,12 +1,12 @@
+import asyncio
 from typing import (
-    List,
     Sequence,
 )

 from multiaddr import Multiaddr

-from libp2p.network.connection.raw_connection import (
-    RawConnection,
+from libp2p.stream_muxer.mplex.mplex import (
+    Mplex,
 )
 from libp2p.network.notifee_interface import (
     INotifee,
@@ -17,21 +17,20 @@ from libp2p.network.network_interface import (
 from libp2p.network.stream.net_stream_interface import (
     INetStream,
 )
-from libp2p.peer.id import (
-    ID,
-)


 class PubsubNotifee(INotifee):
     # pylint: disable=too-many-instance-attributes, cell-var-from-loop

-    initiator_peers_queue: List[ID]
+    # FIXME: Should be changed to type 'peer.ID'
+    initiator_peers_queue: asyncio.Queue[str]

-    def __init__(self, initiator_peers_queue: Sequence[ID]) -> None:
+    def __init__(self, initiator_peers_queue: asyncio.Queue[str]) -> None:
         """
         :param initiator_peers_queue: queue to add new peers to so that pubsub
         can process new peers after we connect to them
         """
-        self.initiator_peers_queue = List(initiator_peers_queue)
+        self.initiator_peers_queue = initiator_peers_queue

     async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
         pass
@@ -39,7 +38,7 @@ class PubsubNotifee(INotifee):
     async def closed_stream(self, network: INetwork, stream: INetStream) -> None:
         pass

-    async def connected(self, network: INetwork, conn: RawConnection) -> None:
+    async def connected(self, network: INetwork, conn: Mplex) -> None:
         """
         Add peer_id to initiator_peers_queue, so that this peer_id can be used to
         create a stream and we only want to have one pubsub stream with each peer.
@@ -52,7 +51,7 @@ class PubsubNotifee(INotifee):
         if conn.initiator:
             await self.initiator_peers_queue.put(conn.peer_id)

-    async def disconnected(self, network: INetwork, conn: RawConnection) -> None:
+    async def disconnected(self, network: INetwork, conn: Mplex) -> None:
         pass

     async def listen(self, network: INetwork, multiaddr: Multiaddr) -> None:
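
A caveat on the new `asyncio.Queue[str]` annotations: `asyncio.Queue` only supports subscripting at runtime on newer interpreters, so the hint is often written as a string to stay version-agnostic. A minimal sketch of the queue hand-off the notifee performs, with a hypothetical `Notifee` class and quoted annotations:

    import asyncio

    class Notifee:  # illustrative stand-in, not the library class
        # Quoted so the hint is not evaluated at runtime on interpreters
        # where asyncio.Queue is not subscriptable.
        initiator_peers_queue: "asyncio.Queue[str]"

        def __init__(self, initiator_peers_queue: "asyncio.Queue[str]") -> None:
            # Store the queue itself; wrapping it in typing.List(...), as the
            # removed line did, was the bug this hunk fixes.
            self.initiator_peers_queue = initiator_peers_queue

    async def main() -> None:
        queue: "asyncio.Queue[str]" = asyncio.Queue()
        notifee = Notifee(queue)
        await notifee.initiator_peers_queue.put("QmExamplePeer")  # producer side
        print(await queue.get())  # consumer side

    asyncio.run(main())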

libp2p/pubsub/pubsub_router_interface.py

@@ -25,8 +25,9 @@ class IPubsubRouter(ABC):
         :param pubsub: pubsub instance to attach to
         """

+    # FIXME: Should be changed to type 'peer.ID'
     @abstractmethod
-    def add_peer(self, peer_id: ID, protocol_id: str) -> None:
+    def add_peer(self, peer_id: str, protocol_id: str) -> None:
         """
         Notifies the router that a new peer has been connected
         :param peer_id: id of peer to add
@@ -39,8 +40,9 @@ class IPubsubRouter(ABC):
         :param peer_id: id of peer to remove
         """

+    # FIXME: Should be changed to type 'peer.ID'
     @abstractmethod
-    def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None:
+    async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None:
         """
         Invoked to process control messages in the RPC envelope.
         It is invoked after subscriptions and payload messages have been processed
@@ -50,8 +52,9 @@ class IPubsubRouter(ABC):
         :param rpc: rpc message
         """

+    # FIXME: Should be changed to type 'peer.ID'
     @abstractmethod
-    async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message):
+    async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
         """
         Invoked to forward a new message that has been validated
         :param msg_forwarder: peer_id of message sender
@@ -59,7 +62,7 @@ class IPubsubRouter(ABC):
         """

     @abstractmethod
-    def join(self, topic: str) -> None:
+    async def join(self, topic: str) -> None:
         """
         Join notifies the router that we want to receive and
         forward messages in a topic. It is invoked after the
@@ -68,7 +71,7 @@ class IPubsubRouter(ABC):
         """

     @abstractmethod
-    def leave(self, topic: str) -> None:
+    async def leave(self, topic: str) -> None:
         """
         Leave notifies the router that we are no longer interested in a topic.
         It is invoked after the unsubscription announcement.
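
The interface now declares `handle_rpc`, `join`, and `leave` as coroutines, matching the `async def` implementations in FloodSub and GossipSub. `@abstractmethod` stacks on `async def` with no special handling, as in this small illustration (the class names are made up):

    import asyncio
    from abc import ABC, abstractmethod

    class IExampleRouter(ABC):
        @abstractmethod
        async def join(self, topic: str) -> None:
            """Declared async, so concrete routers must provide a coroutine."""

    class ExampleRouter(IExampleRouter):
        async def join(self, topic: str) -> None:
            print(f"joined {topic}")

    asyncio.run(ExampleRouter().join("example-topic"))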