Compare commits

...

16 Commits

Author SHA1 Message Date
Stuckinaboot
cd43db0861 Clean up code 2019-04-07 21:29:45 -04:00
Stuckinaboot
8acec4d2ed Update simple_1_chain 2019-04-07 21:06:57 -04:00
Stuckinaboot
c59e809ee9 Add pyvis 2019-04-07 20:59:42 -04:00
Stuckinaboot
00257238f5 Remove unnecessary loop conditions 2019-04-07 20:56:59 -04:00
Stuckinaboot
fa7251cb68 Add graph generator 2019-04-07 19:59:33 -04:00
Stuckinaboot
9277be98bf Add end messages into simplified logic 2019-04-07 18:40:01 -04:00
Stuckinaboot
470f5e6e51 Add read topology from file 2019-04-07 16:36:05 -04:00
Stuckinaboot
86dfc07321 Simplify sender and receiver logic 2019-04-07 15:24:56 -04:00
Stuckinaboot
7bacc8ca08 Remove unused code 2019-04-07 14:48:59 -04:00
Stuckinaboot
fe14c24a89 Add working multi-topic test 2019-04-07 14:46:01 -04:00
Stuckinaboot
f570b19db8 Add multi-topic benchmark capability 2019-04-07 14:25:59 -04:00
Stuckinaboot
82f881a49a Not working as of now. Debugging 2019-04-06 23:09:26 -04:00
Stuckinaboot
08f845cc21 Lay groundwork for transition to multiple topics 2019-04-06 17:48:06 -04:00
Stuckinaboot
25def0c7d6 Implement end-to-end perform testing 2019-04-06 17:16:37 -04:00
Stuckinaboot
66427cd6f5 Modify sender and receiver to be classes 2019-04-06 14:55:06 -04:00
Robert Zajac
ba358335df scaffolding sender/receiver 2019-04-06 14:23:52 -04:00
9 changed files with 533 additions and 1 deletions

109
examples/sharding/driver.py Normal file
View File

@ -0,0 +1,109 @@
import asyncio
import json
import sys
from sender import SenderNode
from receiver import ReceiverNode
from libp2p.peer.peerinfo import info_from_p2p_addr
from tests.utils import cleanup
ACK_PROTOCOL = "/ack/1.0.0"
async def create_receivers(num_receivers, topic_map):
receivers = []
# From topic_map (topic -> list of receivers), create (receiver -> topic)
receiver_to_topic_map = {}
for topic in topic_map:
for receiver in topic_map[topic]:
receiver_to_topic_map[receiver] = topic
# Create receivers
for i in range(num_receivers):
receivers.append(await ReceiverNode.create(ACK_PROTOCOL, receiver_to_topic_map[i]))
return receivers
async def connect(node1, node2):
# node1 connects to node2
addr = node2.get_addrs()[0]
info = info_from_p2p_addr(addr)
await node1.connect(info)
async def create_topology(adjacency_map, sender, receivers):
# Create network
# Connect senders to receivers
for target_num in adjacency_map["sender"]:
await connect(sender.libp2p_node, receivers[target_num].libp2p_node)
# Connect receivers to other receivers
for source_num_str in adjacency_map:
if source_num_str != "sender":
target_nums = adjacency_map[source_num_str]
source_num = int(source_num_str)
for target_num in target_nums:
await connect(receivers[source_num].libp2p_node, \
receivers[target_num].libp2p_node)
def get_num_receivers_in_topology(topology):
receiver_ids = []
for key_str in topology:
if key_str != "sender":
key_num = int(key_str)
if key_num not in receiver_ids:
receiver_ids.append(key_num)
for neighbor in topology[key_str]:
if neighbor not in receiver_ids:
receiver_ids.append(neighbor)
return len(receiver_ids)
async def main():
# Create sender
sender = await SenderNode.create(ACK_PROTOCOL)
print("Sender created")
# Define connection topology
topology_dict = json.loads(open(sys.argv[1]).read())
topology = topology_dict["topology"]
num_receivers = get_num_receivers_in_topology(topology)
# Define topic map
topic_map = topology_dict["topic_map"]
topics = topic_map.keys()
# Create receivers
receivers = await create_receivers(num_receivers, topic_map)
print("Receivers created")
# Create network topology
await create_topology(topology, sender, receivers)
print("Topology created")
# Perform throughput test
# First, start receivers
sender_info = info_from_p2p_addr(sender.libp2p_node.get_addrs()[0])
for receiver in receivers:
print("Starting receiving")
asyncio.ensure_future(receiver.start_receiving(sender_info))
# Allow time for start receiving to be completed
await asyncio.sleep(0.5)
# Start sending messages and perform throughput test
# Determine number of receivers in each topic
num_receivers_in_each_topic = {}
for topic in topic_map:
num_receivers_in_each_topic[topic] = len(topic_map[topic])
print("Performing test")
await sender.perform_test(num_receivers_in_each_topic, topics, 10)
print("All testing completed")
await cleanup()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

View File

@ -0,0 +1,39 @@
import json
from pyvis.network import Network
from sys import argv
COLORS = ["#7b47bf", "#70ec84", "#ffa07a", "#005582", "#165042", "#dcb2b8"]
# Read in topology+topics file into map
# Add nodes
def main():
net = Network()
net.barnes_hut()
topology_dict = json.loads(open(argv[1]).read())
adj_list = topology_dict["topology"]
topics_map = topology_dict["topic_map"]
# Assign colors to nodes in topics (note sender is not included in a topic)
for topic in topics_map:
index = int(topic)
color = COLORS[index]
net.add_nodes(topics_map[topic], \
color=[color for _ in range(len(topics_map[topic]))])
nodes_to_add = list(adj_list.keys())
net.add_nodes(nodes_to_add)
for node in adj_list:
node_val = node
if node != "sender":
node_val = int(node_val)
neighbors = adj_list[node]
for neighbor in neighbors:
net.add_edge(node_val, neighbor)
net.show(argv[2])
if __name__ == "__main__":
main()

View File

@ -0,0 +1,117 @@
<html>
<head>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/vis/4.16.1/vis.css" type="text/css" />
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/vis/4.16.1/vis-network.min.js"> </script>
<!-- <link rel="stylesheet" href="../node_modules/vis/dist/vis.min.css" type="text/css" />
<script type="text/javascript" src="../node_modules/vis/dist/vis.js"> </script>-->
<style type="text/css">
#mynetwork {
width: 500px;
height: 500px;
background-color: #ffffff;
border: 1px solid lightgray;
position: relative;
float: left;
}
</style>
</head>
<body>
<div id = "mynetwork"></div>
<script type="text/javascript">
// initialize global variables.
var edges;
var nodes;
var network;
var container;
var options, data;
// This method is responsible for drawing the graph, returns the drawn network
function drawGraph() {
var container = document.getElementById('mynetwork');
// parsing and collecting nodes and edges from the python
nodes = new vis.DataSet([{"color": "#70ec84", "id": 0, "label": 0, "shape": "dot"}, {"color": "#70ec84", "id": 1, "label": 1, "shape": "dot"}, {"color": "#ffa07a", "id": 2, "label": 2, "shape": "dot"}, {"color": "#ffa07a", "id": 3, "label": 3, "shape": "dot"}, {"color": "#005582", "id": 4, "label": 4, "shape": "dot"}, {"color": "#005582", "id": 5, "label": 5, "shape": "dot"}, {"id": "sender", "label": "sender", "shape": "dot"}]);
edges = new vis.DataSet([{"from": "sender", "to": 0}, {"from": "sender", "to": 2}, {"from": "sender", "to": 4}, {"from": 0, "to": 1}, {"from": 2, "to": 3}, {"from": 4, "to": 5}]);
// adding nodes and edges to the graph
data = {nodes: nodes, edges: edges};
var options = {
"configure": {
"enabled": false
},
"edges": {
"color": {
"inherit": true
},
"smooth": {
"enabled": false,
"type": "continuous"
}
},
"interaction": {
"dragNodes": true,
"hideEdgesOnDrag": false,
"hideNodesOnDrag": false
},
"physics": {
"barnesHut": {
"avoidOverlap": 0,
"centralGravity": 0.3,
"damping": 0.09,
"gravitationalConstant": -80000,
"springConstant": 0.001,
"springLength": 250
},
"enabled": true,
"stabilization": {
"enabled": true,
"fit": true,
"iterations": 1000,
"onlyDynamicEdges": false,
"updateInterval": 50
}
}
};
// default to using dot shape for nodes
options.nodes = {
shape: "dot"
}
network = new vis.Network(container, data, options);
return network;
}
drawGraph();
</script>
</body>
</html>

View File

@ -0,0 +1,70 @@
import multiaddr
import asyncio
from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from tests.pubsub.utils import message_id_generator
TOPIC = "eth"
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
class ReceiverNode():
"""
Node which has an internal balance mapping, meant to serve as
a dummy crypto blockchain. There is no actual blockchain, just a simple
map indicating how much crypto each user in the mappings holds
"""
def __init__(self):
self.next_msg_id_func = message_id_generator(0)
@classmethod
async def create(cls, ack_protocol, topic):
"""
Create a new ReceiverNode and attach a libp2p node, a floodsub, and a pubsub
instance to this new node
We use create as this serves as a factory function and allows us
to use async await, unlike the init function
"""
self = ReceiverNode()
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
self.libp2p_node = libp2p_node
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
self.pubsub_messages = await self.pubsub.subscribe(topic)
self.topic = topic
self.ack_protocol = ack_protocol
return self
async def wait_for_end(self, ack_stream):
# Continue waiting for end message, even if None (i.e. timeout) is received
msg = await ack_stream.read()
while msg is None:
msg = await ack_stream.read()
msg = msg.decode()
if msg == "end":
self.should_listen = False
print("End received")
async def start_receiving(self, sender_node_info):
await self.libp2p_node.connect(sender_node_info)
ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, [self.ack_protocol])
asyncio.ensure_future(self.wait_for_end(ack_stream))
self.should_listen = True
ack_msg = self.topic
encoded_ack_msg = ack_msg.encode()
while self.should_listen:
msg = await self.pubsub_messages.get()
await ack_stream.write(encoded_ack_msg)
print("Receiver closed")

159
examples/sharding/sender.py Normal file
View File

@ -0,0 +1,159 @@
import asyncio
import multiaddr
from timeit import default_timer as timer
from tests.utils import cleanup
from tests.pubsub.utils import generate_RPC_packet, message_id_generator
from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
TOPIC = "eth"
class SenderNode():
"""
SenderNode which pings ReceiverNodes continuously to perform benchmarking
"""
def __init__(self):
self.next_msg_id_func = message_id_generator(0)
self.ack_queue = asyncio.Queue()
@classmethod
async def create(cls, ack_protocol):
"""
Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
instance to this new node
We use create as this serves as a factory function and allows us
to use async await, unlike the init function
"""
self = SenderNode()
libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
await libp2p_node.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
self.libp2p_node = libp2p_node
self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
await self.pubsub.subscribe(TOPIC)
self.test_being_performed = True
self.all_streams = []
async def ack_stream_handler(stream):
self.all_streams.append(stream)
while self.test_being_performed:
# This Ack is what times out when multi-topic tests finish
ack = await stream.read()
if ack is not None:
await self.ack_queue.put(ack)
else:
print("FUCK")
break
# Reached once test_being_performed is False
# Notify receivers test is over
print("Writing end")
await stream.write("end".encode())
# Set handler for acks
self.ack_protocol = ack_protocol
self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler)
return self
async def perform_test(self, num_receivers_in_each_topic, topics, time_length):
# Time and loop
my_id = str(self.libp2p_node.get_id())
msg_contents = "transaction"
num_sent_in_each_topic = {}
num_acks_in_each_topic = {}
for topic in topics:
num_sent_in_each_topic[topic] = 0
num_acks_in_each_topic[topic] = 0
self.topic_ack_queues = {}
for topic in topics:
self.topic_ack_queues[topic] = asyncio.Queue()
completed_topics_count = 0
num_topics = len(topics)
async def handle_ack_queues():
start = timer()
curr_time = timer()
print("Handling ack queues")
nonlocal completed_topics_count, num_topics
while completed_topics_count < num_topics:
ack = await self.ack_queue.get()
decoded_ack = ack.decode()
await self.topic_ack_queues[decoded_ack].put(decoded_ack)
curr_time = timer()
async def end_all_async():
# Add None to ack_queue to break out of the loop.
# Note: This is necessary given the current code or the code will never
# terminate
await self.ack_queue.put(None)
# This is not necessary but is useful for turning off the receivers gracefully
for stream in self.all_streams:
await stream.write("end".encode())
async def perform_test_on_topic(topic):
print("Performing test on topic " + topic)
start = timer()
curr_time = timer()
# Perform test while time is not up here AND
# while time is not up in handle_ack_queues, which is checked with the
# self.test_being_performed boolean
while (curr_time - start) < time_length:
# Send message on single topic
packet = generate_RPC_packet(my_id, [topic], msg_contents, self.next_msg_id_func())
await self.floodsub.publish(my_id, packet.SerializeToString())
num_sent_in_each_topic[topic] += 1
# Wait for acks
num_acks = 0
# While number of acks is below threshold AND
# while time is not up in handle_ack_queues, which is checked with the
# self.test_being_performed boolean
# TODO: Check safety of this. Does this make sense in the asyncio
# event-driven setting?
while num_acks < num_receivers_in_each_topic[topic]:
ack = await self.topic_ack_queues[topic].get()
num_acks += 1
num_acks_in_each_topic[topic] += 1
curr_time = timer()
nonlocal completed_topics_count, num_topics
print("Test completed " + topic)
completed_topics_count += 1
if completed_topics_count == num_topics:
self.test_being_performed = False
print("End all async")
await end_all_async()
tasks = [asyncio.ensure_future(handle_ack_queues())]
for topic in topics:
tasks.append(asyncio.ensure_future(perform_test_on_topic(topic)))
gathered = await asyncio.gather(*tasks, return_exceptions=True)
# Do something interesting with test results
print("Num sent: " + str(num_sent_in_each_topic))
print("Num fully ack: " + str(num_acks_in_each_topic))
# End test
self.test_being_performed = False

View File

@ -0,0 +1,13 @@
{
"topology": {
"sender": [0],
"0": [1],
"1": [2],
"2": [3],
"3": [4],
"4": [5]
},
"topic_map": {
"1": [0, 1, 2, 3, 4, 5]
}
}

View File

@ -0,0 +1,11 @@
{
"topology": {
"sender": [0],
"0": [1, 2],
"1": [3, 4],
"2": [5, 6]
},
"topic_map": {
"1": [0, 1, 2, 3, 4, 5, 6]
}
}

View File

@ -0,0 +1,13 @@
{
"topology": {
"sender": [0, 2, 4],
"0": [1],
"2": [3],
"4": [5]
},
"topic_map": {
"1": [0, 1],
"2": [2, 3],
"3": [4, 5]
}
}

View File

@ -5,4 +5,5 @@ pytest-asyncio
pylint
grpcio
grpcio-tools
lru-dict>=1.1.6
lru-dict>=1.1.6
pyvis