Pubsub: handle_talk

- Change from async function to sync
- Change the name to `notify_subscriptions`, which is clearer.
This commit is contained in:
mhchia 2020-02-05 21:44:33 +08:00
parent 5b03a7ad9f
commit ddbedc6c15
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
2 changed files with 4 additions and 5 deletions

View File

@ -361,8 +361,7 @@ class Pubsub(Service, IPubsub):
if origin_id in self.peer_topics[sub_message.topicid]: if origin_id in self.peer_topics[sub_message.topicid]:
self.peer_topics[sub_message.topicid].discard(origin_id) self.peer_topics[sub_message.topicid].discard(origin_id)
# FIXME(mhchia): Change the function name? def notify_subscriptions(self, publish_message: rpc_pb2.Message) -> None:
async def handle_talk(self, publish_message: rpc_pb2.Message) -> None:
""" """
Put incoming message from a peer onto my blocking queue. Put incoming message from a peer onto my blocking queue.
@ -576,7 +575,7 @@ class Pubsub(Service, IPubsub):
return return
self._mark_msg_seen(msg) self._mark_msg_seen(msg)
await self.handle_talk(msg) self.notify_subscriptions(msg)
await self.router.publish(msg_forwarder, msg) await self.router.publish(msg_forwarder, msg)
def _next_seqno(self) -> bytes: def _next_seqno(self) -> bytes:

View File

@ -378,14 +378,14 @@ async def test_handle_talk():
data=b"1234", data=b"1234",
seqno=b"\x00" * 8, seqno=b"\x00" * 8,
) )
await pubsubs_fsub[0].handle_talk(msg_0) pubsubs_fsub[0].notify_subscriptions(msg_0)
msg_1 = make_pubsub_msg( msg_1 = make_pubsub_msg(
origin_id=pubsubs_fsub[0].my_id, origin_id=pubsubs_fsub[0].my_id,
topic_ids=["NOT_SUBSCRIBED"], topic_ids=["NOT_SUBSCRIBED"],
data=b"1234", data=b"1234",
seqno=b"\x11" * 8, seqno=b"\x11" * 8,
) )
await pubsubs_fsub[0].handle_talk(msg_1) pubsubs_fsub[0].notify_subscriptions(msg_1)
assert ( assert (
len(pubsubs_fsub[0].topic_ids) == 1 len(pubsubs_fsub[0].topic_ids) == 1
and sub == pubsubs_fsub[0].subscribed_topics_receive[TESTING_TOPIC] and sub == pubsubs_fsub[0].subscribed_topics_receive[TESTING_TOPIC]