Add read topology from file
This commit is contained in:
parent
86dfc07321
commit
470f5e6e51
@ -1,4 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
from sender import SenderNode
|
||||
from receiver import ReceiverNode
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
@ -34,20 +36,22 @@ async def create_topology(adjacency_map, sender, receivers):
|
||||
await connect(sender.libp2p_node, receivers[target_num].libp2p_node)
|
||||
|
||||
# Connect receivers to other receivers
|
||||
for source_num in adjacency_map:
|
||||
if source_num != "sender":
|
||||
target_nums = adjacency_map[source_num]
|
||||
for source_num_str in adjacency_map:
|
||||
if source_num_str != "sender":
|
||||
target_nums = adjacency_map[source_num_str]
|
||||
source_num = int(source_num_str)
|
||||
for target_num in target_nums:
|
||||
await connect(receivers[source_num].libp2p_node, \
|
||||
receivers[target_num].libp2p_node)
|
||||
|
||||
def get_num_receivers_in_topology(topology):
|
||||
receiver_ids = []
|
||||
for key in topology:
|
||||
if key != "sender":
|
||||
if key not in receiver_ids:
|
||||
receiver_ids.append(key)
|
||||
for neighbor in topology[key]:
|
||||
for key_str in topology:
|
||||
if key_str != "sender":
|
||||
key_num = int(key_str)
|
||||
if key_num not in receiver_ids:
|
||||
receiver_ids.append(key_num)
|
||||
for neighbor in topology[key_str]:
|
||||
if neighbor not in receiver_ids:
|
||||
receiver_ids.append(neighbor)
|
||||
return len(receiver_ids)
|
||||
@ -61,21 +65,14 @@ async def main():
|
||||
print("Receivers created")
|
||||
|
||||
# Define connection topology
|
||||
topology = {
|
||||
"sender": [0, 2, 4],
|
||||
0: [1],
|
||||
2: [3],
|
||||
4: [5]
|
||||
}
|
||||
topology_dict = json.loads(open(sys.argv[1]).read())
|
||||
|
||||
topology = topology_dict["topology"]
|
||||
|
||||
num_receivers = get_num_receivers_in_topology(topology)
|
||||
|
||||
# Define topic map
|
||||
topic_map = {
|
||||
"1": [0, 1],
|
||||
"2": [2, 3],
|
||||
"3": [4, 5]
|
||||
}
|
||||
topic_map = topology_dict["topic_map"]
|
||||
|
||||
topics = topic_map.keys()
|
||||
|
||||
|
@ -56,8 +56,8 @@ class SenderNode():
|
||||
ack = await stream.read()
|
||||
if ack is not None:
|
||||
await self.ack_queue.put(ack)
|
||||
# else:
|
||||
# break
|
||||
else:
|
||||
break
|
||||
# Reached once test_being_performed is False
|
||||
# Notify receivers test is over
|
||||
print("Writing end")
|
||||
|
11
examples/sharding/topologies/simple_1_chain.json
Normal file
11
examples/sharding/topologies/simple_1_chain.json
Normal file
@ -0,0 +1,11 @@
|
||||
{
|
||||
"topology": {
|
||||
"sender": [0],
|
||||
"0": [1],
|
||||
"1": [2],
|
||||
"2": [3]
|
||||
},
|
||||
"topic_map": {
|
||||
"1": [0, 1, 2, 3]
|
||||
}
|
||||
}
|
11
examples/sharding/topologies/simple_1_tree.json
Normal file
11
examples/sharding/topologies/simple_1_tree.json
Normal file
@ -0,0 +1,11 @@
|
||||
{
|
||||
"topology": {
|
||||
"sender": [0],
|
||||
"0": [1, 2],
|
||||
"1": [3, 4],
|
||||
"2": [5, 6]
|
||||
},
|
||||
"topic_map": {
|
||||
"1": [0, 1, 2, 3, 4, 5, 6]
|
||||
}
|
||||
}
|
13
examples/sharding/topologies/simple_3_chains.json
Normal file
13
examples/sharding/topologies/simple_3_chains.json
Normal file
@ -0,0 +1,13 @@
|
||||
{
|
||||
"topology": {
|
||||
"sender": [0, 2, 4],
|
||||
"0": [1],
|
||||
"2": [3],
|
||||
"4": [5]
|
||||
},
|
||||
"topic_map": {
|
||||
"1": [0, 1],
|
||||
"2": [2, 3],
|
||||
"3": [4, 5]
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user