Compare commits
16 Commits
master
...
demo-dev-m
Author | SHA1 | Date | |
---|---|---|---|
|
cd43db0861 | ||
|
8acec4d2ed | ||
|
c59e809ee9 | ||
|
00257238f5 | ||
|
fa7251cb68 | ||
|
9277be98bf | ||
|
470f5e6e51 | ||
|
86dfc07321 | ||
|
7bacc8ca08 | ||
|
fe14c24a89 | ||
|
f570b19db8 | ||
|
82f881a49a | ||
|
08f845cc21 | ||
|
25def0c7d6 | ||
|
66427cd6f5 | ||
|
ba358335df |
109
examples/sharding/driver.py
Normal file
109
examples/sharding/driver.py
Normal file
|
@ -0,0 +1,109 @@
|
|||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
from sender import SenderNode
|
||||
from receiver import ReceiverNode
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from tests.utils import cleanup
|
||||
|
||||
ACK_PROTOCOL = "/ack/1.0.0"
|
||||
|
||||
async def create_receivers(num_receivers, topic_map):
|
||||
receivers = []
|
||||
|
||||
# From topic_map (topic -> list of receivers), create (receiver -> topic)
|
||||
receiver_to_topic_map = {}
|
||||
for topic in topic_map:
|
||||
for receiver in topic_map[topic]:
|
||||
receiver_to_topic_map[receiver] = topic
|
||||
|
||||
# Create receivers
|
||||
for i in range(num_receivers):
|
||||
receivers.append(await ReceiverNode.create(ACK_PROTOCOL, receiver_to_topic_map[i]))
|
||||
return receivers
|
||||
|
||||
async def connect(node1, node2):
|
||||
# node1 connects to node2
|
||||
addr = node2.get_addrs()[0]
|
||||
info = info_from_p2p_addr(addr)
|
||||
await node1.connect(info)
|
||||
|
||||
async def create_topology(adjacency_map, sender, receivers):
|
||||
# Create network
|
||||
|
||||
# Connect senders to receivers
|
||||
for target_num in adjacency_map["sender"]:
|
||||
await connect(sender.libp2p_node, receivers[target_num].libp2p_node)
|
||||
|
||||
# Connect receivers to other receivers
|
||||
for source_num_str in adjacency_map:
|
||||
if source_num_str != "sender":
|
||||
target_nums = adjacency_map[source_num_str]
|
||||
source_num = int(source_num_str)
|
||||
for target_num in target_nums:
|
||||
await connect(receivers[source_num].libp2p_node, \
|
||||
receivers[target_num].libp2p_node)
|
||||
|
||||
def get_num_receivers_in_topology(topology):
|
||||
receiver_ids = []
|
||||
for key_str in topology:
|
||||
if key_str != "sender":
|
||||
key_num = int(key_str)
|
||||
if key_num not in receiver_ids:
|
||||
receiver_ids.append(key_num)
|
||||
for neighbor in topology[key_str]:
|
||||
if neighbor not in receiver_ids:
|
||||
receiver_ids.append(neighbor)
|
||||
return len(receiver_ids)
|
||||
|
||||
async def main():
|
||||
# Create sender
|
||||
sender = await SenderNode.create(ACK_PROTOCOL)
|
||||
print("Sender created")
|
||||
|
||||
# Define connection topology
|
||||
topology_dict = json.loads(open(sys.argv[1]).read())
|
||||
|
||||
topology = topology_dict["topology"]
|
||||
|
||||
num_receivers = get_num_receivers_in_topology(topology)
|
||||
|
||||
# Define topic map
|
||||
topic_map = topology_dict["topic_map"]
|
||||
|
||||
topics = topic_map.keys()
|
||||
|
||||
# Create receivers
|
||||
receivers = await create_receivers(num_receivers, topic_map)
|
||||
print("Receivers created")
|
||||
|
||||
# Create network topology
|
||||
await create_topology(topology, sender, receivers)
|
||||
print("Topology created")
|
||||
|
||||
# Perform throughput test
|
||||
# First, start receivers
|
||||
sender_info = info_from_p2p_addr(sender.libp2p_node.get_addrs()[0])
|
||||
for receiver in receivers:
|
||||
print("Starting receiving")
|
||||
asyncio.ensure_future(receiver.start_receiving(sender_info))
|
||||
|
||||
# Allow time for start receiving to be completed
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# Start sending messages and perform throughput test
|
||||
# Determine number of receivers in each topic
|
||||
num_receivers_in_each_topic = {}
|
||||
for topic in topic_map:
|
||||
num_receivers_in_each_topic[topic] = len(topic_map[topic])
|
||||
print("Performing test")
|
||||
await sender.perform_test(num_receivers_in_each_topic, topics, 10)
|
||||
|
||||
print("All testing completed")
|
||||
await cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
39
examples/sharding/graph_generator.py
Normal file
39
examples/sharding/graph_generator.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
import json
|
||||
from pyvis.network import Network
|
||||
from sys import argv
|
||||
|
||||
COLORS = ["#7b47bf", "#70ec84", "#ffa07a", "#005582", "#165042", "#dcb2b8"]
|
||||
|
||||
# Read in topology+topics file into map
|
||||
|
||||
# Add nodes
|
||||
def main():
|
||||
net = Network()
|
||||
net.barnes_hut()
|
||||
|
||||
topology_dict = json.loads(open(argv[1]).read())
|
||||
|
||||
adj_list = topology_dict["topology"]
|
||||
topics_map = topology_dict["topic_map"]
|
||||
|
||||
# Assign colors to nodes in topics (note sender is not included in a topic)
|
||||
for topic in topics_map:
|
||||
index = int(topic)
|
||||
color = COLORS[index]
|
||||
net.add_nodes(topics_map[topic], \
|
||||
color=[color for _ in range(len(topics_map[topic]))])
|
||||
|
||||
nodes_to_add = list(adj_list.keys())
|
||||
net.add_nodes(nodes_to_add)
|
||||
for node in adj_list:
|
||||
node_val = node
|
||||
if node != "sender":
|
||||
node_val = int(node_val)
|
||||
neighbors = adj_list[node]
|
||||
for neighbor in neighbors:
|
||||
net.add_edge(node_val, neighbor)
|
||||
|
||||
net.show(argv[2])
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
117
examples/sharding/output.html
Normal file
117
examples/sharding/output.html
Normal file
|
@ -0,0 +1,117 @@
|
|||
<html>
|
||||
<head>
|
||||
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/vis/4.16.1/vis.css" type="text/css" />
|
||||
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/vis/4.16.1/vis-network.min.js"> </script>
|
||||
|
||||
<!-- <link rel="stylesheet" href="../node_modules/vis/dist/vis.min.css" type="text/css" />
|
||||
<script type="text/javascript" src="../node_modules/vis/dist/vis.js"> </script>-->
|
||||
|
||||
<style type="text/css">
|
||||
|
||||
#mynetwork {
|
||||
width: 500px;
|
||||
height: 500px;
|
||||
background-color: #ffffff;
|
||||
border: 1px solid lightgray;
|
||||
position: relative;
|
||||
float: left;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
</style>
|
||||
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<div id = "mynetwork"></div>
|
||||
|
||||
|
||||
<script type="text/javascript">
|
||||
|
||||
// initialize global variables.
|
||||
var edges;
|
||||
var nodes;
|
||||
var network;
|
||||
var container;
|
||||
var options, data;
|
||||
|
||||
|
||||
// This method is responsible for drawing the graph, returns the drawn network
|
||||
function drawGraph() {
|
||||
var container = document.getElementById('mynetwork');
|
||||
|
||||
|
||||
|
||||
// parsing and collecting nodes and edges from the python
|
||||
nodes = new vis.DataSet([{"color": "#70ec84", "id": 0, "label": 0, "shape": "dot"}, {"color": "#70ec84", "id": 1, "label": 1, "shape": "dot"}, {"color": "#ffa07a", "id": 2, "label": 2, "shape": "dot"}, {"color": "#ffa07a", "id": 3, "label": 3, "shape": "dot"}, {"color": "#005582", "id": 4, "label": 4, "shape": "dot"}, {"color": "#005582", "id": 5, "label": 5, "shape": "dot"}, {"id": "sender", "label": "sender", "shape": "dot"}]);
|
||||
edges = new vis.DataSet([{"from": "sender", "to": 0}, {"from": "sender", "to": 2}, {"from": "sender", "to": 4}, {"from": 0, "to": 1}, {"from": 2, "to": 3}, {"from": 4, "to": 5}]);
|
||||
|
||||
// adding nodes and edges to the graph
|
||||
data = {nodes: nodes, edges: edges};
|
||||
|
||||
var options = {
|
||||
"configure": {
|
||||
"enabled": false
|
||||
},
|
||||
"edges": {
|
||||
"color": {
|
||||
"inherit": true
|
||||
},
|
||||
"smooth": {
|
||||
"enabled": false,
|
||||
"type": "continuous"
|
||||
}
|
||||
},
|
||||
"interaction": {
|
||||
"dragNodes": true,
|
||||
"hideEdgesOnDrag": false,
|
||||
"hideNodesOnDrag": false
|
||||
},
|
||||
"physics": {
|
||||
"barnesHut": {
|
||||
"avoidOverlap": 0,
|
||||
"centralGravity": 0.3,
|
||||
"damping": 0.09,
|
||||
"gravitationalConstant": -80000,
|
||||
"springConstant": 0.001,
|
||||
"springLength": 250
|
||||
},
|
||||
"enabled": true,
|
||||
"stabilization": {
|
||||
"enabled": true,
|
||||
"fit": true,
|
||||
"iterations": 1000,
|
||||
"onlyDynamicEdges": false,
|
||||
"updateInterval": 50
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
// default to using dot shape for nodes
|
||||
options.nodes = {
|
||||
shape: "dot"
|
||||
}
|
||||
|
||||
|
||||
network = new vis.Network(container, data, options);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
return network;
|
||||
|
||||
}
|
||||
|
||||
drawGraph();
|
||||
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
70
examples/sharding/receiver.py
Normal file
70
examples/sharding/receiver.py
Normal file
|
@ -0,0 +1,70 @@
|
|||
import multiaddr
|
||||
import asyncio
|
||||
|
||||
from libp2p import new_node
|
||||
from libp2p.peer.peerinfo import info_from_p2p_addr
|
||||
from libp2p.pubsub.pubsub import Pubsub
|
||||
from libp2p.pubsub.floodsub import FloodSub
|
||||
from tests.pubsub.utils import message_id_generator
|
||||
|
||||
TOPIC = "eth"
|
||||
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
|
||||
|
||||
class ReceiverNode():
|
||||
"""
|
||||
Node which has an internal balance mapping, meant to serve as
|
||||
a dummy crypto blockchain. There is no actual blockchain, just a simple
|
||||
map indicating how much crypto each user in the mappings holds
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.next_msg_id_func = message_id_generator(0)
|
||||
|
||||
@classmethod
|
||||
async def create(cls, ack_protocol, topic):
|
||||
"""
|
||||
Create a new ReceiverNode and attach a libp2p node, a floodsub, and a pubsub
|
||||
instance to this new node
|
||||
|
||||
We use create as this serves as a factory function and allows us
|
||||
to use async await, unlike the init function
|
||||
"""
|
||||
self = ReceiverNode()
|
||||
|
||||
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
|
||||
self.libp2p_node = libp2p_node
|
||||
|
||||
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
|
||||
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
|
||||
|
||||
self.pubsub_messages = await self.pubsub.subscribe(topic)
|
||||
self.topic = topic
|
||||
|
||||
self.ack_protocol = ack_protocol
|
||||
|
||||
return self
|
||||
|
||||
async def wait_for_end(self, ack_stream):
|
||||
# Continue waiting for end message, even if None (i.e. timeout) is received
|
||||
msg = await ack_stream.read()
|
||||
while msg is None:
|
||||
msg = await ack_stream.read()
|
||||
msg = msg.decode()
|
||||
if msg == "end":
|
||||
self.should_listen = False
|
||||
print("End received")
|
||||
|
||||
async def start_receiving(self, sender_node_info):
|
||||
await self.libp2p_node.connect(sender_node_info)
|
||||
ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, [self.ack_protocol])
|
||||
asyncio.ensure_future(self.wait_for_end(ack_stream))
|
||||
|
||||
self.should_listen = True
|
||||
ack_msg = self.topic
|
||||
encoded_ack_msg = ack_msg.encode()
|
||||
while self.should_listen:
|
||||
msg = await self.pubsub_messages.get()
|
||||
await ack_stream.write(encoded_ack_msg)
|
||||
print("Receiver closed")
|
159
examples/sharding/sender.py
Normal file
159
examples/sharding/sender.py
Normal file
|
@ -0,0 +1,159 @@
|
|||
import asyncio
|
||||
import multiaddr
|
||||
|
||||
from timeit import default_timer as timer
|
||||
|
||||
from tests.utils import cleanup
|
||||
from tests.pubsub.utils import generate_RPC_packet, message_id_generator
|
||||
from libp2p import new_node
|
||||
from libp2p.pubsub.pubsub import Pubsub
|
||||
from libp2p.pubsub.floodsub import FloodSub
|
||||
|
||||
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
|
||||
TOPIC = "eth"
|
||||
|
||||
class SenderNode():
|
||||
"""
|
||||
SenderNode which pings ReceiverNodes continuously to perform benchmarking
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.next_msg_id_func = message_id_generator(0)
|
||||
self.ack_queue = asyncio.Queue()
|
||||
|
||||
@classmethod
|
||||
async def create(cls, ack_protocol):
|
||||
"""
|
||||
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
|
||||
instance to this new node
|
||||
|
||||
We use create as this serves as a factory function and allows us
|
||||
to use async await, unlike the init function
|
||||
"""
|
||||
self = SenderNode()
|
||||
|
||||
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
|
||||
self.libp2p_node = libp2p_node
|
||||
|
||||
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
|
||||
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
|
||||
await self.pubsub.subscribe(TOPIC)
|
||||
|
||||
self.test_being_performed = True
|
||||
|
||||
self.all_streams = []
|
||||
async def ack_stream_handler(stream):
|
||||
self.all_streams.append(stream)
|
||||
|
||||
while self.test_being_performed:
|
||||
# This Ack is what times out when multi-topic tests finish
|
||||
ack = await stream.read()
|
||||
if ack is not None:
|
||||
await self.ack_queue.put(ack)
|
||||
else:
|
||||
print("FUCK")
|
||||
break
|
||||
# Reached once test_being_performed is False
|
||||
# Notify receivers test is over
|
||||
print("Writing end")
|
||||
await stream.write("end".encode())
|
||||
|
||||
# Set handler for acks
|
||||
self.ack_protocol = ack_protocol
|
||||
self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler)
|
||||
|
||||
return self
|
||||
|
||||
async def perform_test(self, num_receivers_in_each_topic, topics, time_length):
|
||||
# Time and loop
|
||||
|
||||
my_id = str(self.libp2p_node.get_id())
|
||||
msg_contents = "transaction"
|
||||
|
||||
num_sent_in_each_topic = {}
|
||||
num_acks_in_each_topic = {}
|
||||
for topic in topics:
|
||||
num_sent_in_each_topic[topic] = 0
|
||||
num_acks_in_each_topic[topic] = 0
|
||||
|
||||
self.topic_ack_queues = {}
|
||||
for topic in topics:
|
||||
self.topic_ack_queues[topic] = asyncio.Queue()
|
||||
completed_topics_count = 0
|
||||
num_topics = len(topics)
|
||||
async def handle_ack_queues():
|
||||
start = timer()
|
||||
curr_time = timer()
|
||||
|
||||
print("Handling ack queues")
|
||||
nonlocal completed_topics_count, num_topics
|
||||
while completed_topics_count < num_topics:
|
||||
ack = await self.ack_queue.get()
|
||||
decoded_ack = ack.decode()
|
||||
|
||||
await self.topic_ack_queues[decoded_ack].put(decoded_ack)
|
||||
|
||||
curr_time = timer()
|
||||
|
||||
async def end_all_async():
|
||||
# Add None to ack_queue to break out of the loop.
|
||||
# Note: This is necessary given the current code or the code will never
|
||||
# terminate
|
||||
await self.ack_queue.put(None)
|
||||
|
||||
# This is not necessary but is useful for turning off the receivers gracefully
|
||||
for stream in self.all_streams:
|
||||
await stream.write("end".encode())
|
||||
|
||||
async def perform_test_on_topic(topic):
|
||||
print("Performing test on topic " + topic)
|
||||
start = timer()
|
||||
curr_time = timer()
|
||||
|
||||
# Perform test while time is not up here AND
|
||||
# while time is not up in handle_ack_queues, which is checked with the
|
||||
# self.test_being_performed boolean
|
||||
while (curr_time - start) < time_length:
|
||||
# Send message on single topic
|
||||
packet = generate_RPC_packet(my_id, [topic], msg_contents, self.next_msg_id_func())
|
||||
|
||||
await self.floodsub.publish(my_id, packet.SerializeToString())
|
||||
num_sent_in_each_topic[topic] += 1
|
||||
|
||||
# Wait for acks
|
||||
num_acks = 0
|
||||
|
||||
# While number of acks is below threshold AND
|
||||
# while time is not up in handle_ack_queues, which is checked with the
|
||||
# self.test_being_performed boolean
|
||||
# TODO: Check safety of this. Does this make sense in the asyncio
|
||||
# event-driven setting?
|
||||
while num_acks < num_receivers_in_each_topic[topic]:
|
||||
ack = await self.topic_ack_queues[topic].get()
|
||||
num_acks += 1
|
||||
num_acks_in_each_topic[topic] += 1
|
||||
curr_time = timer()
|
||||
|
||||
nonlocal completed_topics_count, num_topics
|
||||
print("Test completed " + topic)
|
||||
completed_topics_count += 1
|
||||
if completed_topics_count == num_topics:
|
||||
self.test_being_performed = False
|
||||
print("End all async")
|
||||
await end_all_async()
|
||||
|
||||
tasks = [asyncio.ensure_future(handle_ack_queues())]
|
||||
|
||||
for topic in topics:
|
||||
tasks.append(asyncio.ensure_future(perform_test_on_topic(topic)))
|
||||
|
||||
gathered = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Do something interesting with test results
|
||||
print("Num sent: " + str(num_sent_in_each_topic))
|
||||
print("Num fully ack: " + str(num_acks_in_each_topic))
|
||||
|
||||
# End test
|
||||
self.test_being_performed = False
|
13
examples/sharding/topologies/simple_1_chain.json
Normal file
13
examples/sharding/topologies/simple_1_chain.json
Normal file
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"topology": {
|
||||
"sender": [0],
|
||||
"0": [1],
|
||||
"1": [2],
|
||||
"2": [3],
|
||||
"3": [4],
|
||||
"4": [5]
|
||||
},
|
||||
"topic_map": {
|
||||
"1": [0, 1, 2, 3, 4, 5]
|
||||
}
|
||||
}
|
11
examples/sharding/topologies/simple_1_tree.json
Normal file
11
examples/sharding/topologies/simple_1_tree.json
Normal file
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"topology": {
|
||||
"sender": [0],
|
||||
"0": [1, 2],
|
||||
"1": [3, 4],
|
||||
"2": [5, 6]
|
||||
},
|
||||
"topic_map": {
|
||||
"1": [0, 1, 2, 3, 4, 5, 6]
|
||||
}
|
||||
}
|
13
examples/sharding/topologies/simple_3_chains.json
Normal file
13
examples/sharding/topologies/simple_3_chains.json
Normal file
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"topology": {
|
||||
"sender": [0, 2, 4],
|
||||
"0": [1],
|
||||
"2": [3],
|
||||
"4": [5]
|
||||
},
|
||||
"topic_map": {
|
||||
"1": [0, 1],
|
||||
"2": [2, 3],
|
||||
"3": [4, 5]
|
||||
}
|
||||
}
|
|
@ -5,4 +5,5 @@ pytest-asyncio
|
|||
pylint
|
||||
grpcio
|
||||
grpcio-tools
|
||||
lru-dict>=1.1.6
|
||||
lru-dict>=1.1.6
|
||||
pyvis
|
Loading…
Reference in New Issue
Block a user