Add fixtures for pubsub and router

And a starting `test_pubsub.py`
This commit is contained in:
mhchia 2019-07-27 18:41:16 +08:00
parent 21e97407ef
commit 96563c0d84
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
5 changed files with 126 additions and 6 deletions

View File

@ -340,11 +340,11 @@ class Pubsub:
:param msg_forwarder: the peer who forward us the message.
:param msg: the message we are going to push out.
"""
# TODO: - Check if the `source` is in the blacklist. If yes, reject.
# TODO: Check if the `source` is in the blacklist. If yes, reject.
# TODO: - Check if the `from` is in the blacklist. If yes, reject.
# TODO: Check if the `from` is in the blacklist. If yes, reject.
# TODO: - Check if signing is required and if so signature should be attached.
# TODO: Check if signing is required and if so signature should be attached.
if self._is_msg_seen(msg):
return

109
tests/pubsub/conftest.py Normal file
View File

@ -0,0 +1,109 @@
import asyncio
from typing import NamedTuple
import pytest
from multiaddr import Multiaddr
from libp2p import new_node
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.pubsub import Pubsub
from .configs import (
FLOODSUB_PROTOCOL_ID,
GOSSIPSUB_PROTOCOL_ID,
LISTEN_MADDR,
)
# pylint: disable=redefined-outer-name
@pytest.fixture
def num_hosts():
return 3
@pytest.fixture
async def hosts(num_hosts):
new_node_coros = tuple(
new_node(transport_opt=[str(LISTEN_MADDR)])
for _ in range(num_hosts)
)
_hosts = await asyncio.gather(*new_node_coros)
await asyncio.gather(*[
_host.get_network().listen(LISTEN_MADDR)
for _host in _hosts
])
yield _hosts
# Clean up
for _host in _hosts:
for listener in _host.get_network().listeners.values():
listener.server.close()
await listener.server.wait_closed()
@pytest.fixture
def floodsubs(num_hosts):
return tuple(
FloodSub(protocols=[FLOODSUB_PROTOCOL_ID])
for _ in range(num_hosts)
)
class GossipsubParams(NamedTuple):
degree: int = 10
degree_low: int = 9
degree_high: int = 11
fanout_ttl: int = 30
gossip_window: int = 3
gossip_history: int = 5
heartbeat_interval: float = 0.5
@pytest.fixture
def gossipsub_params():
return GossipsubParams()
@pytest.fixture
def gossipsubs(num_hosts, gossipsub_params):
yield tuple(
GossipSub(
protocols=[GOSSIPSUB_PROTOCOL_ID],
**gossipsub_params._asdict(),
)
for _ in range(num_hosts)
)
# TODO: Clean up
def _make_pubsubs(hosts, pubsub_routers):
if len(pubsub_routers) != len(hosts):
raise ValueError(
f"lenght of pubsub_routers={pubsub_routers} should be equaled to "
f"hosts={hosts}"
)
return tuple(
Pubsub(
host=host,
router=router,
my_id=host.get_id(),
)
for host, router in zip(hosts, pubsub_routers)
)
@pytest.fixture
def pubsubs_fsub(hosts, floodsubs):
_pubsubs_gsub = _make_pubsubs(hosts, floodsubs)
yield _pubsubs_gsub
# TODO: Clean up
@pytest.fixture
def pubsubs_gsub(hosts, gossipsubs):
_pubsubs_gsub = _make_pubsubs(hosts, gossipsubs)
yield _pubsubs_gsub
# TODO: Clean up

View File

@ -39,9 +39,9 @@ async def test_simple_two_nodes():
data = b"some data"
floodsub_a = FloodSub(supported_protocols)
pubsub_a = Pubsub(node_a, floodsub_a, ID(b"a" * 32))
pubsub_a = Pubsub(node_a, floodsub_a, ID(b"\x12\x20" + b"a" * 32))
floodsub_b = FloodSub(supported_protocols)
pubsub_b = Pubsub(node_b, floodsub_b, ID(b"b" * 32))
pubsub_b = Pubsub(node_b, floodsub_b, ID(b"\x12\x20" + b"a" * 32))
await connect(node_a, node_b)
await asyncio.sleep(0.25)

View File

@ -0,0 +1,11 @@
import pytest
@pytest.mark.asyncio
async def test_test(pubsubs_fsub):
topic = "topic"
data = b"data"
sub = await pubsubs_fsub[0].subscribe(topic)
await pubsubs_fsub[0].publish(topic, data)
msg = await sub.get()
assert msg.data == data

View File

@ -80,7 +80,7 @@ async def create_libp2p_hosts(num_hosts):
tasks_create = []
for i in range(0, num_hosts):
# Create node
tasks_create.append(asyncio.ensure_future(new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])))
tasks_create.append(new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]))
hosts = await asyncio.gather(*tasks_create)
tasks_listen = []