Apply PR feedback:

- fix await stream close/reset
- make `_handle_dead_peer` a sync function
This commit is contained in:
NIC619 2019-11-05 14:27:06 +08:00
parent d36e323703
commit eeb87848af
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17

View File

@ -253,10 +253,10 @@ class Pubsub:
try:
await self.continuously_read_stream(stream)
except StreamEOF as error:
stream.close()
await stream.close()
logger.debug("fail to read from peer %s, error=%s", peer_id, error)
except (ParseError, IncompleteReadError, StreamReset) as error:
stream.reset()
await stream.reset()
logger.debug("read corrupted data from peer %s, error=%s", peer_id, error)
async def _handle_new_peer(self, peer_id: ID) -> None:
@ -281,7 +281,7 @@ class Pubsub:
logger.debug("added new peer %s", peer_id)
async def _handle_dead_peer(self, peer_id: ID) -> None:
def _handle_dead_peer(self, peer_id: ID) -> None:
del self.peers[peer_id]
for topic in self.peer_topics:
@ -300,14 +300,9 @@ class Pubsub:
pubsub protocols we support
"""
while True:
peer_id: ID = await self.peer_queue.get()
# Add Peer
asyncio.ensure_future(self._handle_new_peer(peer_id))
# Force context switch
await asyncio.sleep(0)
async def handle_dead_peer_queue(self) -> None:
"""
@ -315,14 +310,9 @@ class Pubsub:
remove peer info from pubsub and pubsub router.
"""
while True:
peer_id: ID = await self.dead_peer_queue.get()
# Remove Peer
asyncio.ensure_future(self._handle_dead_peer(peer_id))
# Force context switch
await asyncio.sleep(0)
self._handle_dead_peer(peer_id)
def handle_subscription(
self, origin_id: ID, sub_message: rpc_pb2.RPC.SubOpts