diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index a4f2e86..14394c7 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -74,13 +74,12 @@ class FloodSub(IPubsubRouter): # message sender and are not the message origin if peer_id_in_topic not in (msg_sender, decoded_from_id): stream = self.pubsub.peers[peer_id_in_topic] - # create new packet with just publish message + # Create new packet with just publish message new_packet = rpc_pb2.RPC() new_packet.publish.extend([message]) + + # Publish the packet await stream.write(new_packet.SerializeToString()) - else: - # Implies publish did not write - print("publish did not write") def join(self, topic): """ diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index bee5fba..40c4036 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -86,9 +86,10 @@ class Pubsub(): if rpc_incoming.publish: # deal with RPC.publish for message in rpc_incoming.publish: - if message.seqno not in self.seen_messages: + id_in_seen_msgs = (message.seqno, message.from_id) + if id_in_seen_msgs not in self.seen_messages: should_publish = True - self.seen_messages.append(message.seqno) + self.seen_messages.append(id_in_seen_msgs) await self.handle_talk(message) if rpc_incoming.subscriptions: diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 05a02bf..a640997 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,7 +1,8 @@ import asyncio import multiaddr +import uuid -from utils import generate_message_id, generate_RPC_packet +from utils import message_id_generator, generate_RPC_packet from libp2p import new_node from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub @@ -25,6 +26,8 @@ class DummyAccountNode(): def __init__(self): self.balances = {} + self.next_msg_id_func = message_id_generator(0) + self.node_id = str(uuid.uuid1()) @classmethod async def create(cls): @@ -51,7 +54,7 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - incoming = await self.q.get() + incoming = await self.q.get() msg_comps = incoming.data.decode('utf-8').split(",") if msg_comps[0] == "send": @@ -77,7 +80,7 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) async def publish_set_crypto(self, user, amount): @@ -88,7 +91,7 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "set," + user + "," + str(amount) - packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, self.next_msg_id_func()) await self.floodsub.publish(my_id, packet.SerializeToString()) diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index 1f08c8d..9fa2aa7 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -185,3 +185,28 @@ async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): assert dummy_node.get_balance("rob") == 12 await perform_test(num_nodes, adj_map, action_func, assertion_func) + +@pytest.mark.asyncio +async def test_set_then_send_from_five_diff_nodes_five_nodes_ring_topography(): + num_nodes = 5 + adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} + + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("alex", 20) + await asyncio.sleep(1) + await dummy_nodes[1].publish_send_crypto("alex", "rob", 3) + await asyncio.sleep(1) + await dummy_nodes[2].publish_send_crypto("rob", "aspyn", 2) + await asyncio.sleep(1) + await dummy_nodes[3].publish_send_crypto("aspyn", "zx", 1) + await asyncio.sleep(1) + await dummy_nodes[4].publish_send_crypto("zx", "raul", 1) + + def assertion_func(dummy_node): + assert dummy_node.get_balance("alex") == 17 + assert dummy_node.get_balance("rob") == 1 + assert dummy_node.get_balance("aspyn") == 1 + assert dummy_node.get_balance("zx") == 0 + assert dummy_node.get_balance("raul") == 1 + + await perform_test(num_nodes, adj_map, action_func, assertion_func) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 272af79..69b2f8c 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -8,7 +8,7 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from utils import generate_message_id, generate_RPC_packet +from utils import message_id_generator, generate_RPC_packet # pylint: disable=too-many-locals @@ -44,7 +44,8 @@ async def test_simple_two_nodes_RPC(): node_a_id = str(node_a.get_id()) - msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id()) + next_msg_id_func = message_id_generator(0) + msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", next_msg_id_func()) await floodsub_a.publish(node_a_id, msg.SerializeToString()) await asyncio.sleep(0.25) @@ -173,6 +174,8 @@ async def perform_test_from_obj(obj): topics_in_msgs_ordered = [] messages = obj["messages"] tasks_publish = [] + next_msg_id_func = message_id_generator(0) + for msg in messages: topics = msg["topics"] @@ -183,7 +186,7 @@ async def perform_test_from_obj(obj): actual_node_id = str(node_map[node_id].get_id()) # Create correctly formatted message - msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id()) + msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func()) # Publish message # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) @@ -205,6 +208,8 @@ async def perform_test_from_obj(obj): # TODO: Check message sender too for i in range(len(topics_in_msgs_ordered)): topic, actual_msg = topics_in_msgs_ordered[i] + + # Look at each node in each topic for node_id in topic_map[topic]: # Get message from subscription queue msg_on_node_str = await queues_map[node_id][topic].get() @@ -426,3 +431,112 @@ async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): ] } await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_four_nodes_clique_two_topic_diff_origin_many_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2", "3", "4"], + "2": ["1", "3", "4"], + "3": ["1", "2", "4"], + "4": ["1", "2", "3"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4"], + "school": ["1", "2", "3", "4"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar2", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar3", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic3", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) + +@pytest.mark.asyncio +async def test_five_nodes_ring_two_topic_diff_origin_many_msgs_test_obj(): + test_obj = { + "supported_protocols": ["/floodsub/1.0.0"], + "adj_list": { + "1": ["2"], + "2": ["3"], + "3": ["4"], + "4": ["5"], + "5": ["1"] + }, + "topic_map": { + "astrophysics": ["1", "2", "3", "4", "5"], + "school": ["1", "2", "3", "4", "5"] + }, + "messages": [ + { + "topics": ["astrophysics"], + "data": "e=mc^2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar2", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic2", + "node_id": "1" + }, + { + "topics": ["school"], + "data": "foobar3", + "node_id": "2" + }, + { + "topics": ["astrophysics"], + "data": "I am allergic3", + "node_id": "1" + } + ] + } + await perform_test_from_obj(test_obj) \ No newline at end of file diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index d4695d7..056baac 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,12 +1,26 @@ import uuid +import struct from libp2p.pubsub.pb import rpc_pb2 -def generate_message_id(): + +def message_id_generator(start_val): """ Generate a unique message id - :return: messgae id + :param start_val: value to start generating messages at + :return: message id """ - return str(uuid.uuid1()) + val = start_val + def generator(): + # Allow manipulation of val within closure + nonlocal val + + # Increment id + val += 1 + + # Convert val to big endian + return struct.pack('>Q', val) + + return generator def generate_RPC_packet(origin_id, topics, msg_content, msg_id): """ @@ -19,7 +33,7 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): packet = rpc_pb2.RPC() message = rpc_pb2.Message( from_id=origin_id.encode('utf-8'), - seqno=msg_id.encode('utf-8'), + seqno=msg_id, data=msg_content.encode('utf-8'), )