Add dead_peer_queue to pubsub

This commit is contained in:
NIC619 2019-11-04 14:22:24 +08:00
parent 3a0c7d06d1
commit c6c9393f2b
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17
2 changed files with 14 additions and 2 deletions

View File

@ -60,6 +60,7 @@ class Pubsub:
router: "IPubsubRouter" router: "IPubsubRouter"
peer_queue: "asyncio.Queue[ID]" peer_queue: "asyncio.Queue[ID]"
dead_peer_queue: "asyncio.Queue[ID]"
protocols: List[TProtocol] protocols: List[TProtocol]
@ -100,7 +101,10 @@ class Pubsub:
# Register a notifee # Register a notifee
self.peer_queue = asyncio.Queue() self.peer_queue = asyncio.Queue()
self.host.get_network().register_notifee(PubsubNotifee(self.peer_queue)) self.dead_peer_queue = asyncio.Queue()
self.host.get_network().register_notifee(
PubsubNotifee(self.peer_queue, self.dead_peer_queue)
)
# Register stream handlers for each pubsub router protocol to handle # Register stream handlers for each pubsub router protocol to handle
# the pubsub streams opened on those protocols # the pubsub streams opened on those protocols

View File

@ -15,13 +15,21 @@ if TYPE_CHECKING:
class PubsubNotifee(INotifee): class PubsubNotifee(INotifee):
initiator_peers_queue: "asyncio.Queue[ID]" initiator_peers_queue: "asyncio.Queue[ID]"
dead_peers_queue: "asyncio.Queue[ID]"
def __init__(self, initiator_peers_queue: "asyncio.Queue[ID]") -> None: def __init__(
self,
initiator_peers_queue: "asyncio.Queue[ID]",
dead_peers_queue: "asyncio.Queue[ID]",
) -> None:
""" """
:param initiator_peers_queue: queue to add new peers to so that pubsub :param initiator_peers_queue: queue to add new peers to so that pubsub
can process new peers after we connect to them can process new peers after we connect to them
:param dead_peers_queue: queue to add dead peers to so that pubsub
can process dead peers after we disconnect from each other
""" """
self.initiator_peers_queue = initiator_peers_queue self.initiator_peers_queue = initiator_peers_queue
self.dead_peers_queue = dead_peers_queue
async def opened_stream(self, network: INetwork, stream: INetStream) -> None: async def opened_stream(self, network: INetwork, stream: INetStream) -> None:
pass pass