Fix after rebase

This commit is contained in:
NIC619 2019-07-30 17:31:08 +08:00
parent e53727d301
commit c4105688d1
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17
13 changed files with 52 additions and 51 deletions

View File

@ -59,8 +59,8 @@ class IHost(ABC):
# stream will decide which protocol_id to run on # stream will decide which protocol_id to run on
@abstractmethod @abstractmethod
async def new_stream(self, async def new_stream(self,
peer_id: ID, peer_id: ID,
protocol_ids: Sequence[str]) -> INetStream: protocol_ids: Sequence[str]) -> INetStream:
""" """
:param peer_id: peer_id that host is connecting :param peer_id: peer_id that host is connecting
:param protocol_ids: protocol ids that stream can run on :param protocol_ids: protocol ids that stream can run on

View File

@ -3,7 +3,6 @@ from abc import (
abstractmethod, abstractmethod,
) )
from typing import ( from typing import (
Any,
Awaitable, Awaitable,
Callable, Callable,
Dict, Dict,
@ -59,8 +58,8 @@ class INetwork(ABC):
@abstractmethod @abstractmethod
async def new_stream(self, async def new_stream(self,
peer_id: ID, peer_id: ID,
protocol_ids: Sequence[str]) -> INetStream: protocol_ids: Sequence[str]) -> INetStream:
""" """
:param peer_id: peer_id of destination :param peer_id: peer_id of destination
:param protocol_ids: available protocol ids to use for stream :param protocol_ids: available protocol ids to use for stream

View File

@ -1,7 +1,4 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import (
Any,
)
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn

View File

@ -1,6 +1,5 @@
import asyncio import asyncio
from typing import ( from typing import (
Any,
Awaitable, Awaitable,
Callable, Callable,
Dict, Dict,

View File

@ -25,7 +25,8 @@ class ID:
def __init__(self, id_str: str) -> None: def __init__(self, id_str: str) -> None:
self._id_str = id_str self._id_str = id_str
def to_bytes(self) -> bytes: # FIXME: Should return type `bytes`
def to_bytes(self) -> str:
return self._id_str return self._id_str
def get_raw_id(self) -> str: def get_raw_id(self) -> str:

View File

@ -51,8 +51,7 @@ class FloodSub(IPubsubRouter):
:param peer_id: id of peer to remove :param peer_id: id of peer to remove
""" """
# FIXME: Should be changed to type 'peer.ID' async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None:
async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None:
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed

View File

@ -6,7 +6,7 @@ from typing import (
Dict, Dict,
Iterable, Iterable,
List, List,
MutableSet, Set,
Sequence, Sequence,
) )
@ -145,8 +145,7 @@ class GossipSub(IPubsubRouter):
if peer_id_str in self.peers_gossipsub: if peer_id_str in self.peers_gossipsub:
self.peers_floodsub.remove(peer_id_str) self.peers_floodsub.remove(peer_id_str)
# FIXME: type of `sender_peer_id` should be changed to `ID` async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: ID) -> None:
async def handle_rpc(self, rpc: rpc_pb2.Message, sender_peer_id: str) -> None:
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed
@ -154,21 +153,21 @@ class GossipSub(IPubsubRouter):
:param sender_peer_id: id of the peer who sent the message :param sender_peer_id: id of the peer who sent the message
""" """
control_message = rpc.control control_message = rpc.control
sender_peer_id = str(sender_peer_id) sender_peer_id_str = str(sender_peer_id)
# Relay each rpc control message to the appropriate handler # Relay each rpc control message to the appropriate handler
if control_message.ihave: if control_message.ihave:
for ihave in control_message.ihave: for ihave in control_message.ihave:
await self.handle_ihave(ihave, sender_peer_id) await self.handle_ihave(ihave, sender_peer_id_str)
if control_message.iwant: if control_message.iwant:
for iwant in control_message.iwant: for iwant in control_message.iwant:
await self.handle_iwant(iwant, sender_peer_id) await self.handle_iwant(iwant, sender_peer_id_str)
if control_message.graft: if control_message.graft:
for graft in control_message.graft: for graft in control_message.graft:
await self.handle_graft(graft, sender_peer_id) await self.handle_graft(graft, sender_peer_id_str)
if control_message.prune: if control_message.prune:
for prune in control_message.prune: for prune in control_message.prune:
await self.handle_prune(prune, sender_peer_id) await self.handle_prune(prune, sender_peer_id_str)
async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
@ -203,7 +202,8 @@ class GossipSub(IPubsubRouter):
:param origin: peer id of the peer the message originate from. :param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to. :return: a generator of the peer ids who we send data to.
""" """
send_to: MutableSet[ID] = set() # pylint: disable=len-as-condition
send_to: Set[ID] = set()
for topic in topic_ids: for topic in topic_ids:
if topic not in self.pubsub.peer_topics: if topic not in self.pubsub.peer_topics:
continue continue
@ -228,7 +228,6 @@ class GossipSub(IPubsubRouter):
# I assume there could be short periods between heartbeats where topic may not # I assume there could be short periods between heartbeats where topic may not
# be but we should check that this path gets hit appropriately # be but we should check that this path gets hit appropriately
# pylint: disable=len-as-condition
if (topic not in self.fanout) or (len(self.fanout[topic]) == 0): if (topic not in self.fanout) or (len(self.fanout[topic]) == 0):
# If no peers in fanout, choose some peers from gossipsub peers in topic. # If no peers in fanout, choose some peers from gossipsub peers in topic.
self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus( self.fanout[topic] = self._get_in_topic_gossipsub_peers_from_minus(
@ -480,11 +479,13 @@ class GossipSub(IPubsubRouter):
return selection return selection
# FIXME: type of `minus` should be changed to type `Sequence[ID]`
# FIXME: return type should be changed to type `List[ID]`
def _get_in_topic_gossipsub_peers_from_minus( def _get_in_topic_gossipsub_peers_from_minus(
self, self,
topic: str, topic: str,
num_to_select: int, num_to_select: int,
minus: Sequence[ID]) -> List[ID]: minus: Sequence[str]) -> List[str]:
gossipsub_peers_in_topic = [ gossipsub_peers_in_topic = [
peer_str peer_str
for peer_str in self.pubsub.peer_topics[topic] for peer_str in self.pubsub.peer_topics[topic]

View File

@ -29,23 +29,23 @@ def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]:
class Pubsub: class Pubsub:
# pylint: disable=too-many-instance-attributes, no-member # pylint: disable=too-many-instance-attributes, no-member, unsubscriptable-object
host: IHost host: IHost
my_id: ID my_id: ID
router: 'IPubsubRouter' router: 'IPubsubRouter'
peer_queue: asyncio.Queue peer_queue: asyncio.Queue[ID]
protocols: List[str] protocols: List[str]
incoming_msgs_from_peers: asyncio.Queue incoming_msgs_from_peers: asyncio.Queue[rpc_pb2.Message]
outgoing_messages: asyncio.Queue outgoing_messages: asyncio.Queue[rpc_pb2.Message]
seen_messages: LRU seen_messages: LRU
my_topics: Dict[str, asyncio.Queue] my_topics: Dict[str, asyncio.Queue[rpc_pb2.Message]]
# FIXME: Should be changed to `Dict[str, List[ID]]` # FIXME: Should be changed to `Dict[str, List[ID]]`
peer_topics: Dict[str, List[str]] peer_topics: Dict[str, List[str]]
@ -214,9 +214,8 @@ class Pubsub:
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
# FIXME: type of `origin_id` should be changed to `ID`
# FIXME: `sub_message` can be further type hinted with mypy_protobuf # FIXME: `sub_message` can be further type hinted with mypy_protobuf
def handle_subscription(self, origin_id: str, sub_message: Any) -> None: def handle_subscription(self, origin_id: ID, sub_message: Any) -> None:
""" """
Handle an incoming subscription message from a peer. Update internal Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as mapping to mark the peer as subscribed or unsubscribed to topics as
@ -224,17 +223,17 @@ class Pubsub:
:param origin_id: id of the peer who subscribe to the message :param origin_id: id of the peer who subscribe to the message
:param sub_message: RPC.SubOpts :param sub_message: RPC.SubOpts
""" """
origin_id = str(origin_id) origin_id_str = str(origin_id)
if sub_message.subscribe: if sub_message.subscribe:
if sub_message.topicid not in self.peer_topics: if sub_message.topicid not in self.peer_topics:
self.peer_topics[sub_message.topicid] = [origin_id] self.peer_topics[sub_message.topicid] = [origin_id_str]
elif origin_id not in self.peer_topics[sub_message.topicid]: elif origin_id_str not in self.peer_topics[sub_message.topicid]:
# Add peer to topic # Add peer to topic
self.peer_topics[sub_message.topicid].append(origin_id) self.peer_topics[sub_message.topicid].append(origin_id_str)
else: else:
if sub_message.topicid in self.peer_topics: if sub_message.topicid in self.peer_topics:
if origin_id in self.peer_topics[sub_message.topicid]: if origin_id_str in self.peer_topics[sub_message.topicid]:
self.peer_topics[sub_message.topicid].remove(origin_id) self.peer_topics[sub_message.topicid].remove(origin_id_str)
# FIXME(mhchia): Change the function name? # FIXME(mhchia): Change the function name?
# FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf # FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf
@ -252,7 +251,7 @@ class Pubsub:
# for each topic # for each topic
await self.my_topics[topic].put(publish_message) await self.my_topics[topic].put(publish_message)
async def subscribe(self, topic_id: str) -> asyncio.Queue: async def subscribe(self, topic_id: str) -> asyncio.Queue[rpc_pb2.Message]:
""" """
Subscribe ourself to a topic Subscribe ourself to a topic
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
@ -374,6 +373,6 @@ class Pubsub:
self.seen_messages[msg_id] = 1 self.seen_messages[msg_id] = 1
def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool:
if len(self.my_topics) == 0: if not bool(self.my_topics):
return False return False
return all([topic in self.my_topics for topic in msg.topicIDs]) return all([topic in self.my_topics for topic in msg.topicIDs])

View File

@ -4,17 +4,18 @@ from multiaddr import Multiaddr
from libp2p.network.network_interface import INetwork from libp2p.network.network_interface import INetwork
from libp2p.network.notifee_interface import INotifee from libp2p.network.notifee_interface import INotifee
from libp2p.peer.id import ID
from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn from libp2p.stream_muxer.muxed_connection_interface import IMuxedConn
from libp2p.network.stream.net_stream_interface import INetStream from libp2p.network.stream.net_stream_interface import INetStream
class PubsubNotifee(INotifee): class PubsubNotifee(INotifee):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop # pylint: disable=too-many-instance-attributes, cell-var-from-loop, unsubscriptable-object
initiator_peers_queue: asyncio.Queue initiator_peers_queue: asyncio.Queue[ID]
def __init__(self, initiator_peers_queue: asyncio.Queue) -> None: def __init__(self, initiator_peers_queue: asyncio.Queue[ID]) -> None:
""" """
:param initiator_peers_queue: queue to add new peers to so that pubsub :param initiator_peers_queue: queue to add new peers to so that pubsub
can process new peers after we connect to them can process new peers after we connect to them

View File

@ -41,9 +41,8 @@ class IPubsubRouter(ABC):
:param peer_id: id of peer to remove :param peer_id: id of peer to remove
""" """
# FIXME: Should be changed to type 'peer.ID'
@abstractmethod @abstractmethod
async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: str) -> None: async def handle_rpc(self, rpc: rpc_pb2.ControlMessage, sender_peer_id: ID) -> None:
""" """
Invoked to process control messages in the RPC envelope. Invoked to process control messages in the RPC envelope.
It is invoked after subscriptions and payload messages have been processed It is invoked after subscriptions and payload messages have been processed

View File

@ -1,8 +1,8 @@
from abc import ABC, abstractmethod from abc import (
from typing import ( ABC,
Any, abstractmethod,
Iterable,
) )
from typing import Iterable
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo from libp2p.peer.peerinfo import PeerInfo

View File

@ -9,7 +9,6 @@ from libp2p.kademlia.kad_peerinfo import (
) )
from libp2p.kademlia.network import KademliaServer from libp2p.kademlia.network import KademliaServer
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo
from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.interfaces import IPeerRouting

View File

@ -1,11 +1,18 @@
from abc import ABC, abstractmethod from abc import (
ABC,
abstractmethod,
)
from typing import (
TYPE_CHECKING,
)
from libp2p.stream_muxer.mplex.mplex import Mplex if TYPE_CHECKING:
from libp2p.stream_muxer.mplex.mplex import Mplex
class IMuxedStream(ABC): class IMuxedStream(ABC):
mplex_conn: Mplex mplex_conn: 'Mplex'
@abstractmethod @abstractmethod
def read(self): def read(self):