Increase timeouts significantly. Add visualization of graph structure
This commit is contained in:
parent
be5cbfb7d1
commit
2ba281b511
|
@ -60,7 +60,7 @@ class Mplex(IMuxedConn):
|
||||||
# TODO: pass down timeout from user and use that
|
# TODO: pass down timeout from user and use that
|
||||||
if stream_id in self.buffers:
|
if stream_id in self.buffers:
|
||||||
try:
|
try:
|
||||||
data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=8)
|
data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=5000)
|
||||||
return data
|
return data
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -5,3 +5,4 @@ pytest-asyncio
|
||||||
pylint
|
pylint
|
||||||
grpcio
|
grpcio
|
||||||
grpcio-tools
|
grpcio-tools
|
||||||
|
pyvis
|
||||||
|
|
|
@ -3,6 +3,7 @@ import multiaddr
|
||||||
import pytest
|
import pytest
|
||||||
import random
|
import random
|
||||||
|
|
||||||
|
from pyvis.network import Network
|
||||||
from tests.utils import cleanup
|
from tests.utils import cleanup
|
||||||
from libp2p import new_node
|
from libp2p import new_node
|
||||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||||
|
@ -21,7 +22,7 @@ async def connect(node1, node2):
|
||||||
info = info_from_p2p_addr(addr)
|
info = info_from_p2p_addr(addr)
|
||||||
await node1.connect(info)
|
await node1.connect(info)
|
||||||
|
|
||||||
async def perform_test_from_obj(obj):
|
async def perform_test_from_obj(obj, timeout_len=3):
|
||||||
"""
|
"""
|
||||||
Perform a floodsub test from a test obj.
|
Perform a floodsub test from a test obj.
|
||||||
test obj are composed as follows:
|
test obj are composed as follows:
|
||||||
|
@ -90,7 +91,7 @@ async def perform_test_from_obj(obj):
|
||||||
# Connect node and neighbor
|
# Connect node and neighbor
|
||||||
# await connect(node_map[start_node_id], node_map[neighbor_id])
|
# await connect(node_map[start_node_id], node_map[neighbor_id])
|
||||||
tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id])))
|
tasks_connect.append(asyncio.ensure_future(connect(node_map[start_node_id], node_map[neighbor_id])))
|
||||||
tasks_connect.append(asyncio.sleep(2))
|
tasks_connect.append(asyncio.sleep(timeout_len))
|
||||||
await asyncio.gather(*tasks_connect)
|
await asyncio.gather(*tasks_connect)
|
||||||
|
|
||||||
# Allow time for graph creation before continuing
|
# Allow time for graph creation before continuing
|
||||||
|
@ -117,7 +118,7 @@ async def perform_test_from_obj(obj):
|
||||||
"""
|
"""
|
||||||
tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic)))
|
tasks_topic.append(asyncio.ensure_future(pubsub_map[node_id].subscribe(topic)))
|
||||||
tasks_topic_data.append((node_id, topic))
|
tasks_topic_data.append((node_id, topic))
|
||||||
tasks_topic.append(asyncio.sleep(2))
|
tasks_topic.append(asyncio.sleep(timeout_len))
|
||||||
|
|
||||||
# Gather is like Promise.all
|
# Gather is like Promise.all
|
||||||
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
|
responses = await asyncio.gather(*tasks_topic, return_exceptions=True)
|
||||||
|
@ -199,8 +200,10 @@ async def perform_test_from_obj(obj):
|
||||||
# await perform_test_from_obj(test_obj)
|
# await perform_test_from_obj(test_obj)
|
||||||
|
|
||||||
def generate_random_topology(num_nodes, density, num_topics, max_nodes_per_topic, max_msgs_per_topic):
|
def generate_random_topology(num_nodes, density, num_topics, max_nodes_per_topic, max_msgs_per_topic):
|
||||||
nodes = [str(i).zfill(2) for i in range(0,num_nodes)]
|
# Give nodes string labels so that perform_test_with_obj works correctly
|
||||||
|
# Note: "n" is appended so that visualizations work properly ('00' caused issues)
|
||||||
|
nodes = ["n" + str(i).zfill(2) for i in range(0,num_nodes)]
|
||||||
|
|
||||||
# 1) Generate random graph structure
|
# 1) Generate random graph structure
|
||||||
|
|
||||||
# Create initial graph by connecting each node to its previous node
|
# Create initial graph by connecting each node to its previous node
|
||||||
|
@ -275,15 +278,32 @@ def generate_random_topology(num_nodes, density, num_topics, max_nodes_per_topic
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_simple_random():
|
async def test_simple_random():
|
||||||
num_nodes = 4
|
num_nodes = 100
|
||||||
topic_density = 1
|
density = 0.1
|
||||||
num_topics = 2
|
num_topics = 20
|
||||||
max_nodes_per_topic = 4
|
max_nodes_per_topic = 69
|
||||||
max_msgs_per_topic = 1
|
max_msgs_per_topic = 1
|
||||||
topology_test_obj = generate_random_topology(num_nodes, topic_density, num_topics,\
|
topology_test_obj = generate_random_topology(num_nodes, density, num_topics,\
|
||||||
max_nodes_per_topic, max_msgs_per_topic)
|
max_nodes_per_topic, max_msgs_per_topic)
|
||||||
print(topology_test_obj)
|
print(topology_test_obj)
|
||||||
await perform_test_from_obj(topology_test_obj)
|
await perform_test_from_obj(topology_test_obj, timeout_len=20)
|
||||||
# print("TOPOLOGY")
|
|
||||||
# print(topology)
|
# Save graph
|
||||||
|
save_graph = True
|
||||||
|
if save_graph:
|
||||||
|
net = Network()
|
||||||
|
net.barnes_hut()
|
||||||
|
|
||||||
|
adj_list = topology_test_obj["adj_list"]
|
||||||
|
# print(list(adj_list.keys()))
|
||||||
|
nodes_to_add = list(adj_list.keys())
|
||||||
|
net.add_nodes(nodes_to_add)
|
||||||
|
print(nodes_to_add)
|
||||||
|
print(net.nodes)
|
||||||
|
for node in adj_list:
|
||||||
|
neighbors = adj_list[node]
|
||||||
|
for neighbor in neighbors:
|
||||||
|
net.add_edge(node, neighbor)
|
||||||
|
|
||||||
|
net.show("random_topology.html")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user