Compare commits
25 Commits
master ... simplified

| SHA1 |
|---|
| 28dc842103 |
| 0dc500c541 |
| 965a76fc9f |
| 13fc699b36 |
| d50027c1ca |
| c552134c7c |
| db8eae74e5 |
| ebda40e034 |
| 7f80d12007 |
| cd43db0861 |
| 8acec4d2ed |
| c59e809ee9 |
| 00257238f5 |
| fa7251cb68 |
| 9277be98bf |
| 470f5e6e51 |
| 86dfc07321 |
| 7bacc8ca08 |
| fe14c24a89 |
| f570b19db8 |
| 82f881a49a |
| 08f845cc21 |
| 25def0c7d6 |
| 66427cd6f5 |
| ba358335df |
@@ -31,10 +31,12 @@ async def write_data(stream):
 
 
 async def run(port, destination):
-    external_ip = urllib.request.urlopen(
-        'https://v4.ident.me/').read().decode('utf8')
+    external_ip = "192.168.1.39"
+    transport_opt_str = "/ip4/%s/tcp/%s" % (external_ip, port)
     host = await new_node(
-        transport_opt=["/ip4/%s/tcp/%s" % (external_ip, port)])
+        transport_opt=[transport_opt_str])
+    addr = multiaddr.Multiaddr(transport_opt_str)
+    await host.get_network().listen(addr)
     if not destination:  # its the server
         async def stream_handler(stream):
             asyncio.ensure_future(read_data(stream))
@@ -43,8 +45,8 @@ async def run(port, destination):
 
     port = None
     ip = None
-    for listener in host.network.listeners.values():
-        for addr in listener.get_addrs():
+    # for listener in host.get_network().listeners.values():
+    #     for addr in listener.get_addrs():
             ip = addr.value_for_protocol('ip4')
             port = int(addr.value_for_protocol('tcp'))
 
@@ -57,6 +59,7 @@ async def run(port, destination):
 
     else:  # its the client
         m = multiaddr.Multiaddr(destination)
+        print(m)
         info = info_from_p2p_addr(m)
         # Associate the peer with local ip address
         await host.connect(info)
@@ -76,7 +79,6 @@ async def run(port, destination):
 @click.option('--help', is_flag=True, default=False, help='display help')
 # @click.option('--debug', is_flag=True, default=False, help='Debug generates the same node ID on every execution')
 def main(port, destination, help):
-
     if help:
         print("This program demonstrates a simple p2p chat application using libp2p\n\n")
         print("Usage: Run './chat -p <SOURCE_PORT>' where <SOURCE_PORT> can be any port number.")
109 examples/sharding/driver.py Normal file
@@ -0,0 +1,109 @@
import asyncio
import json
import sys
from sender import SenderNode
from receiver import ReceiverNode
from libp2p.peer.peerinfo import info_from_p2p_addr
from tests.utils import cleanup

ACK_PROTOCOL = "/ack/1.0.0"

async def create_receivers(num_receivers, topic_map):
    receivers = []

    # From topic_map (topic -> list of receivers), create (receiver -> topic)
    receiver_to_topic_map = {}
    for topic in topic_map:
        for receiver in topic_map[topic]:
            receiver_to_topic_map[receiver] = topic

    # Create receivers
    for i in range(num_receivers):
        receivers.append(await ReceiverNode.create(ACK_PROTOCOL, receiver_to_topic_map[i]))
    return receivers

async def connect(node1, node2):
    # node1 connects to node2
    addr = node2.get_addrs()[0]
    info = info_from_p2p_addr(addr)
    await node1.connect(info)

async def create_topology(adjacency_map, sender, receivers):
    # Create network

    # Connect senders to receivers
    for target_num in adjacency_map["sender"]:
        await connect(sender.libp2p_node, receivers[target_num].libp2p_node)

    # Connect receivers to other receivers
    for source_num_str in adjacency_map:
        if source_num_str != "sender":
            target_nums = adjacency_map[source_num_str]
            source_num = int(source_num_str)
            for target_num in target_nums:
                await connect(receivers[source_num].libp2p_node, \
                    receivers[target_num].libp2p_node)

def get_num_receivers_in_topology(topology):
    receiver_ids = []
    for key_str in topology:
        if key_str != "sender":
            key_num = int(key_str)
            if key_num not in receiver_ids:
                receiver_ids.append(key_num)
        for neighbor in topology[key_str]:
            if neighbor not in receiver_ids:
                receiver_ids.append(neighbor)
    return len(receiver_ids)

async def main():
    # Create sender
    sender = await SenderNode.create(ACK_PROTOCOL)
    print("Sender created")

    # Define connection topology
    topology_dict = json.loads(open(sys.argv[1]).read())

    topology = topology_dict["topology"]

    num_receivers = get_num_receivers_in_topology(topology)

    # Define topic map
    topic_map = topology_dict["topic_map"]

    topics = topic_map.keys()

    # Create receivers
    receivers = await create_receivers(num_receivers, topic_map)
    print("Receivers created")

    # Create network topology
    await create_topology(topology, sender, receivers)
    print("Topology created")

    # Perform throughput test
    # First, start receivers
    sender_info = info_from_p2p_addr(sender.libp2p_node.get_addrs()[0])
    for receiver in receivers:
        print("Starting receiving")
        asyncio.ensure_future(receiver.start_receiving(sender_info))

    # Allow time for start receiving to be completed
    await asyncio.sleep(0.5)

    # Start sending messages and perform throughput test
    # Determine number of receivers in each topic
    num_receivers_in_each_topic = {}
    for topic in topic_map:
        num_receivers_in_each_topic[topic] = len(topic_map[topic])
    print("Performing test")
    await sender.perform_test(num_receivers_in_each_topic, topics, 10)

    print("All testing completed")
    await cleanup()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
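A minimal sketch of the topology format driver.py expects, assuming the topologies/simple_1_tree.json file added later in this change and a working directory of examples/sharding/; the counting mirrors get_num_receivers_in_topology() above:

# Illustrative check of a topology file (assumption: run from examples/sharding/).
import json

with open("topologies/simple_1_tree.json") as config_file:
    topology_dict = json.load(config_file)

topology = topology_dict["topology"]    # adjacency list: "sender" plus numeric node ids
topic_map = topology_dict["topic_map"]  # topic -> list of receiver ids

# Same counting idea as get_num_receivers_in_topology() above.
receiver_ids = set()
for key_str, neighbors in topology.items():
    if key_str != "sender":
        receiver_ids.add(int(key_str))
    receiver_ids.update(neighbors)

print(len(receiver_ids))  # 7 receivers for simple_1_tree.json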
39 examples/sharding/graph_generator.py Normal file
@@ -0,0 +1,39 @@
import json
from pyvis.network import Network
from sys import argv

COLORS = ["#7b47bf", "#70ec84", "#ffa07a", "#005582", "#165042", "#dcb2b8"]

# Read in topology+topics file into map

# Add nodes
def main():
    net = Network()
    net.barnes_hut()

    topology_dict = json.loads(open(argv[1]).read())

    adj_list = topology_dict["topology"]
    topics_map = topology_dict["topic_map"]

    # Assign colors to nodes in topics (note sender is not included in a topic)
    for topic in topics_map:
        index = int(topic)
        color = COLORS[index]
        net.add_nodes(topics_map[topic], \
            color=[color for _ in range(len(topics_map[topic]))])

    nodes_to_add = list(adj_list.keys())
    net.add_nodes(nodes_to_add)
    for node in adj_list:
        node_val = node
        if node != "sender":
            node_val = int(node_val)
        neighbors = adj_list[node]
        for neighbor in neighbors:
            net.add_edge(node_val, neighbor)

    net.show(argv[2])

if __name__ == "__main__":
    main()
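A sketch of how this generator appears to be invoked, given that it reads argv[1] as the topology JSON and passes argv[2] to net.show(); the file names are taken from elsewhere in this change and pyvis is assumed to be installed:

# Illustrative invocation (assumption: run from examples/sharding/ with pyvis installed).
import subprocess

subprocess.run(
    ["python", "graph_generator.py",
     "topologies/simple_3_chains.json",  # argv[1]: topology/topic JSON
     "output.html"],                     # argv[2]: HTML file written by net.show()
    check=True,
)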
56 examples/sharding/most_basic_connect/driver.py Normal file
@@ -0,0 +1,56 @@
import asyncio
import json
import multiaddr
import sys
import time
from libp2p.peer.id import ID
from node import Node
from libp2p.peer.peerinfo import info_from_p2p_addr
from tests.utils import cleanup
from Crypto.PublicKey import RSA
from libp2p.peer.id import id_from_public_key

"""
Driver is called in the following way
python receiver_driver.py topology_config.json "my_node_id"
"""

SLEEP_TIME = 5

async def connect(node1, node2_addr):
    # node1 connects to node2
    info = info_from_p2p_addr(node2_addr)
    await node1.connect(info)

async def main():
    # Create Node
    my_transport_opt_str = sys.argv[1]
    node = await Node.create(my_transport_opt_str)

    # Allow for all nodes to start up
    # await asyncio.sleep(SLEEP_TIME)

    if len(sys.argv) == 3:
        neighbor_addr_str = sys.argv[2]

        new_key = RSA.generate(2048, e=65537)
        id_opt = id_from_public_key(new_key.publickey())

        # Add p2p part
        neighbor_addr_str += "/p2p/" + id_opt.pretty()

        # Convert neighbor_addr_str to multiaddr
        neighbor_addr = multiaddr.Multiaddr(neighbor_addr_str)
        print("Connecting")
        await connect(node.libp2p_node, neighbor_addr)
        print("Creating new stream")
        s = await node.libp2p_node.new_stream(id_opt, ["/foo/1"])
        await s.write("foo".encode())
        await asyncio.sleep(15)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())
    asyncio.ensure_future(main())
    loop.run_forever()
    # loop.close()
50 examples/sharding/most_basic_connect/driver_single.py Normal file
@@ -0,0 +1,50 @@
import asyncio
import json
import multiaddr
import sys
import time
from libp2p.peer.id import ID
from node import Node
from libp2p.peer.peerinfo import info_from_p2p_addr
from tests.utils import cleanup
from Crypto.PublicKey import RSA
from libp2p.peer.id import id_from_public_key

"""
Driver is called in the following way
python receiver_driver.py topology_config.json "my_node_id"
"""

SLEEP_TIME = 5

async def connect(node1, node2_addr):
    # node1 connects to node2
    info = info_from_p2p_addr(node2_addr)
    await node1.connect(info)

async def main():
    # Create Node
    my_transport_opt_str = sys.argv[1]
    node1 = await Node.create(my_transport_opt_str)

    # Allow for all nodes to start up
    # await asyncio.sleep(SLEEP_TIME)

    neighbor_addr_str = sys.argv[2]
    node2 = await Node.create(neighbor_addr_str)

    new_key = RSA.generate(2048, e=65537)
    id_opt = id_from_public_key(new_key.publickey())

    # Add p2p part
    neighbor_addr_str += "/p2p/" + id_opt.pretty()

    # Convert neighbor_addr_str to multiaddr
    neighbor_addr = multiaddr.Multiaddr(neighbor_addr_str)
    await connect(node1.libp2p_node, neighbor_addr)
    await asyncio.sleep(10)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
41 examples/sharding/most_basic_connect/node.py Normal file
@@ -0,0 +1,41 @@
import asyncio
import multiaddr

from timeit import default_timer as timer

from tests.utils import cleanup
from tests.pubsub.utils import generate_RPC_packet, message_id_generator
from libp2p import new_node
from libp2p.peer.id import ID
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub

SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
TOPIC = "eth"

class Node():
    def __init__(self):
        pass

    @classmethod
    async def create(cls, transport_opt_str):
        """
        Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
        instance to this new node

        We use create as this serves as a factory function and allows us
        to use async await, unlike the init function
        """
        self = Node()

        id_opt = ID("peer-")

        print("Sender id: " + id_opt.pretty())
        print("Transport opt is " + transport_opt_str)

        libp2p_node = await new_node(transport_opt=[transport_opt_str])
        await libp2p_node.get_network().listen(multiaddr.Multiaddr(transport_opt_str))

        self.libp2p_node = libp2p_node

        return self
117 examples/sharding/output.html Normal file
@@ -0,0 +1,117 @@
<html>
<head>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/vis/4.16.1/vis.css" type="text/css" />
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/vis/4.16.1/vis-network.min.js"> </script>

<!-- <link rel="stylesheet" href="../node_modules/vis/dist/vis.min.css" type="text/css" />
<script type="text/javascript" src="../node_modules/vis/dist/vis.js"> </script>-->

<style type="text/css">

        #mynetwork {
            width: 500px;
            height: 500px;
            background-color: #ffffff;
            border: 1px solid lightgray;
            position: relative;
            float: left;
        }

</style>

</head>

<body>
<div id = "mynetwork"></div>


<script type="text/javascript">

    // initialize global variables.
    var edges;
    var nodes;
    var network;
    var container;
    var options, data;


    // This method is responsible for drawing the graph, returns the drawn network
    function drawGraph() {
        var container = document.getElementById('mynetwork');


        // parsing and collecting nodes and edges from the python
        nodes = new vis.DataSet([{"color": "#70ec84", "id": 0, "label": 0, "shape": "dot"}, {"color": "#70ec84", "id": 1, "label": 1, "shape": "dot"}, {"color": "#ffa07a", "id": 2, "label": 2, "shape": "dot"}, {"color": "#ffa07a", "id": 3, "label": 3, "shape": "dot"}, {"color": "#005582", "id": 4, "label": 4, "shape": "dot"}, {"color": "#005582", "id": 5, "label": 5, "shape": "dot"}, {"id": "sender", "label": "sender", "shape": "dot"}]);
        edges = new vis.DataSet([{"from": "sender", "to": 0}, {"from": "sender", "to": 2}, {"from": "sender", "to": 4}, {"from": 0, "to": 1}, {"from": 2, "to": 3}, {"from": 4, "to": 5}]);

        // adding nodes and edges to the graph
        data = {nodes: nodes, edges: edges};

        var options = {
            "configure": {
                "enabled": false
            },
            "edges": {
                "color": {
                    "inherit": true
                },
                "smooth": {
                    "enabled": false,
                    "type": "continuous"
                }
            },
            "interaction": {
                "dragNodes": true,
                "hideEdgesOnDrag": false,
                "hideNodesOnDrag": false
            },
            "physics": {
                "barnesHut": {
                    "avoidOverlap": 0,
                    "centralGravity": 0.3,
                    "damping": 0.09,
                    "gravitationalConstant": -80000,
                    "springConstant": 0.001,
                    "springLength": 250
                },
                "enabled": true,
                "stabilization": {
                    "enabled": true,
                    "fit": true,
                    "iterations": 1000,
                    "onlyDynamicEdges": false,
                    "updateInterval": 50
                }
            }
        };


        // default to using dot shape for nodes
        options.nodes = {
            shape: "dot"
        }

        network = new vis.Network(container, data, options);

        return network;

    }

    drawGraph();

</script>
</body>
</html>
82 examples/sharding/receiver.py Normal file
@@ -0,0 +1,82 @@
import multiaddr
import asyncio

from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from tests.pubsub.utils import message_id_generator
from libp2p.peer.id import ID

TOPIC = "eth"
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]

class ReceiverNode():
    """
    Node which has an internal balance mapping, meant to serve as
    a dummy crypto blockchain. There is no actual blockchain, just a simple
    map indicating how much crypto each user in the mappings holds
    """

    def __init__(self):
        self.next_msg_id_func = message_id_generator(0)

    @classmethod
    async def create(cls, node_id, transport_opt_str, ack_protocol, topic):
        """
        Create a new ReceiverNode and attach a libp2p node, a floodsub, and a pubsub
        instance to this new node

        We use create as this serves as a factory function and allows us
        to use async await, unlike the init function
        """
        self = ReceiverNode()

        id_opt = ID("peer-" + node_id)

        libp2p_node = await new_node(transport_opt=[transport_opt_str])
        await libp2p_node.get_network().listen(multiaddr.Multiaddr(transport_opt_str))

        self.libp2p_node = libp2p_node

        self.floodsub = None  # FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
        self.pubsub = None  # Pubsub(self.libp2p_node, self.floodsub, "a")

        self.pubsub_messages = None  # await self.pubsub.subscribe(topic)
        self.topic = topic

        self.ack_protocol = ack_protocol

        return self

    async def wait_for_end(self, ack_stream):
        # Continue waiting for end message, even if None (i.e. timeout) is received
        msg = await ack_stream.read()
        while msg is None:
            msg = await ack_stream.read()
        msg = msg.decode()
        if msg == "end":
            self.should_listen = False
            print("End received")

    async def start_receiving(self, sender_node_info):
        print("Receiving started")

        await self.libp2p_node.connect(sender_node_info)

        print("Connection to sender confirmed")
        print("Creating ack stream with ack protocol " + self.ack_protocol \
            + ", peer_id " + sender_node_info.peer_id.pretty())

        ack_stream = await self.libp2p_node.new_stream(sender_node_info.peer_id, [self.ack_protocol])
        print("Ack stream created")
        asyncio.ensure_future(self.wait_for_end(ack_stream))

        print("Listening for ack messages")
        self.should_listen = True
        ack_msg = self.topic
        encoded_ack_msg = ack_msg.encode()
        while self.should_listen:
            msg = await self.pubsub_messages.get()
            await ack_stream.write(encoded_ack_msg)
        print("Receiver closed")
126 examples/sharding/receiver_driver.py Normal file
@@ -0,0 +1,126 @@
import asyncio
import json
import multiaddr
import sys
import time
from libp2p.peer.id import ID
from sender import SenderNode
from receiver import ReceiverNode
from libp2p.peer.peerinfo import info_from_p2p_addr
from tests.utils import cleanup
from Crypto.PublicKey import RSA
from libp2p.peer.id import id_from_public_key

"""
Driver is called in the following way
python receiver_driver.py topology_config.json "my_node_id"
"""

SLEEP_TIME = 5

async def connect(node1, node2_addr):
    # node1 connects to node2
    info = info_from_p2p_addr(node2_addr)
    await node1.connect(info)

async def main():
    """
    Read in topology config file, which contains
    a map of node IDs to peer IDs, an adjacency list (named topology) using node IDs,
    a map of node IDs to topics, and ACK_PROTOCOL

    {
        "node_id_map": {
            "sender": "sender multiaddr",
            "some id 0": "some multiaddr",
            "some id 1": "some multiaddr",
            ...
        },
        "topology": {
            "sender": ["some id 0", "some id 1", ...],
            "0": ["some id 0", "some id 1", ...],
            "1": ["some id 0", "some id 1", ...],
            ...
        },
        "topic_map": {
            "some id 0": "some topic name 1",
            "some id 1": "some topic name 2",
            "some id 2": "some topic name 3"
        },
        "ACK_PROTOCOL": "some ack protocol"
    }

    Ex.

    {
        "node_id_map": {
            "sender": "/ip4/127.0.0.1/tcp/8000",
            "0": "/ip4/127.0.0.1/tcp/8001",
            "1": "/ip4/127.0.0.1/tcp/8002",
            "2": "/ip4/127.0.0.1/tcp/8003"
        },
        "topology": {
            "sender": ["0"],
            "0": ["1", "2"],
            "1": ["0"],
            "2": ["0"]
        },
        "topic_map": {
            "0": "topic1",
            "1": "topic1",
            "2": "topic1"
        },
        "ACK_PROTOCOL": "/ack/1.0.0"
    }

    """
    topology_config_dict = json.loads(open(sys.argv[1]).read())
    my_node_id = sys.argv[2]

    # Get my topic
    my_topic = topology_config_dict["topic_map"][my_node_id]

    ack_protocol = topology_config_dict["ACK_PROTOCOL"]

    # Create Receiver Node
    print("Creating receiver")
    my_transport_opt_str = topology_config_dict["node_id_map"][my_node_id]
    receiver_node = await ReceiverNode.create(my_node_id, my_transport_opt_str, ack_protocol, my_topic)
    print("Receiver created")

    # Allow for all nodes to start up
    # await asyncio.sleep(SLEEP_TIME)

    new_key = RSA.generate(2048, e=65537)
    id_opt = id_from_public_key(new_key.publickey())

    # Connect receiver node to all other relevant receiver nodes
    for neighbor in topology_config_dict["topology"][my_node_id]:
        neighbor_addr_str = topology_config_dict["node_id_map"][neighbor]

        # Add p2p part
        neighbor_addr_str += "/p2p/" + id_opt.pretty()
        print(neighbor_addr_str)
        # Convert neighbor_addr_str to multiaddr
        neighbor_addr = multiaddr.Multiaddr(neighbor_addr_str)
        await connect(receiver_node.libp2p_node, neighbor_addr)

    return

    # Get sender info as multiaddr
    sender_addr_str = topology_config_dict["node_id_map"]["sender"] + "/p2p/" + id_opt.pretty()

    # Convert sender_info_str to multiaddr
    sender_addr = multiaddr.Multiaddr(sender_addr_str)

    # Convert sender_addr to sender_info
    sender_info = info_from_p2p_addr(sender_addr)

    # Start listening for messages from sender
    print("Start receiving called")
    asyncio.ensure_future(receiver_node.start_receiving(sender_info))

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
167 examples/sharding/sender.py Normal file
@@ -0,0 +1,167 @@
import asyncio
import multiaddr

from timeit import default_timer as timer

from tests.utils import cleanup
from tests.pubsub.utils import generate_RPC_packet, message_id_generator
from libp2p import new_node
from libp2p.peer.id import ID
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub

SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"]
TOPIC = "eth"

class SenderNode():
    """
    SenderNode which pings ReceiverNodes continuously to perform benchmarking
    """

    def __init__(self):
        self.next_msg_id_func = message_id_generator(0)
        self.ack_queue = asyncio.Queue()

    @classmethod
    async def create(cls, node_id, transport_opt_str, ack_protocol):
        """
        Create a new DummyAccountNode and attach a libp2p node, a floodsub, and a pubsub
        instance to this new node

        We use create as this serves as a factory function and allows us
        to use async await, unlike the init function
        """
        self = SenderNode()

        id_opt = ID("peer-" + node_id)

        print("Sender id: " + id_opt.pretty())
        print("Transport opt is " + transport_opt_str)

        libp2p_node = await new_node(transport_opt=[transport_opt_str])
        await libp2p_node.get_network().listen(multiaddr.Multiaddr(transport_opt_str))

        self.libp2p_node = libp2p_node

        self.floodsub = FloodSub(SUPPORTED_PUBSUB_PROTOCOLS)
        self.pubsub = Pubsub(self.libp2p_node, self.floodsub, "a")
        await self.pubsub.subscribe(TOPIC)

        self.test_being_performed = True

        self.all_streams = []
        async def ack_stream_handler(stream):
            self.all_streams.append(stream)

            while self.test_being_performed:
                # This Ack is what times out when multi-topic tests finish
                ack = await stream.read()
                if ack is not None:
                    await self.ack_queue.put(ack)
                else:
                    print("FUCK")
                    break
            # Reached once test_being_performed is False
            # Notify receivers test is over
            print("Writing end")
            await stream.write("end".encode())

        print("Sender ack protocol: " + ack_protocol)
        # Set handler for acks
        self.ack_protocol = ack_protocol
        self.libp2p_node.set_stream_handler(self.ack_protocol, ack_stream_handler)

        return self

    async def perform_test(self, num_receivers_in_each_topic, topics, time_length):
        # Time and loop

        my_id = str(self.libp2p_node.get_id())
        msg_contents = "transaction"

        num_sent_in_each_topic = {}
        num_acks_in_each_topic = {}
        for topic in topics:
            num_sent_in_each_topic[topic] = 0
            num_acks_in_each_topic[topic] = 0

        self.topic_ack_queues = {}
        for topic in topics:
            self.topic_ack_queues[topic] = asyncio.Queue()
        completed_topics_count = 0
        num_topics = len(topics)

        async def handle_ack_queues():
            start = timer()
            curr_time = timer()

            print("Handling ack queues")
            nonlocal completed_topics_count, num_topics
            while completed_topics_count < num_topics:
                ack = await self.ack_queue.get()
                decoded_ack = ack.decode()

                await self.topic_ack_queues[decoded_ack].put(decoded_ack)

                curr_time = timer()

        async def end_all_async():
            # Add None to ack_queue to break out of the loop.
            # Note: This is necessary given the current code or the code will never
            # terminate
            await self.ack_queue.put(None)

            # This is not necessary but is useful for turning off the receivers gracefully
            for stream in self.all_streams:
                await stream.write("end".encode())

        async def perform_test_on_topic(topic):
            print("Performing test on topic " + topic)
            start = timer()
            curr_time = timer()

            # Perform test while time is not up here AND
            # while time is not up in handle_ack_queues, which is checked with the
            # self.test_being_performed boolean
            while (curr_time - start) < time_length:
                # Send message on single topic
                packet = generate_RPC_packet(my_id, [topic], msg_contents, self.next_msg_id_func())

                await self.floodsub.publish(my_id, packet.SerializeToString())
                num_sent_in_each_topic[topic] += 1

                # Wait for acks
                num_acks = 0

                # While number of acks is below threshold AND
                # while time is not up in handle_ack_queues, which is checked with the
                # self.test_being_performed boolean
                # TODO: Check safety of this. Does this make sense in the asyncio
                # event-driven setting?
                while num_acks < num_receivers_in_each_topic[topic]:
                    ack = await self.topic_ack_queues[topic].get()
                    num_acks += 1
                num_acks_in_each_topic[topic] += 1
                curr_time = timer()

            nonlocal completed_topics_count, num_topics
            print("Test completed " + topic)
            completed_topics_count += 1
            if completed_topics_count == num_topics:
                self.test_being_performed = False
                print("End all async")
                await end_all_async()

        tasks = [asyncio.ensure_future(handle_ack_queues())]

        for topic in topics:
            tasks.append(asyncio.ensure_future(perform_test_on_topic(topic)))

        gathered = await asyncio.gather(*tasks, return_exceptions=True)

        # Do something interesting with test results
        print("Num sent: " + str(num_sent_in_each_topic))
        print("Num fully ack: " + str(num_acks_in_each_topic))

        # End test
        self.test_being_performed = False
70 examples/sharding/sender_driver.py Normal file
@@ -0,0 +1,70 @@
import asyncio
import json
import multiaddr
import sys
import time
from libp2p.peer.id import ID
from sender import SenderNode
from receiver import ReceiverNode
from libp2p.peer.peerinfo import info_from_p2p_addr
from tests.utils import cleanup

SLEEP_TIME = 5

async def connect(node1, node2_addr):
    # node1 connects to node2
    info = info_from_p2p_addr(node2_addr)
    await node1.connect(info)

async def main():
    """
    Read in topology config file, which contains
    a map of node IDs to peer IDs, an adjacency list (named topology) using node IDs,
    a map of node IDs to topics, and ACK_PROTOCOL
    """
    topology_config_dict = json.loads(open(sys.argv[1]).read())
    my_node_id = sys.argv[2]

    ack_protocol = topology_config_dict["ACK_PROTOCOL"]

    # Create sender
    print("Creating sender")
    my_transport_opt_str = topology_config_dict["node_id_map"][my_node_id]
    sender_node = await SenderNode.create(my_node_id, my_transport_opt_str, ack_protocol)
    print("Sender created")

    # Allow for all nodes to start up
    # await asyncio.sleep(SLEEP_TIME)

    return

    new_key = RSA.generate(2048, e=65537)
    id_opt = id_from_public_key(new_key.publickey())

    # Connect sender node to all other relevant sender nodes
    for neighbor in topology_config_dict["topology"][my_node_id]:
        neighbor_addr_str = topology_config_dict["node_id_map"][neighbor]

        # Add p2p part
        neighbor_addr_str += "/p2p/" + id_opt.pretty()

        # Convert neighbor_addr_str to multiaddr
        neighbor_addr = multiaddr.Multiaddr(neighbor_addr_str)
        await connect(sender_node.libp2p_node, neighbor_addr)

    # Perform throughput test
    # Start sending messages and perform throughput test
    # Determine number of receivers in each topic
    topic_map = topology_config_dict["topic_map"]
    topics = topic_map.keys()

    num_receivers_in_each_topic = {}
    for topic in topic_map:
        num_receivers_in_each_topic[topic] = len(topic_map[topic])
    print("Performing test")
    await sender_node.perform_test(num_receivers_in_each_topic, topics, 10)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
13 examples/sharding/topologies/simple_1_chain.json Normal file
@@ -0,0 +1,13 @@
{
    "topology": {
        "sender": [0],
        "0": [1],
        "1": [2],
        "2": [3],
        "3": [4],
        "4": [5]
    },
    "topic_map": {
        "1": [0, 1, 2, 3, 4, 5]
    }
}
11 examples/sharding/topologies/simple_1_tree.json Normal file
@@ -0,0 +1,11 @@
{
    "topology": {
        "sender": [0],
        "0": [1, 2],
        "1": [3, 4],
        "2": [5, 6]
    },
    "topic_map": {
        "1": [0, 1, 2, 3, 4, 5, 6]
    }
}
13 examples/sharding/topologies/simple_3_chains.json Normal file
@@ -0,0 +1,13 @@
{
    "topology": {
        "sender": [0, 2, 4],
        "0": [1],
        "2": [3],
        "4": [5]
    },
    "topic_map": {
        "1": [0, 1],
        "2": [2, 3],
        "3": [4, 5]
    }
}
14 examples/sharding/topologies_isolated/simple_1_chain.json Normal file
@@ -0,0 +1,14 @@
{
    "node_id_map": {
        "sender": "/ip4/127.0.0.1/tcp/8004",
        "0": "/ip4/127.0.0.1/tcp/8005"
    },
    "topology": {
        "sender": ["0"],
        "0": ["sender"]
    },
    "topic_map": {
        "0": "topic1"
    },
    "ACK_PROTOCOL": "/ack/1.0.0"
}
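A sketch of how this config appears to be used with the driver scripts above, following the invocation documented in their docstrings (python receiver_driver.py topology_config.json "my_node_id"); node ids "0" and "sender" come from the file just shown, and the working directory is assumed to be examples/sharding/:

# Illustrative launcher (assumption, not part of this change).
import subprocess

CONFIG = "topologies_isolated/simple_1_chain.json"
procs = [
    subprocess.Popen(["python", "receiver_driver.py", CONFIG, "0"]),
    subprocess.Popen(["python", "sender_driver.py", CONFIG, "sender"]),
]
for proc in procs:
    proc.wait()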
@@ -65,21 +65,22 @@ class Swarm(INetwork):
             # set muxed connection equal to existing muxed connection
             muxed_conn = self.connections[peer_id]
         else:
+            print("Dialing " + str(peer_id))
             # Dial peer (connection to peer does not yet exist)
             # Transport dials peer (gets back a raw conn)
             raw_conn = await self.transport.dial(multiaddr, self.self_id)
+            print("raw conn created")
             # Use upgrader to upgrade raw conn to muxed conn
             muxed_conn = self.upgrader.upgrade_connection(raw_conn, \
                 self.generic_protocol_handler, peer_id)
+            print("mux conn created")
             # Store muxed connection in connections
             self.connections[peer_id] = muxed_conn
 
         # Call notifiers since event occurred
-        for notifee in self.notifees:
-            await notifee.connected(self, muxed_conn)
+        # for notifee in self.notifees:
+        #     await notifee.connected(self, muxed_conn)
+        print("Muxed conn returned")
         return muxed_conn
 
     async def new_stream(self, peer_id, protocol_ids):
@@ -88,6 +89,7 @@ class Swarm(INetwork):
         :param protocol_id: protocol id
         :return: net stream instance
         """
+        print("New stream")
         # Get peer info from peer store
         addrs = self.peerstore.addrs(peer_id)
 
@@ -96,24 +98,35 @@ class Swarm(INetwork):
 
         multiaddr = addrs[0]
 
+        print("Dialing peer")
         muxed_conn = await self.dial_peer(peer_id)
 
+        print("Opening stream")
+
         # Use muxed conn to open stream, which returns
         # a muxed stream
         # TODO: Remove protocol id from being passed into muxed_conn
         muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr)
 
+        print("Selecting protocol " + str(protocol_ids))
+
         # Perform protocol muxing to determine protocol to use
         selected_protocol = await self.multiselect_client.select_one_of(protocol_ids, muxed_stream)
 
+        print("Creating net stream")
+
         # Create a net stream with the selected protocol
         net_stream = NetStream(muxed_stream)
         net_stream.set_protocol(selected_protocol)
 
+        print("Calling notifees")
+
         # Call notifiers since event occurred
         for notifee in self.notifees:
             await notifee.opened_stream(self, net_stream)
 
+        print("Returning net stream")
+
         return net_stream
 
     async def listen(self, *args):
@@ -135,8 +148,11 @@ class Swarm(INetwork):
             return True
 
         async def conn_handler(reader, writer):
+            print("conn handler hit on listen")
+            print(multiaddr)
             # Read in first message (should be peer_id of initiator) and ack
             peer_id = id_b58_decode((await reader.read(1024)).decode())
+            print("Conn handler hit peer_id " + str(peer_id))
 
             writer.write("received peer id".encode())
             await writer.drain()
@@ -152,8 +168,8 @@ class Swarm(INetwork):
             self.connections[peer_id] = muxed_conn
 
             # Call notifiers since event occurred
-            for notifee in self.notifees:
-                await notifee.connected(self, muxed_conn)
+            # for notifee in self.notifees:
+            #     await notifee.connected(self, muxed_conn)
 
             try:
                 # Success
@@ -24,6 +24,9 @@ class ID:
         return "<peer.ID %s>" % pid
         return "<peer.ID %s*%s>" % (pid[:2], pid[len(pid)-6:])
 
+    def __len__(self):
+        return len(self._id_str)
+
     __repr__ = __str__
 
     def __eq__(self, other):
@@ -25,6 +25,7 @@ class Multiselect(IMultiselectMuxer):
         self.handlers[protocol] = handler
 
     async def negotiate(self, stream):
+        print("negotiate")
         """
         Negotiate performs protocol selection
         :param stream: stream to negotiate on
@@ -34,13 +35,18 @@ class Multiselect(IMultiselectMuxer):
         # Create a communicator to handle all communication across the stream
         communicator = MultiselectCommunicator(stream)
 
+        print("sender pre handshake")
+
         # Perform handshake to ensure multiselect protocol IDs match
         await self.handshake(communicator)
 
+        print("sender post handshake")
+
         # Read and respond to commands until a valid protocol ID is sent
         while True:
             # Read message
             command = await communicator.read_stream_until_eof()
+            print("sender command " + command)
 
             # Command is ls or a protocol
             if command == "ls":
@@ -48,12 +54,16 @@ class Multiselect(IMultiselectMuxer):
                 pass
             else:
                 protocol = command
+                print("sender checking protocol in handlers")
                 if protocol in self.handlers:
                     # Tell counterparty we have decided on a protocol
+                    print("sender writing protocol " + protocol)
                     await communicator.write(protocol)
+                    print("sender protocol written")
 
                     # Return the decided on protocol
                     return protocol, self.handlers[protocol]
+                print("sender protocol not found")
                 # Tell counterparty this protocol was not found
                 await communicator.write(PROTOCOL_NOT_FOUND_MSG)
 
@@ -66,14 +76,21 @@ class Multiselect(IMultiselectMuxer):
 
         # TODO: Use format used by go repo for messages
 
+        print("sender pre write " + MULTISELECT_PROTOCOL_ID)
+
         # Send our MULTISELECT_PROTOCOL_ID to other party
         await communicator.write(MULTISELECT_PROTOCOL_ID)
 
+        print("sender pre read")
+
         # Read in the protocol ID from other party
         handshake_contents = await communicator.read_stream_until_eof()
 
+        print("sender pre validate " + handshake_contents)
+
         # Confirm that the protocols are the same
         if not validate_handshake(handshake_contents):
+            print("sender multiselect protocol ID mismatch")
             raise MultiselectError("multiselect protocol ID mismatch")
 
         # Handshake succeeded if this point is reached
@@ -26,13 +26,16 @@ class MultiselectClient(IMultiselectClient):
         # TODO: Use format used by go repo for messages
 
         # Send our MULTISELECT_PROTOCOL_ID to counterparty
+        print("handshake entered")
         await communicator.write(MULTISELECT_PROTOCOL_ID)
+        print("MULTISELECT_PROTOCOL_ID written " + MULTISELECT_PROTOCOL_ID)
         # Read in the protocol ID from other party
         handshake_contents = await communicator.read_stream_until_eof()
+        print("handshake read " + handshake_contents)
 
         # Confirm that the protocols are the same
         if not validate_handshake(handshake_contents):
+            print("multiselect protocol ID mismatch")
             raise MultiselectClientError("multiselect protocol ID mismatch")
 
         # Handshake succeeded if this point is reached
@@ -67,12 +70,18 @@ class MultiselectClient(IMultiselectClient):
         :return: selected protocol
         """
 
+        print("select_one_of")
+
         # Create a communicator to handle all communication across the stream
         communicator = MultiselectCommunicator(stream)
 
+        print("Pre handshake")
+
         # Perform handshake to ensure multiselect protocol IDs match
         await self.handshake(communicator)
 
+        print("Post handshake")
+
         # For each protocol, attempt to select that protocol
         # and return the first protocol selected
         for protocol in protocols:
@@ -80,6 +89,7 @@ class MultiselectClient(IMultiselectClient):
                 selected_protocol = await self.try_select(communicator, protocol)
                 return selected_protocol
             except MultiselectClientError:
+                print("MultiselectClientError")
                 pass
 
         # No protocols were found, so return no protocols supported error
@@ -58,9 +58,12 @@ class Mplex(IMuxedConn):
         """
         # TODO: propagate up timeout exception and catch
         # TODO: pass down timeout from user and use that
+        print("read buffer hit")
+        print(self.buffers)
         if stream_id in self.buffers:
             try:
-                data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=8)
+                data = await asyncio.wait_for(self.buffers[stream_id].get(), timeout=30)
+                print("data received")
                 return data
             except asyncio.TimeoutError:
                 return None
@@ -99,6 +102,7 @@ class Mplex(IMuxedConn):
         :return: True if success
         """
         # << by 3, then or with flag
+        print("sending message")
         header = (stream_id << 3) | flag
         header = encode_uvarint(header)
 
@@ -128,7 +132,9 @@ class Mplex(IMuxedConn):
         # TODO Deal with other types of messages using flag (currently _)
 
         while True:
+            print("Message read waiting")
             stream_id, flag, message = await self.read_message()
+            print("Message read occ")
 
             if stream_id is not None and flag is not None and message is not None:
                 if stream_id not in self.buffers:
@@ -153,9 +159,11 @@ class Mplex(IMuxedConn):
 
         # Timeout is set to a relatively small value to alleviate wait time to exit
         # loop in handle_incoming
-        timeout = 0.1
+        timeout = 10
         try:
+            print("Getting header")
             header = await decode_uvarint_from_stream(self.raw_conn.reader, timeout)
+            print("Got header")
             length = await decode_uvarint_from_stream(self.raw_conn.reader, timeout)
             message = await asyncio.wait_for(self.raw_conn.reader.read(length), timeout=timeout)
         except asyncio.TimeoutError:
@@ -38,6 +38,9 @@ class MplexStream(IMuxedStream):
         write to stream
         :return: number of bytes written
         """
+        print("Writing message")
+        print(self.stream_id)
+        print(self.mplex_conn.peer_id)
         return await self.mplex_conn.send_message(
             get_flag(self.initiator, "MESSAGE"), data, self.stream_id)
 
@@ -34,7 +34,14 @@ async def decode_uvarint_from_stream(reader, timeout):
     shift = 0
     result = 0
     while True:
-        byte = await asyncio.wait_for(reader.read(1), timeout=timeout)
+        print("Decoding byte")
+        print("Trying byte")
+        print(reader)
+        byte = await asyncio.wait_for(reader.read(1), timeout=2000)
+        print("HIT me")
+        print("Byte is ")
+        print(type(byte))
+        print(len(byte))
         i = struct.unpack('>H', b'\x00' + byte)[0]
         result |= (i & 0x7f) << shift
         shift += 7
@@ -79,10 +79,12 @@ class TCP(ITransport):
 
         # First: send our peer ID so receiver knows it
         writer.write(id_b58_encode(self_id).encode())
+        print("Length of written id is " + str(len(id_b58_encode(self_id).encode())))
         await writer.drain()
 
         # Await ack for peer id
         ack = (await reader.read(1024)).decode()
+        print(ack)
 
         if ack != "received peer id":
             raise Exception("Receiver did not receive peer id")
@@ -6,3 +6,4 @@ pylint
 grpcio
 grpcio-tools
 lru-dict>=1.1.6
+pyvis