diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 41cd965..af5c41d 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,6 +1,5 @@ import functools import logging -import math import time from typing import TYPE_CHECKING, Dict, KeysView, List, NamedTuple, Set, Tuple, cast @@ -32,6 +31,9 @@ if TYPE_CHECKING: from typing import Any # noqa: F401 +# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/40e1c94708658b155f30cf99e4574f384756d83c/topic.go#L97 # noqa: E501 +SUBSCRIPTION_CHANNEL_SIZE = 32 + logger = logging.getLogger("libp2p.pubsub") @@ -373,7 +375,13 @@ class Pubsub(Service, IPubsub): # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue # for each topic - await self.subscribed_topics_send[topic].send(publish_message) + try: + self.subscribed_topics_send[topic].send_nowait(publish_message) + except trio.WouldBlock: + # Channel is full, ignore this message. + logger.warning( + "fail to deliver message to subscription for topic %s", topic + ) async def subscribe(self, topic_id: str) -> ISubscriptionAPI: """ @@ -389,7 +397,7 @@ class Pubsub(Service, IPubsub): return self.subscribed_topics_receive[topic_id] send_channel, receive_channel = trio.open_memory_channel[rpc_pb2.Message]( - math.inf + SUBSCRIPTION_CHANNEL_SIZE ) subscription = TrioSubscriptionAPI( diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 1e9d670..6a22008 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -6,7 +6,7 @@ import trio from libp2p.exceptions import ValidationError from libp2p.pubsub.pb import rpc_pb2 -from libp2p.pubsub.pubsub import PUBSUB_SIGNING_PREFIX +from libp2p.pubsub.pubsub import PUBSUB_SIGNING_PREFIX, SUBSCRIPTION_CHANNEL_SIZE from libp2p.tools.constants import MAX_READ_LEN from libp2p.tools.factories import IDFactory, PubsubFactory, net_stream_pair_factory from libp2p.tools.pubsub.utils import make_pubsub_msg @@ -444,6 +444,38 @@ async def test_subscribe_and_publish(): nursery.start_soon(publish_data, TESTING_TOPIC) +@pytest.mark.trio +async def test_subscribe_and_publish_full_channel(): + async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub: + pubsub = pubsubs_fsub[0] + + extra_data_0 = b"extra_data_0" + extra_data_1 = b"extra_data_1" + + # Test: Subscription channel is of size `SUBSCRIPTION_CHANNEL_SIZE`. + # When the channel is full, new received messages are dropped. + # After the channel has empty slot, the channel can receive new messages. + + # Assume `SUBSCRIPTION_CHANNEL_SIZE` is smaller than `2**(4*8)`. + list_data = [i.to_bytes(4, "big") for i in range(SUBSCRIPTION_CHANNEL_SIZE)] + # Expect `extra_data_0` is dropped and `extra_data_1` is appended. + expected_list_data = list_data + [extra_data_1] + + subscription = await pubsub.subscribe(TESTING_TOPIC) + for data in list_data: + await pubsub.publish(TESTING_TOPIC, data) + + # Publish `extra_data_0` which should be dropped since the channel is already full. + await pubsub.publish(TESTING_TOPIC, extra_data_0) + # Consume a message and there is an empty slot in the channel. + assert (await subscription.get()).data == expected_list_data.pop(0) + # Publish `extra_data_1` which should be appended to the channel. + await pubsub.publish(TESTING_TOPIC, extra_data_1) + + for expected_data in expected_list_data: + assert (await subscription.get()).data == expected_data + + @pytest.mark.trio async def test_publish_push_msg_is_called(monkeypatch): msg_forwarders = []