diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 305607e..a4f2e86 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,8 +1,9 @@ -from .pubsub_router_interface import IPubsubRouter from .pb import rpc_pb2 +from .pubsub_router_interface import IPubsubRouter class FloodSub(IPubsubRouter): + # pylint: disable=no-member def __init__(self, protocols): self.protocols = protocols @@ -55,42 +56,22 @@ class FloodSub(IPubsubRouter): :param sender_peer_id: peer_id of message sender :param rpc_message: pubsub message in RPC string format """ - packet = rpc_pb2.RPC() packet.ParseFromString(rpc_message) - print ("IN FLOOODSUB PUBLISH") - print (packet) - print ("++++++++++++++++") msg_sender = str(sender_peer_id) # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message for message in packet.publish: decoded_from_id = message.from_id.decode('utf-8') - - print ("MESSAGE SENDER") - print (msg_sender) - print ("FROM ID") - print (message.from_id) - print (str(self.pubsub.host.get_id())) - - if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): - await self.pubsub.handle_talk(sender_peer_id, message) + await self.pubsub.handle_talk(message) - - print ("OHOHOHOH") - print (self.pubsub.peer_topics) - print ("UUUJUJUJ") - print (self.pubsub.peers) - print ("********") # Deliver to self and peers for topic in message.topicIDs: if topic in self.pubsub.peer_topics: for peer_id_in_topic in self.pubsub.peer_topics[topic]: # Forward to all known peers in the topic that are not the # message sender and are not the message origin - print ("PEERID") - print (peer_id_in_topic) if peer_id_in_topic not in (msg_sender, decoded_from_id): stream = self.pubsub.peers[peer_id_in_topic] # create new packet with just publish message diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index c7bfe06..bfad873 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,13 +1,12 @@ import asyncio import uuid -from .pb import rpc_pb2_grpc from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee class Pubsub(): - # pylint: disable=too-many-instance-attributes + # pylint: disable=too-many-instance-attributes, no-member def __init__(self, host, router, my_id): """ @@ -65,12 +64,6 @@ class Pubsub(): for topic_id in self.my_topics: packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe=True, topicid=topic_id)]) - - # message = rpc_pb2.Message( - # from_id=str(self.host.get_id()).encode('utf-8'), - # seqno=str(generate_message_id()).encode('utf-8') - # ) - # packet.publish.extend([message]) return packet.SerializeToString() @@ -82,19 +75,13 @@ class Pubsub(): """ # TODO check on types here - print ("++++++++++ASPYN+++++++++++++++++") peer_id = str(stream.mplex_conn.peer_id) while True: - print ("HIT ME") incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) - print ("IN PUBSUB CONTINUOUSLY READ") - print (rpc_incoming) - print ("###########################") - should_publish = False if rpc_incoming.publish: @@ -103,8 +90,7 @@ class Pubsub(): if message.seqno not in self.seen_messages: should_publish = True self.seen_messages.append(message.seqno) - await self.handle_talk(peer_id, message) - + await self.handle_talk(message) if rpc_incoming.subscriptions: # deal with RPC.subscriptions @@ -150,10 +136,8 @@ class Pubsub(): pubsub protocols we support """ while True: - print ("PUBSUB HANDLE PEER QUEUE") + peer_id = await self.peer_queue.get() - print (peer_id) - print ("++++++++++++++++++++++++") # Open a stream to peer on existing connection # (we know connection exists since that's the only way @@ -175,7 +159,7 @@ class Pubsub(): # Force context switch await asyncio.sleep(0) - def handle_subscription(self, peer_id, sub_message): + def handle_subscription(self, origin_id, sub_message): """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as @@ -183,23 +167,19 @@ class Pubsub(): :param origin_id: id of the peer who subscribe to the message :param sub_message: RPC.SubOpts """ - # TODO verify logic here - - if sub_message.subscribe: if sub_message.topicid not in self.peer_topics: - self.peer_topics[sub_message.topicid] = [peer_id] - elif peer_id not in self.peer_topics[sub_message.topicid]: + self.peer_topics[sub_message.topicid] = [origin_id] + elif origin_id not in self.peer_topics[sub_message.topicid]: # Add peer to topic - self.peer_topics[sub_message.topicid].append(peer_id) + self.peer_topics[sub_message.topicid].append(origin_id) else: # TODO: Remove peer from topic pass - async def handle_talk(self, peer_id, publish_message): + async def handle_talk(self, publish_message): """ Put incoming message from a peer onto my blocking queue - :param peer_id: peer id whom forwarded this message :param talk: RPC.Message format """ @@ -216,9 +196,8 @@ class Pubsub(): Subscribe ourself to a topic :param topic_id: topic_id to subscribe to """ - # Map topic_id to blocking queue - print ("**PUBSUB** in SUBSCRIBE") + # Map topic_id to blocking queue self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message @@ -228,12 +207,10 @@ class Pubsub(): # seqno=str(generate_message_id()).encode('utf-8') # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe = True, - topicid = topic_id.encode('utf-8') + subscribe=True, + topicid=topic_id.encode('utf-8') )]) - print (packet) - print ("**PUBSUB** PEEERS") - print (self.peers) + # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -255,13 +232,9 @@ class Pubsub(): # Create unsubscribe message packet = rpc_pb2.RPC() - # packet.publish.extend([rpc_pb2.Message( - # from_id=str(self.host.get_id()).encode('utf-8'), - # seqno=str(generate_message_id()).encode('utf-8') - # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe = False, - topicid = topic_id.encode('utf-8') + subscribe=False, + topicid=topic_id.encode('utf-8') )]) # Send out unsubscribe message to all peers @@ -275,8 +248,6 @@ class Pubsub(): Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast """ - print ("**PUBSU** IN MESSAGE ALL PEERS") - print (rpc_msg) # Broadcast message for peer in self.peers: diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 727b39e..ec5132e 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -39,11 +39,11 @@ class IPubsubRouter(ABC): """ @abstractmethod - def publish(self, sender_peer_id, message): + def publish(self, sender_peer_id, rpc_message): """ Invoked to forward a new message that has been validated :param sender_peer_id: peer_id of message sender - :param message: message to forward + :param rpc_message: message to forward """ @abstractmethod diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 1951fc6..05a02bf 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,10 +1,8 @@ import asyncio -import uuid import multiaddr +from utils import generate_message_id, generate_RPC_packet from libp2p import new_node -from libp2p.pubsub.pb import rpc_pb2_grpc -from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub @@ -37,7 +35,6 @@ class DummyAccountNode(): We use create as this serves as a factory function and allows us to use async await, unlike the init function """ - print ("**DUMMY** CREATE ACCOUNT NODE") self = DummyAccountNode() libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) @@ -54,14 +51,9 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - incoming = await self.q.get() - print ("**DUMMY** HANDLE INCOMING") - print (incoming) - print ("========================") - + incoming = await self.q.get() msg_comps = incoming.data.decode('utf-8').split(",") - print (msg_comps) - print ("--------") + if msg_comps[0] == "send": self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) elif msg_comps[0] == "set": @@ -107,7 +99,6 @@ class DummyAccountNode(): :param dest_user: user to send crypto to :param amount: amount of crypto to send """ - print ("**DUMMY** IN HANDLE SEND CRYPTO") if source_user in self.balances: self.balances[source_user] -= amount else: @@ -124,12 +115,7 @@ class DummyAccountNode(): :param dest_user: user to set crypto for :param amount: amount of crypto """ - print ("**DUMMY** IN HANDLE SET CRYPTO") - print (dest_user) - print (amount) self.balances[dest_user] = amount - print (self.balances) - print ("^^ balance") def get_balance(self, user): """ @@ -137,35 +123,8 @@ class DummyAccountNode(): :param user: user to get balance for :return: balance of user """ - print ("GET BALACNCE") - print (user) - print (self.balances) if user in self.balances: return self.balances[user] else: return -1 -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) - -def generate_RPC_packet(origin_id, topics, msg_content, msg_id): - packet = rpc_pb2.RPC() - message = rpc_pb2.Message( - from_id=origin_id.encode('utf-8'), - seqno=msg_id.encode('utf-8'), - data=msg_content.encode('utf-8') - ) - - for topic in topics: - message.topicIDs.extend([topic.encode('utf-8')]) - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe=True, - topicid = topic.encode('utf-8') - )]) - - packet.publish.extend([message]) - return packet diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index c4c2681..4fc059b 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -1,5 +1,4 @@ import asyncio -import uuid import multiaddr import pytest @@ -9,6 +8,7 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub +from utils import generate_message_id, generate_RPC_packet # pylint: disable=too-many-locals @@ -46,16 +46,10 @@ async def test_simple_two_nodes_RPC(): msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id()) await floodsub_a.publish(node_a_id, msg.SerializeToString()) - print ("MESSAGE B") - print (msg.SerializeToString()) - print ("=========") await asyncio.sleep(0.25) res_b = await qb.get() - print ("RES B") - print (res_b) - print ("-----") # Check that the msg received by node_b is the same # as the message sent by node_a assert res_b.SerializeToString() == msg.publish[0].SerializeToString() @@ -65,57 +59,6 @@ async def test_simple_two_nodes_RPC(): # Success, terminate pending tasks. await cleanup() -# @pytest.mark.asyncio -# async def test_simple_three_nodes(): -# # Want to pass message from A -> B -> C -# node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) -# node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) -# node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - -# await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) -# await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) -# await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - -# supported_protocols = ["/floodsub/1.0.0"] - -# floodsub_a = FloodSub(supported_protocols) -# pubsub_a = Pubsub(node_a, floodsub_a, "a") -# floodsub_b = FloodSub(supported_protocols) -# pubsub_b = Pubsub(node_b, floodsub_b, "b") -# floodsub_c = FloodSub(supported_protocols) -# pubsub_c = Pubsub(node_c, floodsub_c, "c") - -# await connect(node_a, node_b) -# await connect(node_b, node_c) - -# await asyncio.sleep(0.25) -# qb = await pubsub_b.subscribe("my_topic") -# qc = await pubsub_c.subscribe("my_topic") -# await asyncio.sleep(0.25) - -# node_a_id = str(node_a.get_id()) - -# msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) - -# await floodsub_a.publish(node_a.get_id(), msg.to_str()) - -# await asyncio.sleep(0.25) -# res_b = await qb.get() -# res_c = await qc.get() - -# # Check that the msg received by node_b is the same -# # as the message sent by node_a -# assert res_b == msg.to_str() - -# # res_c should match original msg but with b as sender -# node_b_id = str(node_b.get_id()) -# msg.from_id = node_b_id - -# assert res_c == msg.to_str() - -# # Success, terminate pending tasks. -# await cleanup() - async def perform_test_from_obj(obj): """ Perform a floodsub test from a test obj. @@ -243,9 +186,7 @@ async def perform_test_from_obj(obj): # Create correctly formatted message msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id()) - - print ("**TEST FLOODSUB** MESSAGE TALK") - print (msg_talk) + # Publish message # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\ @@ -269,20 +210,7 @@ async def perform_test_from_obj(obj): for node_id in topic_map[topic]: # Get message from subscription queue msg_on_node_str = await queues_map[node_id][topic].get() - - print ("MESSAGE ON NODE STR") - print (msg_on_node_str) - - print ("ACTUAL MESSSSAGE") - print (actual_msg) - assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString() - # msg_on_node = create_message_talk(msg_on_node_str) - - # Perform checks - # assert actual_msg.origin_id == msg_on_node.origin_id - # assert actual_msg.topics == msg_on_node.topics - # assert actual_msg.data == msg_on_node.data # Success, terminate pending tasks. await cleanup() @@ -500,30 +428,3 @@ async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): ] } await perform_test_from_obj(test_obj) - -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) - -def generate_RPC_packet(origin_id, topics, msg_content, msg_id): - packet = rpc_pb2.RPC() - message = rpc_pb2.Message( - from_id=origin_id.encode('utf-8'), - seqno=msg_id.encode('utf-8'), - data=msg_content.encode('utf-8'), - ) - for topic in topics: - message.topicIDs.extend([topic.encode('utf-8')]) - - # for topic in topics: - # message.topicIDs.extend([topic.encode('utf-8')]) - # packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - # subscribe=True, - # topicid = topic.encode('utf-8') - # )]) - - packet.publish.extend([message]) - return packet diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py new file mode 100644 index 0000000..d4695d7 --- /dev/null +++ b/tests/pubsub/utils.py @@ -0,0 +1,30 @@ +import uuid +from libp2p.pubsub.pb import rpc_pb2 + +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) + +def generate_RPC_packet(origin_id, topics, msg_content, msg_id): + """ + Generate RPC packet to send over wire + :param origin_id: peer id of the message origin + :param topics: list of topics + :param msg_content: string of content in data + :param msg_id: seqno for the message + """ + packet = rpc_pb2.RPC() + message = rpc_pb2.Message( + from_id=origin_id.encode('utf-8'), + seqno=msg_id.encode('utf-8'), + data=msg_content.encode('utf-8'), + ) + + for topic in topics: + message.topicIDs.extend([topic.encode('utf-8')]) + + packet.publish.extend([message]) + return packet