diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 0d587b5..babb61c 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -60,7 +60,7 @@ class Mplex(IMuxedConn): # TODO: pass down timeout from user and use that if stream_id in self.buffers: try: - data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=8) + data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=5000) return data except asyncio.TimeoutError: return None diff --git a/requirements_dev.txt b/requirements_dev.txt index e3d8c0e..f1eedcc 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -5,3 +5,4 @@ pytest-asyncio pylint grpcio grpcio-tools +pyvis diff --git a/tests/pubsub/test_random_topology.py b/tests/pubsub/test_random_topology.py index ffb78b3..f37c7d3 100644 --- a/tests/pubsub/test_random_topology.py +++ b/tests/pubsub/test_random_topology.py @@ -3,6 +3,7 @@ import multiaddr import pytest import random +from pyvis.network import Network from tests.utils import cleanup from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr @@ -21,7 +22,7 @@ async def connect(node1, node2): info = info_from_p2p_addr(addr) await node1.connect(info) -async def perform_test_from_obj(obj): +async def perform_test_from_obj(obj, timeout_len=3): """ Perform a floodsub test from a test obj. test obj are composed as follows: @@ -90,7 +91,7 @@ async def perform_test_from_obj(obj): # Connect node and neighbor # await connect(node_map[start_node_id], node_map[neighbor_id]) tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id]))) - tasks_connect.append(asyncio.sleep(2)) + tasks_connect.append(asyncio.sleep(timeout_len)) await asyncio.gather(*tasks_connect) # Allow time for graph creation before continuing @@ -117,7 +118,7 @@ async def perform_test_from_obj(obj): """ tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic))) tasks_topic_data.append((node_id, topic)) - tasks_topic.append(asyncio.sleep(2)) + tasks_topic.append(asyncio.sleep(timeout_len)) # Gather is like Promise.all responses = await asyncio.gather(*tasks_topic, return_exceptions=True) @@ -199,8 +200,10 @@ async def perform_test_from_obj(obj): # await perform_test_from_obj(test_obj) def generate_random_topology(num_nodes, density, num_topics, max_nodes_per_topic, max_msgs_per_topic): - nodes = [str(i).zfill(2) for i in range(0,num_nodes)] - + # Give nodes string labels so that perform_test_with_obj works correctly + # Note: "n" is appended so that visualizations work properly ('00' caused issues) + nodes = ["n" + str(i).zfill(2) for i in range(0,num_nodes)] + # 1) Generate random graph structure # Create initial graph by connecting each node to its previous node @@ -275,15 +278,32 @@ def generate_random_topology(num_nodes, density, num_topics, max_nodes_per_topic @pytest.mark.asyncio async def test_simple_random(): - num_nodes = 4 - topic_density = 1 - num_topics = 2 - max_nodes_per_topic = 4 + num_nodes = 100 + density = 0.1 + num_topics = 20 + max_nodes_per_topic = 69 max_msgs_per_topic = 1 - topology_test_obj = generate_random_topology(num_nodes, topic_density, num_topics,\ + topology_test_obj = generate_random_topology(num_nodes, density, num_topics,\ max_nodes_per_topic, max_msgs_per_topic) print(topology_test_obj) - await perform_test_from_obj(topology_test_obj) - # print("TOPOLOGY") - # print(topology) + await perform_test_from_obj(topology_test_obj, timeout_len=20) + + # Save graph + save_graph = True + if save_graph: + net = Network() + net.barnes_hut() + + adj_list = topology_test_obj["adj_list"] + # print(list(adj_list.keys())) + nodes_to_add = list(adj_list.keys()) + net.add_nodes(nodes_to_add) + print(nodes_to_add) + print(net.nodes) + for node in adj_list: + neighbors = adj_list[node] + for neighbor in neighbors: + net.add_edge(node, neighbor) + + net.show("random_topology.html")