Pubsub: change channel size
To `32` to conform to the go implementation.
This commit is contained in:
parent
ba0fb8a833
commit
7f8c0f11f6
|
@ -1,6 +1,5 @@
|
||||||
import functools
|
import functools
|
||||||
import logging
|
import logging
|
||||||
import math
|
|
||||||
import time
|
import time
|
||||||
from typing import TYPE_CHECKING, Dict, KeysView, List, NamedTuple, Set, Tuple, cast
|
from typing import TYPE_CHECKING, Dict, KeysView, List, NamedTuple, Set, Tuple, cast
|
||||||
|
|
||||||
|
@ -32,6 +31,9 @@ if TYPE_CHECKING:
|
||||||
from typing import Any # noqa: F401
|
from typing import Any # noqa: F401
|
||||||
|
|
||||||
|
|
||||||
|
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/40e1c94708658b155f30cf99e4574f384756d83c/topic.go#L97 # noqa: E501
|
||||||
|
SUBSCRIPTION_CHANNEL_SIZE = 32
|
||||||
|
|
||||||
logger = logging.getLogger("libp2p.pubsub")
|
logger = logging.getLogger("libp2p.pubsub")
|
||||||
|
|
||||||
|
|
||||||
|
@ -373,7 +375,13 @@ class Pubsub(Service, IPubsub):
|
||||||
# we are subscribed to a topic this message was sent for,
|
# we are subscribed to a topic this message was sent for,
|
||||||
# so add message to the subscription output queue
|
# so add message to the subscription output queue
|
||||||
# for each topic
|
# for each topic
|
||||||
await self.subscribed_topics_send[topic].send(publish_message)
|
try:
|
||||||
|
self.subscribed_topics_send[topic].send_nowait(publish_message)
|
||||||
|
except trio.WouldBlock:
|
||||||
|
# Channel is full, ignore this message.
|
||||||
|
logger.warning(
|
||||||
|
"fail to deliver message to subscription for topic %s", topic
|
||||||
|
)
|
||||||
|
|
||||||
async def subscribe(self, topic_id: str) -> ISubscriptionAPI:
|
async def subscribe(self, topic_id: str) -> ISubscriptionAPI:
|
||||||
"""
|
"""
|
||||||
|
@ -389,7 +397,7 @@ class Pubsub(Service, IPubsub):
|
||||||
return self.subscribed_topics_receive[topic_id]
|
return self.subscribed_topics_receive[topic_id]
|
||||||
|
|
||||||
send_channel, receive_channel = trio.open_memory_channel[rpc_pb2.Message](
|
send_channel, receive_channel = trio.open_memory_channel[rpc_pb2.Message](
|
||||||
math.inf
|
SUBSCRIPTION_CHANNEL_SIZE
|
||||||
)
|
)
|
||||||
|
|
||||||
subscription = TrioSubscriptionAPI(
|
subscription = TrioSubscriptionAPI(
|
||||||
|
|
|
@ -6,7 +6,7 @@ import trio
|
||||||
|
|
||||||
from libp2p.exceptions import ValidationError
|
from libp2p.exceptions import ValidationError
|
||||||
from libp2p.pubsub.pb import rpc_pb2
|
from libp2p.pubsub.pb import rpc_pb2
|
||||||
from libp2p.pubsub.pubsub import PUBSUB_SIGNING_PREFIX
|
from libp2p.pubsub.pubsub import PUBSUB_SIGNING_PREFIX, SUBSCRIPTION_CHANNEL_SIZE
|
||||||
from libp2p.tools.constants import MAX_READ_LEN
|
from libp2p.tools.constants import MAX_READ_LEN
|
||||||
from libp2p.tools.factories import IDFactory, PubsubFactory, net_stream_pair_factory
|
from libp2p.tools.factories import IDFactory, PubsubFactory, net_stream_pair_factory
|
||||||
from libp2p.tools.pubsub.utils import make_pubsub_msg
|
from libp2p.tools.pubsub.utils import make_pubsub_msg
|
||||||
|
@ -444,6 +444,38 @@ async def test_subscribe_and_publish():
|
||||||
nursery.start_soon(publish_data, TESTING_TOPIC)
|
nursery.start_soon(publish_data, TESTING_TOPIC)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.trio
|
||||||
|
async def test_subscribe_and_publish_full_channel():
|
||||||
|
async with PubsubFactory.create_batch_with_floodsub(1) as pubsubs_fsub:
|
||||||
|
pubsub = pubsubs_fsub[0]
|
||||||
|
|
||||||
|
extra_data_0 = b"extra_data_0"
|
||||||
|
extra_data_1 = b"extra_data_1"
|
||||||
|
|
||||||
|
# Test: Subscription channel is of size `SUBSCRIPTION_CHANNEL_SIZE`.
|
||||||
|
# When the channel is full, new received messages are dropped.
|
||||||
|
# After the channel has empty slot, the channel can receive new messages.
|
||||||
|
|
||||||
|
# Assume `SUBSCRIPTION_CHANNEL_SIZE` is smaller than `2**(4*8)`.
|
||||||
|
list_data = [i.to_bytes(4, "big") for i in range(SUBSCRIPTION_CHANNEL_SIZE)]
|
||||||
|
# Expect `extra_data_0` is dropped and `extra_data_1` is appended.
|
||||||
|
expected_list_data = list_data + [extra_data_1]
|
||||||
|
|
||||||
|
subscription = await pubsub.subscribe(TESTING_TOPIC)
|
||||||
|
for data in list_data:
|
||||||
|
await pubsub.publish(TESTING_TOPIC, data)
|
||||||
|
|
||||||
|
# Publish `extra_data_0` which should be dropped since the channel is already full.
|
||||||
|
await pubsub.publish(TESTING_TOPIC, extra_data_0)
|
||||||
|
# Consume a message and there is an empty slot in the channel.
|
||||||
|
assert (await subscription.get()).data == expected_list_data.pop(0)
|
||||||
|
# Publish `extra_data_1` which should be appended to the channel.
|
||||||
|
await pubsub.publish(TESTING_TOPIC, extra_data_1)
|
||||||
|
|
||||||
|
for expected_data in expected_list_data:
|
||||||
|
assert (await subscription.get()).data == expected_data
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.trio
|
@pytest.mark.trio
|
||||||
async def test_publish_push_msg_is_called(monkeypatch):
|
async def test_publish_push_msg_is_called(monkeypatch):
|
||||||
msg_forwarders = []
|
msg_forwarders = []
|
||||||
|
|
Loading…
Reference in New Issue
Block a user