diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 4fc059b..ba9d33c 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -8,7 +8,7 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from utils import generate_message_id, generate_RPC_packet +from utils import message_id_generator, generate_RPC_packet # pylint: disable=too-many-locals @@ -44,7 +44,8 @@ async def test_simple_two_nodes_RPC(): node_a_id = str(node_a.get_id()) - msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id()) + next_msg_id_func = message_id_generator(0) + msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", next_msg_id_func()) await floodsub_a.publish(node_a_id, msg.SerializeToString()) await asyncio.sleep(0.25) @@ -175,6 +176,8 @@ async def perform_test_from_obj(obj): topics_in_msgs_ordered = [] messages = obj["messages"] tasks_publish = [] + next_msg_id_func = message_id_generator(0) + for msg in messages: topics = msg["topics"] @@ -185,7 +188,7 @@ async def perform_test_from_obj(obj): actual_node_id = str(node_map[node_id].get_id()) # Create correctly formatted message - msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id()) + msg_talk = generate_RPC_packet(actual_node_id, topics, data, next_msg_id_func()) # Publish message # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) @@ -207,6 +210,8 @@ async def perform_test_from_obj(obj): # TODO: Check message sender too for i in range(len(topics_in_msgs_ordered)): topic, actual_msg = topics_in_msgs_ordered[i] + + # Look at each node in each topic for node_id in topic_map[topic]: # Get message from subscription queue msg_on_node_str = await queues_map[node_id][topic].get() diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py index d4695d7..8aa3f88 100644 --- a/tests/pubsub/utils.py +++ b/tests/pubsub/utils.py @@ -1,12 +1,26 @@ import uuid +import struct from libp2p.pubsub.pb import rpc_pb2 -def generate_message_id(): + +def message_id_generator(start_val): """ Generate a unique message id - :return: messgae id + :param start_val: value to start generating messages at + :return: message id """ - return str(uuid.uuid1()) + val = start_val + def generator(): + # Allow manipulation of val within closure + nonlocal val + + # Increment id + val += 1 + + # Convert val to big endian + return struct.pack('>I', val) + + return generator def generate_RPC_packet(origin_id, topics, msg_content, msg_id): """ @@ -19,7 +33,7 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): packet = rpc_pb2.RPC() message = rpc_pb2.Message( from_id=origin_id.encode('utf-8'), - seqno=msg_id.encode('utf-8'), + seqno=msg_id, data=msg_content.encode('utf-8'), )