This commit is contained in:
NIC619 2019-11-09 23:55:35 +08:00
parent 5dfa29a0df
commit b8c7f0cfff
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17

View File

@ -32,6 +32,7 @@ from .validators import signature_validator
if TYPE_CHECKING: if TYPE_CHECKING:
from .pubsub_router_interface import IPubsubRouter # noqa: F401 from .pubsub_router_interface import IPubsubRouter # noqa: F401
from typing import Any # noqa: F401
logger = logging.getLogger("libp2p.pubsub") logger = logging.getLogger("libp2p.pubsub")
@ -177,7 +178,11 @@ class Pubsub:
logger.debug( logger.debug(
"received `publish` message %s from peer %s", msg, peer_id "received `publish` message %s from peer %s", msg, peer_id
) )
self._tasks.append(asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg))) self._tasks.append(
asyncio.ensure_future(
self.push_msg(msg_forwarder=peer_id, msg=msg)
)
)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions:
# deal with RPC.subscriptions # deal with RPC.subscriptions
@ -311,10 +316,8 @@ class Pubsub:
self._tasks.append(asyncio.ensure_future(self._handle_new_peer(peer_id))) self._tasks.append(asyncio.ensure_future(self._handle_new_peer(peer_id)))
async def handle_dead_peer_queue(self) -> None: async def handle_dead_peer_queue(self) -> None:
""" """Continuously read from dead peer queue and close the stream between
Continuously read from dead peer queue and close the stream between that peer and that peer and remove peer info from pubsub and pubsub router."""
remove peer info from pubsub and pubsub router.
"""
while True: while True:
peer_id: ID = await self.dead_peer_queue.get() peer_id: ID = await self.dead_peer_queue.get()
# Remove Peer # Remove Peer