From 035d08b8bdc33a11b4f91e07ca6b3aab545749b4 Mon Sep 17 00:00:00 2001 From: mhchia Date: Thu, 25 Jul 2019 23:11:27 +0800 Subject: [PATCH] Fix `test_floodsub.py` --- libp2p/pubsub/floodsub.py | 7 +- tests/pubsub/__init__.py | 0 tests/pubsub/test_floodsub.py | 317 +++++++++++++++------------------- tests/pubsub/utils.py | 8 - tests/utils.py | 10 ++ 5 files changed, 159 insertions(+), 183 deletions(-) create mode 100644 tests/pubsub/__init__.py diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 03b7330..0f9ebf4 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -76,6 +76,8 @@ class FloodSub(IPubsubRouter): ) for peer_id in peers_gen: stream = self.pubsub.peers[str(peer_id)] + # FIXME: We should add a `WriteMsg` similar to write delimited messages. + # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/master/comm.go#L107 await stream.write(rpc_msg.SerializeToString()) def _get_peers_to_send( @@ -84,7 +86,10 @@ class FloodSub(IPubsubRouter): src: ID, origin: ID) -> Iterable[ID]: """ - :return: the list of protocols supported by the router + Get the eligible peers to send the data to. + :param src: the peer id of the peer who forwards the message to me. + :param origin: the peer id of the peer who originally broadcast the message. + :return: a generator of the peer ids who we send data to. """ for topic in topic_ids: if topic not in self.pubsub.peer_topics: diff --git a/tests/pubsub/__init__.py b/tests/pubsub/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 82831b8..3b4de2b 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -2,13 +2,15 @@ import asyncio import multiaddr import pytest -from tests.utils import cleanup from libp2p import new_node from libp2p.peer.id import ID -from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub +from tests.utils import ( + cleanup, + connect, +) from .utils import ( message_id_generator, generate_RPC_packet, @@ -16,25 +18,21 @@ from .utils import ( # pylint: disable=too-many-locals +FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" +SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] -async def connect(node1, node2): - """ - Connect node1 to node2 - """ - addr = node2.get_addrs()[0] - info = info_from_p2p_addr(addr) - await node1.connect(info) +LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") @pytest.mark.asyncio async def test_simple_two_nodes(): - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_a = await new_node(transport_opt=[str(LISTEN_MADDR)]) + node_b = await new_node(transport_opt=[str(LISTEN_MADDR)]) - await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node_a.get_network().listen(LISTEN_MADDR) + await node_b.get_network().listen(LISTEN_MADDR) - supported_protocols = ["/floodsub/1.0.0"] + supported_protocols = [FLOODSUB_PROTOCOL_ID] topic = "my_topic" data = b"some data" @@ -72,14 +70,13 @@ async def test_lru_cache_two_nodes(monkeypatch): # `node_b` should only receive the following expected_received_indices = [1, 2, 3, 4, 5, 1] - listen_maddr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") - node_a = await new_node() - node_b = await new_node() + node_a = await new_node(transport_opt=[str(LISTEN_MADDR)]) + node_b = await new_node(transport_opt=[str(LISTEN_MADDR)]) - await node_a.get_network().listen(listen_maddr) - await node_b.get_network().listen(listen_maddr) + await node_a.get_network().listen(LISTEN_MADDR) + await node_b.get_network().listen(LISTEN_MADDR) - supported_protocols = ["/floodsub/1.0.0"] + supported_protocols = SUPPORTED_PROTOCOLS topic = "my_topic" # Mock `get_msg_id` to make us easier to manipulate `msg_id` by `data`. @@ -106,7 +103,7 @@ async def test_lru_cache_two_nodes(monkeypatch): def _make_testing_data(i: int) -> bytes: num_int_bytes = 4 if i >= 2**(num_int_bytes * 8): - raise ValueError("") + raise ValueError("integer is too large to be serialized") return b"data" + i.to_bytes(num_int_bytes, "big") for index in message_indices: @@ -140,7 +137,7 @@ async def perform_test_from_obj(obj): "messages": [ { "topics": ["topic1_for_message", "topic2_for_message", ...], - "data": "some contents of the message (newlines are not supported)", + "data": b"some contents of the message (newlines are not supported)", "node_id": "message sender node id" }, ... @@ -157,45 +154,34 @@ async def perform_test_from_obj(obj): floodsub_map = {} pubsub_map = {} + async def add_node(node_id: str) -> None: + node = await new_node(transport_opt=[str(LISTEN_MADDR)]) + await node.get_network().listen(LISTEN_MADDR) + node_map[node_id] = node + floodsub = FloodSub(supported_protocols) + floodsub_map[node_id] = floodsub + pubsub = Pubsub(node, floodsub, ID(node_id.encode())) + pubsub_map[node_id] = pubsub + supported_protocols = obj["supported_protocols"] tasks_connect = [] for start_node_id in adj_list: # Create node if node does not yet exist if start_node_id not in node_map: - node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[start_node_id] = node - - floodsub = FloodSub(supported_protocols) - floodsub_map[start_node_id] = floodsub - pubsub = Pubsub(node, floodsub, start_node_id) - pubsub_map[start_node_id] = pubsub + await add_node(start_node_id) # For each neighbor of start_node, create if does not yet exist, # then connect start_node to neighbor for neighbor_id in adj_list[start_node_id]: # Create neighbor if neighbor does not yet exist if neighbor_id not in node_map: - neighbor_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await neighbor_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - - node_map[neighbor_id] = neighbor_node - - floodsub = FloodSub(supported_protocols) - floodsub_map[neighbor_id] = floodsub - pubsub = Pubsub(neighbor_node, floodsub, neighbor_id) - pubsub_map[neighbor_id] = pubsub - - # Connect node and neighbor - # await connect(node_map[start_node_id], node_map[neighbor_id]) - tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id]))) - tasks_connect.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_connect) - - # Allow time for graph creation before continuing - # await asyncio.sleep(0.25) + await add_node(neighbor_id) + tasks_connect.append( + connect(node_map[start_node_id], node_map[neighbor_id]) + ) + # Connect nodes and wait at least for 2 seconds + await asyncio.gather(*tasks_connect, asyncio.sleep(2)) # Step 2) Subscribe to topics queues_map = {} @@ -203,89 +189,70 @@ async def perform_test_from_obj(obj): tasks_topic = [] tasks_topic_data = [] - for topic in topic_map: - for node_id in topic_map[topic]: - """ - # Subscribe node to topic - q = await pubsub_map[node_id].subscribe(topic) - - # Create topic-queue map for node_id if one does not yet exist - if node_id not in queues_map: - queues_map[node_id] = {} - - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q - """ - tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic))) + for topic, node_ids in topic_map.items(): + for node_id in node_ids: + tasks_topic.append(pubsub_map[node_id].subscribe(topic)) tasks_topic_data.append((node_id, topic)) tasks_topic.append(asyncio.sleep(2)) # Gather is like Promise.all responses = await asyncio.gather(*tasks_topic, return_exceptions=True) for i in range(len(responses) - 1): - q = responses[i] node_id, topic = tasks_topic_data[i] if node_id not in queues_map: queues_map[node_id] = {} - # Store queue in topic-queue map for node - queues_map[node_id][topic] = q + queues_map[node_id][topic] = responses[i] # Allow time for subscribing before continuing - # await asyncio.sleep(0.01) + await asyncio.sleep(0.01) # Step 3) Publish messages topics_in_msgs_ordered = [] messages = obj["messages"] tasks_publish = [] - next_msg_id_func = message_id_generator(0) for msg in messages: topics = msg["topics"] - data = msg["data"] node_id = msg["node_id"] - # Get actual id for sender node (not the id from the test obj) - actual_node_id = str(node_map[node_id].get_id()) - - # Create correctly formatted message - msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func()) - # Publish message - # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) - tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\ - actual_node_id, msg_talk.SerializeToString()))) + # FIXME: Should be single RPC package with several topics + for topic in topics: + tasks_publish.append( + pubsub_map[node_id].publish( + topic, + data, + ) + ) # For each topic in topics, add topic, msg_talk tuple to ordered test list # TODO: Update message sender to be correct message sender before # adding msg_talk to this list for topic in topics: - topics_in_msgs_ordered.append((topic, msg_talk)) + topics_in_msgs_ordered.append((topic, data)) # Allow time for publishing before continuing - # await asyncio.sleep(0.4) - tasks_publish.append(asyncio.sleep(2)) - await asyncio.gather(*tasks_publish) + await asyncio.gather(*tasks_publish, asyncio.sleep(2)) # Step 4) Check that all messages were received correctly. # TODO: Check message sender too - for i in range(len(topics_in_msgs_ordered)): - topic, actual_msg = topics_in_msgs_ordered[i] - + for topic, data in topics_in_msgs_ordered: # Look at each node in each topic for node_id in topic_map[topic]: # Get message from subscription queue - msg_on_node_str = await queues_map[node_id][topic].get() - assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString() + msg = await queues_map[node_id][topic].get() + assert data == msg.data # Success, terminate pending tasks. await cleanup() + @pytest.mark.asyncio async def test_simple_two_nodes_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "A": ["B"] }, @@ -295,7 +262,7 @@ async def test_simple_two_nodes_test_obj(): "messages": [ { "topics": ["topic1"], - "data": "foo", + "data": b"foo", "node_id": "A" } ] @@ -305,25 +272,25 @@ async def test_simple_two_nodes_test_obj(): @pytest.mark.asyncio async def test_three_nodes_two_topics_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "A": ["B"], - "B": ["C"] + "B": ["C"], }, "topic_map": { "topic1": ["B", "C"], - "topic2": ["B", "C"] + "topic2": ["B", "C"], }, "messages": [ { "topics": ["topic1"], - "data": "foo", - "node_id": "A" + "data": b"foo", + "node_id": "A", }, { "topics": ["topic2"], - "data": "Alex is tall", - "node_id": "A" + "data": b"Alex is tall", + "node_id": "A", } ] } @@ -332,18 +299,18 @@ async def test_three_nodes_two_topics_test_obj(): @pytest.mark.asyncio async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { - "A": ["B"] + "A": ["B"], }, "topic_map": { - "topic1": ["B"] + "topic1": ["B"], }, "messages": [ { "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" + "data": b"Alex is tall", + "node_id": "B", } ] } @@ -352,23 +319,23 @@ async def test_two_nodes_one_topic_single_subscriber_is_sender_test_obj(): @pytest.mark.asyncio async def test_two_nodes_one_topic_two_msgs_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { - "A": ["B"] + "A": ["B"], }, "topic_map": { - "topic1": ["B"] + "topic1": ["B"], }, "messages": [ { "topics": ["topic1"], - "data": "Alex is tall", - "node_id": "B" + "data": b"Alex is tall", + "node_id": "B", }, { "topics": ["topic1"], - "data": "foo", - "node_id": "A" + "data": b"foo", + "node_id": "A", } ] } @@ -377,20 +344,20 @@ async def test_two_nodes_one_topic_two_msgs_test_obj(): @pytest.mark.asyncio async def test_seven_nodes_tree_one_topics_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "1": ["2", "3"], "2": ["4", "5"], - "3": ["6", "7"] + "3": ["6", "7"], }, "topic_map": { - "astrophysics": ["2", "3", "4", "5", "6", "7"] + "astrophysics": ["2", "3", "4", "5", "6", "7"], }, "messages": [ { "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" + "data": b"e=mc^2", + "node_id": "1", } ] } @@ -399,32 +366,32 @@ async def test_seven_nodes_tree_one_topics_test_obj(): @pytest.mark.asyncio async def test_seven_nodes_tree_three_topics_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "1": ["2", "3"], "2": ["4", "5"], - "3": ["6", "7"] + "3": ["6", "7"], }, "topic_map": { "astrophysics": ["2", "3", "4", "5", "6", "7"], "space": ["2", "3", "4", "5", "6", "7"], - "onions": ["2", "3", "4", "5", "6", "7"] + "onions": ["2", "3", "4", "5", "6", "7"], }, "messages": [ { "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" + "data": b"e=mc^2", + "node_id": "1", }, { "topics": ["space"], - "data": "foobar", - "node_id": "1" + "data": b"foobar", + "node_id": "1", }, { "topics": ["onions"], - "data": "I am allergic", - "node_id": "1" + "data": b"I am allergic", + "node_id": "1", } ] } @@ -433,32 +400,32 @@ async def test_seven_nodes_tree_three_topics_test_obj(): @pytest.mark.asyncio async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "1": ["2", "3"], "2": ["4", "5"], - "3": ["6", "7"] + "3": ["6", "7"], }, "topic_map": { "astrophysics": ["1", "2", "3", "4", "5", "6", "7"], "space": ["1", "2", "3", "4", "5", "6", "7"], - "onions": ["1", "2", "3", "4", "5", "6", "7"] + "onions": ["1", "2", "3", "4", "5", "6", "7"], }, "messages": [ { "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" + "data": b"e=mc^2", + "node_id": "1", }, { "topics": ["space"], - "data": "foobar", - "node_id": "4" + "data": b"foobar", + "node_id": "4", }, { "topics": ["onions"], - "data": "I am allergic", - "node_id": "7" + "data": b"I am allergic", + "node_id": "7", } ] } @@ -467,139 +434,141 @@ async def test_seven_nodes_tree_three_topics_diff_origin_test_obj(): @pytest.mark.asyncio async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "1": ["2", "3"], - "2": ["3"] + "2": ["3"], }, "topic_map": { "astrophysics": ["1", "2", "3"], - "school": ["1", "2", "3"] + "school": ["1", "2", "3"], }, "messages": [ { "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" + "data": b"e=mc^2", + "node_id": "1", }, { "topics": ["school"], - "data": "foobar", - "node_id": "2" + "data": b"foobar", + "node_id": "2", }, { "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" + "data": b"I am allergic", + "node_id": "1", } ] } await perform_test_from_obj(test_obj) + @pytest.mark.asyncio async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "1": ["2", "3", "4"], "2": ["1", "3", "4"], "3": ["1", "2", "4"], - "4": ["1", "2", "3"] + "4": ["1", "2", "3"], }, "topic_map": { "astrophysics": ["1", "2", "3", "4"], - "school": ["1", "2", "3", "4"] + "school": ["1", "2", "3", "4"], }, "messages": [ { "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" + "data": b"e=mc^2", + "node_id": "1", }, { "topics": ["school"], - "data": "foobar", - "node_id": "2" + "data": b"foobar", + "node_id": "2", }, { "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" + "data": b"I am allergic", + "node_id": "1", }, { "topics": ["school"], - "data": "foobar2", - "node_id": "2" + "data": b"foobar2", + "node_id": "2", }, { "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" + "data": b"I am allergic2", + "node_id": "1", }, { "topics": ["school"], - "data": "foobar3", - "node_id": "2" + "data": b"foobar3", + "node_id": "2", }, { "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" + "data": b"I am allergic3", + "node_id": "1", } ] } await perform_test_from_obj(test_obj) + @pytest.mark.asyncio async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): test_obj = { - "supported_protocols": ["/floodsub/1.0.0"], + "supported_protocols": SUPPORTED_PROTOCOLS, "adj_list": { "1": ["2"], "2": ["3"], "3": ["4"], "4": ["5"], - "5": ["1"] + "5": ["1"], }, "topic_map": { "astrophysics": ["1", "2", "3", "4", "5"], - "school": ["1", "2", "3", "4", "5"] + "school": ["1", "2", "3", "4", "5"], }, "messages": [ { "topics": ["astrophysics"], - "data": "e=mc^2", - "node_id": "1" + "data": b"e=mc^2", + "node_id": "1", }, { "topics": ["school"], - "data": "foobar", - "node_id": "2" + "data": b"foobar", + "node_id": "2", }, { "topics": ["astrophysics"], - "data": "I am allergic", - "node_id": "1" + "data": b"I am allergic", + "node_id": "1", }, { "topics": ["school"], - "data": "foobar2", - "node_id": "2" + "data": b"foobar2", + "node_id": "2", }, { "topics": ["astrophysics"], - "data": "I am allergic2", - "node_id": "1" + "data": b"I am allergic2", + "node_id": "1", }, { "topics": ["school"], - "data": "foobar3", - "node_id": "2" + "data": b"foobar3", + "node_id": "2", }, { "topics": ["astrophysics"], - "data": "I am allergic3", - "node_id": "1" + "data": b"I am allergic3", + "node_id": "1", } ] } diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index 24db5cf..72d796f 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -70,14 +70,6 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): return packet -async def connect(node1, node2): - """ - Connect node1 to node2 - """ - addr = node2.get_addrs()[0] - info = info_from_p2p_addr(addr) - await node1.connect(info) - async def create_libp2p_hosts(num_hosts): """ Create libp2p hosts diff --git a/tests/utils.py b/tests/utils.py index 4efde83..686d086 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,6 +3,16 @@ import asyncio import multiaddr from libp2p import new_node +from libp2p.peer.peerinfo import info_from_p2p_addr + + +async def connect(node1, node2): + """ + Connect node1 to node2 + """ + addr = node2.get_addrs()[0] + info = info_from_p2p_addr(addr) + await node1.connect(info) async def cleanup():