From c028aef2de5c3e6d899df65766d5973670f80599 Mon Sep 17 00:00:00 2001
From: mhchia
Date: Sat, 27 Jul 2019 11:49:03 +0800
Subject: [PATCH] Fix all tests

- Dedup `perform_test_from_obj` and the test cases used in both
  `test_floodsub` and `test_gossipsub_backward_compatibility.py`: they now
  live in the standalone file
  `tests/pubsub/floodsub_integration_test_settings.py`, and the helper
  function and test cases are imported from there.
- IMO the tests still need a refactor; there is still some duplicated code.
---
 libp2p/pubsub/gossipsub.py                    |   2 +-
 tests/pubsub/configs.py                       |   7 +
 tests/pubsub/dummy_account_node.py            |   4 +-
 .../floodsub_integration_test_settings.py     | 448 +++++++++++++++
 tests/pubsub/test_floodsub.py                 | 480 +---------------
 tests/pubsub/test_gossipsub.py                |  10 +-
 .../test_gossipsub_backward_compatibility.py  | 519 +-----------------
 tests/pubsub/utils.py                         |  18 +-
 8 files changed, 526 insertions(+), 962 deletions(-)
 create mode 100644 tests/pubsub/configs.py
 create mode 100644 tests/pubsub/floodsub_integration_test_settings.py

diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py
index 8051a3b..b65462d 100644
--- a/libp2p/pubsub/gossipsub.py
+++ b/libp2p/pubsub/gossipsub.py
@@ -32,8 +32,8 @@ class GossipSub(IPubsubRouter):
 
         # Store target degree, upper degree bound, and lower degree bound
         self.degree = degree
-        self.degree_high = degree_high
         self.degree_low = degree_low
+        self.degree_high = degree_high
 
         # Store time to live (for topics in fanout)
         self.time_to_live = time_to_live
diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py
new file mode 100644
index 0000000..d6849f7
--- /dev/null
+++ b/tests/pubsub/configs.py
@@ -0,0 +1,7 @@
+import multiaddr
+
+
+FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0"
+SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
+
+LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py
index 8446028..f0a95f8 100644
--- a/tests/pubsub/dummy_account_node.py
+++ b/tests/pubsub/dummy_account_node.py
@@ -1,7 +1,8 @@
 import asyncio
-import multiaddr
 import uuid
 
+import multiaddr
+
 from libp2p import new_node
 from libp2p.host.host_interface import IHost
 from libp2p.pubsub.pubsub import Pubsub
@@ -144,4 +145,3 @@ class DummyAccountNode:
             return self.balances[user]
         else:
             return -1
-
diff --git a/tests/pubsub/floodsub_integration_test_settings.py b/tests/pubsub/floodsub_integration_test_settings.py
new file mode 100644
index 0000000..eac62ba
--- /dev/null
+++ b/tests/pubsub/floodsub_integration_test_settings.py
@@ -0,0 +1,448 @@
+import asyncio
+
+import pytest
+
+import multiaddr
+
+from libp2p import new_node
+from libp2p.peer.id import ID
+from libp2p.pubsub.pubsub import Pubsub
+
+from tests.utils import (
+    cleanup,
+    connect,
+)
+
+
+FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0"
+SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
+
+FLOODSUB_PROTOCOL_TEST_CASES = [
+    {
+        "name": "simple_two_nodes",
+        "supported_protocols": SUPPORTED_PROTOCOLS,
+        "adj_list": {
+            "A": ["B"]
+        },
+        "topic_map": {
+            "topic1": ["B"]
+        },
+        "messages": [
+            {
+                "topics": ["topic1"],
+                "data": b"foo",
+                "node_id": "A"
+            }
+        ]
+    },
+    {
+        "name": "three_nodes_two_topics",
+        "supported_protocols": SUPPORTED_PROTOCOLS,
+        "adj_list": {
+            "A": ["B"],
+            "B": ["C"],
+        },
+        "topic_map": {
+            "topic1": ["B", "C"],
+            "topic2": ["B", "C"],
+        },
+        "messages": [
+            {
+                "topics": ["topic1"],
+                "data": b"foo",
+                "node_id": "A",
+            },
+            {
+                "topics": ["topic2"],
+                "data": b"Alex is tall",
+                "node_id": "A",
+            }
+        ]
+    },
+ { + "name": "two_nodes_one_topic_single_subscriber_is_sender", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "A": ["B"], + }, + "topic_map": { + "topic1": ["B"], + }, + "messages": [ + { + "topics": ["topic1"], + "data": b"Alex is tall", + "node_id": "B", + } + ] + }, + { + "name": "two_nodes_one_topic_two_msgs", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "A": ["B"], + }, + "topic_map": { + "topic1": ["B"], + }, + "messages": [ + { + "topics": ["topic1"], + "data": b"Alex is tall", + "node_id": "B", + }, + { + "topics": ["topic1"], + "data": b"foo", + "node_id": "A", + } + ] + }, + { + "name": "seven_nodes_tree_one_topics", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"], + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + } + ] + }, + { + "name": "seven_nodes_tree_three_topics", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"], + }, + "topic_map": { + "astrophysics": ["2", "3", "4", "5", "6", "7"], + "space": ["2", "3", "4", "5", "6", "7"], + "onions": ["2", "3", "4", "5", "6", "7"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["space"], + "data": b"foobar", + "node_id": "1", + }, + { + "topics": ["onions"], + "data": b"I am allergic", + "node_id": "1", + } + ] + }, + { + "name": "seven_nodes_tree_three_topics_diff_origin", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["4", "5"], + "3": ["6", "7"], + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], + "space": ["1", "2", "3", "4", "5", "6", "7"], + "onions": ["1", "2", "3", "4", "5", "6", "7"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["space"], + "data": b"foobar", + "node_id": "4", + }, + { + "topics": ["onions"], + "data": b"I am allergic", + "node_id": "7", + } + ] + }, + { + "name": "three_nodes_clique_two_topic_diff_origin", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3"], + "2": ["3"], + }, + "topic_map": { + "astrophysics": ["1", "2", "3"], + "school": ["1", "2", "3"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic", + "node_id": "1", + } + ] + }, + { + "name": "four_nodes_clique_two_topic_diff_origin_many_msgs", + "supported_protocols": SUPPORTED_PROTOCOLS, + "adj_list": { + "1": ["2", "3", "4"], + "2": ["1", "3", "4"], + "3": ["1", "2", "4"], + "4": ["1", "2", "3"], + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4"], + "school": ["1", "2", "3", "4"], + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": b"e=mc^2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar2", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic2", + "node_id": "1", + }, + { + "topics": ["school"], + "data": b"foobar3", + "node_id": "2", + }, + { + "topics": ["astrophysics"], + "data": b"I am allergic3", + "node_id": "1", 
+            }
+        ]
+    },
+    {
+        "name": "five_nodes_ring_two_topic_diff_origin_many_msgs",
+        "supported_protocols": SUPPORTED_PROTOCOLS,
+        "adj_list": {
+            "1": ["2"],
+            "2": ["3"],
+            "3": ["4"],
+            "4": ["5"],
+            "5": ["1"],
+        },
+        "topic_map": {
+            "astrophysics": ["1", "2", "3", "4", "5"],
+            "school": ["1", "2", "3", "4", "5"],
+        },
+        "messages": [
+            {
+                "topics": ["astrophysics"],
+                "data": b"e=mc^2",
+                "node_id": "1",
+            },
+            {
+                "topics": ["school"],
+                "data": b"foobar",
+                "node_id": "2",
+            },
+            {
+                "topics": ["astrophysics"],
+                "data": b"I am allergic",
+                "node_id": "1",
+            },
+            {
+                "topics": ["school"],
+                "data": b"foobar2",
+                "node_id": "2",
+            },
+            {
+                "topics": ["astrophysics"],
+                "data": b"I am allergic2",
+                "node_id": "1",
+            },
+            {
+                "topics": ["school"],
+                "data": b"foobar3",
+                "node_id": "2",
+            },
+            {
+                "topics": ["astrophysics"],
+                "data": b"I am allergic3",
+                "node_id": "1",
+            }
+        ]
+    }
+]
+
+# pylint: disable=invalid-name
+floodsub_protocol_pytest_params = [
+    pytest.param(test_case, id=test_case["name"])
+    for test_case in FLOODSUB_PROTOCOL_TEST_CASES
+]
+
+
+# pylint: disable=too-many-locals
+async def perform_test_from_obj(obj, router_factory):
+    """
+    Perform pubsub tests from a test obj.
+    A test obj is composed as follows:
+
+    {
+        "supported_protocols": ["supported/protocol/1.0.0",...],
+        "adj_list": {
+            "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...],
+            "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...],
+            ...
+        },
+        "topic_map": {
+            "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...]
+        },
+        "messages": [
+            {
+                "topics": ["topic1_for_message", "topic2_for_message", ...],
+                "data": b"some contents of the message (newlines are not supported)",
+                "node_id": "message sender node id"
+            },
+            ...
+        ]
+    }
+    NOTE: In adj_list, for any neighbors A and B, list either A as a neighbor of B
+    or B as a neighbor of A, but not both.
Do NOT list both A: ["B"] and B:["A"] as the behavior + is undefined (even if it may work) + """ + listen_maddr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") + + # Step 1) Create graph + adj_list = obj["adj_list"] + node_map = {} + pubsub_map = {} + + async def add_node(node_id: str) -> None: + node = await new_node(transport_opt=[str(listen_maddr)]) + await node.get_network().listen(listen_maddr) + node_map[node_id] = node + pubsub_router = router_factory(protocols=obj["supported_protocols"]) + pubsub = Pubsub(node, pubsub_router, ID(node_id.encode())) + pubsub_map[node_id] = pubsub + + tasks_connect = [] + for start_node_id in adj_list: + # Create node if node does not yet exist + if start_node_id not in node_map: + await add_node(start_node_id) + + # For each neighbor of start_node, create if does not yet exist, + # then connect start_node to neighbor + for neighbor_id in adj_list[start_node_id]: + # Create neighbor if neighbor does not yet exist + if neighbor_id not in node_map: + await add_node(neighbor_id) + tasks_connect.append( + connect(node_map[start_node_id], node_map[neighbor_id]) + ) + # Connect nodes and wait at least for 2 seconds + await asyncio.gather(*tasks_connect, asyncio.sleep(2)) + + # Step 2) Subscribe to topics + queues_map = {} + topic_map = obj["topic_map"] + + tasks_topic = [] + tasks_topic_data = [] + for topic, node_ids in topic_map.items(): + for node_id in node_ids: + tasks_topic.append(pubsub_map[node_id].subscribe(topic)) + tasks_topic_data.append((node_id, topic)) + tasks_topic.append(asyncio.sleep(2)) + + # Gather is like Promise.all + responses = await asyncio.gather(*tasks_topic, return_exceptions=True) + for i in range(len(responses) - 1): + node_id, topic = tasks_topic_data[i] + if node_id not in queues_map: + queues_map[node_id] = {} + # Store queue in topic-queue map for node + queues_map[node_id][topic] = responses[i] + + # Allow time for subscribing before continuing + await asyncio.sleep(0.01) + + # Step 3) Publish messages + topics_in_msgs_ordered = [] + messages = obj["messages"] + tasks_publish = [] + + for msg in messages: + topics = msg["topics"] + data = msg["data"] + node_id = msg["node_id"] + + # Publish message + # FIXME: Should be single RPC package with several topics + for topic in topics: + tasks_publish.append( + pubsub_map[node_id].publish( + topic, + data, + ) + ) + + # For each topic in topics, add topic, msg_talk tuple to ordered test list + # TODO: Update message sender to be correct message sender before + # adding msg_talk to this list + for topic in topics: + topics_in_msgs_ordered.append((topic, data)) + + # Allow time for publishing before continuing + await asyncio.gather(*tasks_publish, asyncio.sleep(2)) + + # Step 4) Check that all messages were received correctly. + # TODO: Check message sender too + for topic, data in topics_in_msgs_ordered: + # Look at each node in each topic + for node_id in topic_map[topic]: + # Get message from subscription queue + msg = await queues_map[node_id][topic].get() + assert data == msg.data + + # Success, terminate pending tasks. 
+ await cleanup() diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 3b4de2b..c3d3e24 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -4,24 +4,26 @@ import pytest from libp2p import new_node from libp2p.peer.id import ID -from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub +from libp2p.pubsub.pubsub import Pubsub from tests.utils import ( cleanup, connect, ) -from .utils import ( - message_id_generator, - generate_RPC_packet, + +from .configs import ( + FLOODSUB_PROTOCOL_ID, + LISTEN_MADDR, +) +from .floodsub_integration_test_settings import ( + perform_test_from_obj, + floodsub_protocol_pytest_params, ) -# pylint: disable=too-many-locals -FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] - -LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") +# pylint: disable=too-many-locals @pytest.mark.asyncio @@ -119,457 +121,13 @@ async def test_lru_cache_two_nodes(monkeypatch): await cleanup() -async def perform_test_from_obj(obj): - """ - Perform a floodsub test from a test obj. - test obj are composed as follows: - - { - "supported_protocols": ["supported/protocol/1.0.0",...], - "adj_list": { - "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...], - "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...], - ... - }, - "topic_map": { - "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...] - }, - "messages": [ - { - "topics": ["topic1_for_message", "topic2_for_message", ...], - "data": b"some contents of the message (newlines are not supported)", - "node_id": "message sender node id" - }, - ... - ] - } - NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A - or B as a neighbor of A once. 
Do NOT list both A: ["B"] and B:["A"] as the behavior - is undefined (even if it may work) - """ - - # Step 1) Create graph - adj_list = obj["adj_list"] - node_map = {} - floodsub_map = {} - pubsub_map = {} - - async def add_node(node_id: str) -> None: - node = await new_node(transport_opt=[str(LISTEN_MADDR)]) - await node.get_network().listen(LISTEN_MADDR) - node_map[node_id] = node - floodsub = FloodSub(supported_protocols) - floodsub_map[node_id] = floodsub - pubsub = Pubsub(node, floodsub, ID(node_id.encode())) - pubsub_map[node_id] = pubsub - - supported_protocols = obj["supported_protocols"] - - tasks_connect = [] - for start_node_id in adj_list: - # Create node if node does not yet exist - if start_node_id not in node_map: - await add_node(start_node_id) - - # For each neighbor of start_node, create if does not yet exist, - # then connect start_node to neighbor - for neighbor_id in adj_list[start_node_id]: - # Create neighbor if neighbor does not yet exist - if neighbor_id not in node_map: - await add_node(neighbor_id) - tasks_connect.append( - connect(node_map[start_node_id], node_map[neighbor_id]) - ) - # Connect nodes and wait at least for 2 seconds - await asyncio.gather(*tasks_connect, asyncio.sleep(2)) - - # Step 2) Subscribe to topics - queues_map = {} - topic_map = obj["topic_map"] - - tasks_topic = [] - tasks_topic_data = [] - for topic, node_ids in topic_map.items(): - for node_id in node_ids: - tasks_topic.append(pubsub_map[node_id].subscribe(topic)) - tasks_topic_data.append((node_id, topic)) - tasks_topic.append(asyncio.sleep(2)) - - # Gather is like Promise.all - responses = await asyncio.gather(*tasks_topic, return_exceptions=True) - for i in range(len(responses) - 1): - node_id, topic = tasks_topic_data[i] - if node_id not in queues_map: - queues_map[node_id] = {} - # Store queue in topic-queue map for node - queues_map[node_id][topic] = responses[i] - - # Allow time for subscribing before continuing - await asyncio.sleep(0.01) - - # Step 3) Publish messages - topics_in_msgs_ordered = [] - messages = obj["messages"] - tasks_publish = [] - - for msg in messages: - topics = msg["topics"] - data = msg["data"] - node_id = msg["node_id"] - - # Publish message - # FIXME: Should be single RPC package with several topics - for topic in topics: - tasks_publish.append( - pubsub_map[node_id].publish( - topic, - data, - ) - ) - - # For each topic in topics, add topic, msg_talk tuple to ordered test list - # TODO: Update message sender to be correct message sender before - # adding msg_talk to this list - for topic in topics: - topics_in_msgs_ordered.append((topic, data)) - - # Allow time for publishing before continuing - await asyncio.gather(*tasks_publish, asyncio.sleep(2)) - - # Step 4) Check that all messages were received correctly. - # TODO: Check message sender too - for topic, data in topics_in_msgs_ordered: - # Look at each node in each topic - for node_id in topic_map[topic]: - # Get message from subscription queue - msg = await queues_map[node_id][topic].get() - assert data == msg.data - - # Success, terminate pending tasks. 
- await cleanup() - - +@pytest.mark.parametrize( + "test_case_obj", + floodsub_protocol_pytest_params, +) @pytest.mark.asyncio -async def test_simple_two_nodes_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": b"foo", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_three_nodes_two_topics_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"], - "B": ["C"], - }, - "topic_map": { - "topic1": ["B", "C"], - "topic2": ["B", "C"], - }, - "messages": [ - { - "topics": ["topic1"], - "data": b"foo", - "node_id": "A", - }, - { - "topics": ["topic2"], - "data": b"Alex is tall", - "node_id": "A", - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"], - }, - "topic_map": { - "topic1": ["B"], - }, - "messages": [ - { - "topics": ["topic1"], - "data": b"Alex is tall", - "node_id": "B", - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_two_msgs_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "A": ["B"], - }, - "topic_map": { - "topic1": ["B"], - }, - "messages": [ - { - "topics": ["topic1"], - "data": b"Alex is tall", - "node_id": "B", - }, - { - "topics": ["topic1"], - "data": b"foo", - "node_id": "A", - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_one_topics_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"], - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"], - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"], - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"], - "space": ["2", "3", "4", "5", "6", "7"], - "onions": ["2", "3", "4", "5", "6", "7"], - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["space"], - "data": b"foobar", - "node_id": "1", - }, - { - "topics": ["onions"], - "data": b"I am allergic", - "node_id": "1", - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"], - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], - "space": ["1", "2", "3", "4", "5", "6", "7"], - "onions": ["1", "2", "3", "4", "5", "6", "7"], - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["space"], - "data": b"foobar", - "node_id": "4", - }, - { - "topics": ["onions"], - "data": b"I am allergic", - "node_id": "7", - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def 
test_three_nodes_clique_two_topic_diff_origin_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3"], - "2": ["3"], - }, - "topic_map": { - "astrophysics": ["1", "2", "3"], - "school": ["1", "2", "3"], - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic", - "node_id": "1", - } - ] - } - await perform_test_from_obj(test_obj) - - -@pytest.mark.asyncio -async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2", "3", "4"], - "2": ["1", "3", "4"], - "3": ["1", "2", "4"], - "4": ["1", "2", "3"], - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4"], - "school": ["1", "2", "3", "4"], - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar2", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar3", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic3", - "node_id": "1", - } - ] - } - await perform_test_from_obj(test_obj) - - -@pytest.mark.asyncio -async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": SUPPORTED_PROTOCOLS, - "adj_list": { - "1": ["2"], - "2": ["3"], - "3": ["4"], - "4": ["5"], - "5": ["1"], - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5"], - "school": ["1", "2", "3", "4", "5"], - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": b"e=mc^2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar2", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic2", - "node_id": "1", - }, - { - "topics": ["school"], - "data": b"foobar3", - "node_id": "2", - }, - { - "topics": ["astrophysics"], - "data": b"I am allergic3", - "node_id": "1", - } - ] - } - await perform_test_from_obj(test_obj) +async def test_gossipsub_run_with_floodsub_tests(test_case_obj): + await perform_test_from_obj( + test_case_obj, + FloodSub, + ) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index 7e2c757..7f4c81c 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -8,9 +8,13 @@ from tests.utils import ( connect, ) -from .utils import message_id_generator, generate_RPC_packet, \ - create_libp2p_hosts, create_pubsub_and_gossipsub_instances, sparse_connect, dense_connect, \ - one_to_all_connect +from .utils import ( + create_libp2p_hosts, + create_pubsub_and_gossipsub_instances, + dense_connect, + one_to_all_connect, +) + SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] diff --git a/tests/pubsub/test_gossipsub_backward_compatibility.py b/tests/pubsub/test_gossipsub_backward_compatibility.py index 060973a..2a3167b 100644 --- a/tests/pubsub/test_gossipsub_backward_compatibility.py +++ b/tests/pubsub/test_gossipsub_backward_compatibility.py @@ -1,34 +1,31 @@ -import asyncio 
-import multiaddr +import functools + import pytest from libp2p import new_node -from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.gossipsub import GossipSub -from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from tests.utils import cleanup -from .utils import ( - connect, - message_id_generator, - generate_RPC_packet, +from .configs import ( + FLOODSUB_PROTOCOL_ID, + LISTEN_MADDR, +) +from .floodsub_integration_test_settings import ( + perform_test_from_obj, + floodsub_protocol_pytest_params, ) # pylint: disable=too-many-locals - @pytest.mark.asyncio -async def test_init(): - node = await new_node(transport_opt=["/ip4/127.1/tcp/0"]) +async def test_gossipsub_initialize_with_floodsub_protocol(): + node = await new_node(transport_opt=[str(LISTEN_MADDR)]) - await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.1/tcp/0")) + await node.get_network().listen(LISTEN_MADDR) - supported_protocols = ["/gossipsub/1.0.0"] - - gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) + gossipsub = GossipSub([FLOODSUB_PROTOCOL_ID], 3, 2, 4, 30) pubsub = Pubsub(node, gossipsub, "a") # Did it work? @@ -37,479 +34,19 @@ async def test_init(): await cleanup() -async def perform_test_from_obj(obj): - """ - Perform a floodsub test from a test obj. - test obj are composed as follows: - - { - "supported_protocols": ["supported/protocol/1.0.0",...], - "adj_list": { - "node1": ["neighbor1_of_node1", "neighbor2_of_node1", ...], - "node2": ["neighbor1_of_node2", "neighbor2_of_node2", ...], - ... - }, - "topic_map": { - "topic1": ["node1_subscribed_to_topic1", "node2_subscribed_to_topic1", ...] - }, - "messages": [ - { - "topics": ["topic1_for_message", "topic2_for_message", ...], - "data": "some contents of the message (newlines are not supported)", - "node_id": "message sender node id" - }, - ... - ] - } - NOTE: In adj_list, for any neighbors A and B, only list B as a neighbor of A - or B as a neighbor of A once. 
Do NOT list both A: ["B"] and B:["A"] as the behavior - is undefined (even if it may work) - """ - - # Step 1) Create graph - adj_list = obj["adj_list"] - node_map = {} - gossipsub_map = {} - pubsub_map = {} - - supported_protocols = obj["supported_protocols"] - - tasks_connect = [] - for start_node_id in adj_list: - # Create node if node does not yet exist - if start_node_id not in node_map: - node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[start_node_id] = node - - gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) - gossipsub_map[start_node_id] = gossipsub - pubsub = Pubsub(node, gossipsub, start_node_id) - pubsub_map[start_node_id] = pubsub - - # For each neighbor of start_node, create if does not yet exist, - # then connect start_node to neighbor - for neighbor_id in adj_list[start_node_id]: - # Create neighbor if neighbor does not yet exist - if neighbor_id not in node_map: - neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[neighbor_id] = neighbor_node - - gossipsub = GossipSub(supported_protocols, 3, 2, 4, 30) - gossipsub_map[neighbor_id] = gossipsub - pubsub = Pubsub(neighbor_node, gossipsub, neighbor_id) - pubsub_map[neighbor_id] = pubsub - - # Connect node and neighbor - tasks_connect.append(connect(node_map[start_node_id], node_map[neighbor_id])) - tasks_connect.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_connect) - - # Allow time for graph creation before continuing - # await asyncio.sleep(0.25) - - # Step 2) Subscribe to topics - queues_map = {} - topic_map = obj["topic_map"] - - tasks_topic = [] - tasks_topic_data = [] - for topic in topic_map: - for node_id in topic_map[topic]: - """ - # Subscribe node to topic - q = await pubsub_map[node_id].subscribe(topic) - - # Create topic-queue map for node_id if one does not yet exist - if node_id not in queues_map: - queues_map[node_id] = {} - - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q - """ - tasks_topic.append(pubsub_map[node_id].subscribe(topic)) - tasks_topic_data.append((node_id, topic)) - tasks_topic.append(asyncio.sleep(2)) - - # Gather is like Promise.all - responses = await asyncio.gather(*tasks_topic, return_exceptions=True) - for i in range(len(responses) - 1): - q = responses[i] - node_id, topic = tasks_topic_data[i] - if node_id not in queues_map: - queues_map[node_id] = {} - - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q - - # Allow time for subscribing before continuing - # await asyncio.sleep(0.01) - - # Step 3) Publish messages - topics_in_msgs_ordered = [] - messages = obj["messages"] - tasks_publish = [] - - for msg in messages: - topics = msg["topics"] - data = msg["data"] - node_id = msg["node_id"] - - # Publish message - # FIXME: This should be one RPC packet with several topics - for topic in topics: - tasks_publish.append( - pubsub_map[node_id].publish( - topic, - data, - ) - ) - - # For each topic in topics, add topic, msg_talk tuple to ordered test list - # TODO: Update message sender to be correct message sender before - # adding msg_talk to this list - for topic in topics: - topics_in_msgs_ordered.append((topic, data)) - - # Allow time for publishing before continuing - # await asyncio.sleep(0.4) - tasks_publish.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_publish) - - # Step 4) Check that all 
messages were received correctly. - # TODO: Check message sender too - for topic, data in topics_in_msgs_ordered: - # Look at each node in each topic - for node_id in topic_map[topic]: - # Get message from subscription queue - msg_on_node = await queues_map[node_id][topic].get() - assert msg_on_node.data == data - - # Success, terminate pending tasks. - await cleanup() - +@pytest.mark.parametrize( + "test_case_obj", + floodsub_protocol_pytest_params, +) @pytest.mark.asyncio -async def test_simple_two_nodes_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_three_nodes_two_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"], - "B": ["C"] - }, - "topic_map": { - "topic1": ["B", "C"], - "topic2": ["B", "C"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - }, - { - "topics": ["topic2"], - "data": "Alex is tall", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_two_nodes_one_topic_two_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "A": ["B"] - }, - "topic_map": { - "topic1": ["B"] - }, - "messages": [ - { - "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" - }, - { - "topics": ["topic1"], - "data": "foo", - "node_id": "A" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_one_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"], - "space": ["2", "3", "4", "5", "6", "7"], - "onions": ["2", "3", "4", "5", "6", "7"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["space"], - "data": "foobar", - "node_id": "1" - }, - { - "topics": ["onions"], - "data": "I am allergic", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["4", "5"], - "3": ["6", "7"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], - "space": ["1", "2", "3", "4", "5", "6", "7"], - "onions": ["1", "2", "3", "4", "5", "6", "7"] - }, - "messages": 
[ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["space"], - "data": "foobar", - "node_id": "4" - }, - { - "topics": ["onions"], - "data": "I am allergic", - "node_id": "7" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3"], - "2": ["3"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3"], - "school": ["1", "2", "3"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2", "3", "4"], - "2": ["1", "3", "4"], - "3": ["1", "2", "4"], - "4": ["1", "2", "3"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4"], - "school": ["1", "2", "3", "4"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar2", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar3", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) - -@pytest.mark.asyncio -async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): - test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], - "adj_list": { - "1": ["2"], - "2": ["3"], - "3": ["4"], - "4": ["5"], - "5": ["1"] - }, - "topic_map": { - "astrophysics": ["1", "2", "3", "4", "5"], - "school": ["1", "2", "3", "4", "5"] - }, - "messages": [ - { - "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar2", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" - }, - { - "topics": ["school"], - "data": "foobar3", - "node_id": "2" - }, - { - "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" - } - ] - } - await perform_test_from_obj(test_obj) +async def test_gossipsub_run_with_floodsub_tests(test_case_obj): + await perform_test_from_obj( + test_case_obj, + functools.partial( + GossipSub, + degree=3, + degree_low=2, + degree_high=4, + time_to_live=30, + ) + ) diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index bb49a2f..9bfc57e 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,11 +1,12 @@ import asyncio -import multiaddr -import uuid import random import struct from typing import ( Sequence, ) +import uuid + +import multiaddr from libp2p import new_node from libp2p.pubsub.pb import rpc_pb2 @@ -93,8 +94,17 @@ async def create_libp2p_hosts(num_hosts): return hosts -def create_pubsub_and_gossipsub_instances(libp2p_hosts, supported_protocols, degree, degree_low, \ - 
degree_high, time_to_live, gossip_window, gossip_history, heartbeat_interval): + +def create_pubsub_and_gossipsub_instances( + libp2p_hosts, + supported_protocols, + degree, + degree_low, + degree_high, + time_to_live, + gossip_window, + gossip_history, + heartbeat_interval): pubsubs = [] gossipsubs = [] for node in libp2p_hosts:
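
With the shared settings module in place, covering an additional topology only needs one more entry in FLOODSUB_PROTOCOL_TEST_CASES in tests/pubsub/floodsub_integration_test_settings.py; both test_floodsub.py and test_gossipsub_backward_compatibility.py then pick it up automatically through floodsub_protocol_pytest_params. A sketch of such an entry follows; the case name and payload are hypothetical and not part of this patch, only the schema documented in perform_test_from_obj is assumed:

    # Hypothetical test case for illustration; not added by this patch.
    {
        "name": "two_nodes_one_topic_both_subscribed",
        "supported_protocols": SUPPORTED_PROTOCOLS,
        "adj_list": {
            "A": ["B"],
        },
        "topic_map": {
            # Both the sender and its peer subscribe to the topic.
            "topic1": ["A", "B"],
        },
        "messages": [
            {
                "topics": ["topic1"],
                "data": b"hypothetical payload",
                "node_id": "A",
            }
        ]
    },

Because perform_test_from_obj only needs a router factory, the same entry exercises FloodSub directly and GossipSub via functools.partial(GossipSub, degree=3, degree_low=2, degree_high=4, time_to_live=30), exactly as the two parametrized tests in this patch do.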