diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 2d4ed8d..bb1126e 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -176,8 +176,13 @@ async def perform_test_from_obj(obj): topics_in_msgs_ordered = [] messages = obj["messages"] tasks_publish = [] +<<<<<<< HEAD next_msg_id_func = message_id_generator(0) +======= + + all_actual_msgs = {} +>>>>>>> Modify perform_test_obj to handle messages received in any order for msg in messages: topics = msg["topics"] @@ -199,7 +204,10 @@ async def perform_test_from_obj(obj): # TODO: Update message sender to be correct message sender before # adding msg_talk to this list for topic in topics: - topics_in_msgs_ordered.append((topic, msg_talk)) + if topic in all_actual_msgs: + all_actual_msgs[topic].append(msg_talk.publish[0].SerializeToString()) + else: + all_actual_msgs[topic] = [msg_talk.publish[0].SerializeToString()] # Allow time for publishing before continuing # await asyncio.sleep(0.4) @@ -207,15 +215,24 @@ async def perform_test_from_obj(obj): await asyncio.gather(*tasks_publish) # Step 4) Check that all messages were received correctly. - # TODO: Check message sender too - for i in range(len(topics_in_msgs_ordered)): - topic, actual_msg = topics_in_msgs_ordered[i] - - # Look at each node in each topic + for topic in all_actual_msgs: for node_id in topic_map[topic]: - # Get message from subscription queue - msg_on_node_str = await queues_map[node_id][topic].get() - assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString() + all_received_msgs_in_topic = [] + + # Add all messages to message received list for given node in given topic + while (queues_map[node_id][topic].qsize() > 0): + # Get message from subscription queue + msg_on_node = (await queues_map[node_id][topic].get()).SerializeToString() + all_received_msgs_in_topic.append(msg_on_node) + + # Ensure each message received was the same as one sent + print(all_received_msgs_in_topic) + print(all_actual_msgs) + for msg_on_node in all_received_msgs_in_topic: + assert msg_on_node in all_actual_msgs[topic] + + # Ensure same number of messages received as sent + assert len(all_received_msgs_in_topic) == len(all_actual_msgs[topic]) # Success, terminate pending tasks. await cleanup()