diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 69b2f8c..06c1cbd 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -21,7 +21,7 @@ async def connect(node1, node2): await node1.connect(info) @pytest.mark.asyncio -async def test_simple_two_nodes_RPC(): +async def test_simple_two_nodes(): node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) @@ -58,6 +58,80 @@ async def test_simple_two_nodes_RPC(): # Success, terminate pending tasks. await cleanup() +@pytest.mark.asyncio +async def test_lru_cache_two_nodes(): + # two nodes with cache_size of 4 + # node_a send the following messages to node_b + # [1, 1, 2, 1, 3, 1, 4, 1, 5, 1] + # node_b should only receive the following + # [1, 2, 3, 4, 5, 1] + node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) + + await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + + supported_protocols = ["/floodsub/1.0.0"] + + # initialize PubSub with a cache_size of 4 + floodsub_a = FloodSub(supported_protocols) + pubsub_a = Pubsub(node_a, floodsub_a, "a", 4) + floodsub_b = FloodSub(supported_protocols) + pubsub_b = Pubsub(node_b, floodsub_b, "b", 4) + + await connect(node_a, node_b) + + await asyncio.sleep(0.25) + qb = await pubsub_b.subscribe("my_topic") + + await asyncio.sleep(0.25) + + node_a_id = str(node_a.get_id()) + + # initialize message_id_generator + # store first message + next_msg_id_func = message_id_generator(0) + first_message = generate_RPC_packet(node_a_id, ["my_topic"], "some data 1", next_msg_id_func()) + + await floodsub_a.publish(node_a_id, first_message.SerializeToString()) + await asyncio.sleep(0.25) + print (first_message) + + messages = [first_message] + # for the next 5 messages + for i in range(2, 6): + # write first message + await floodsub_a.publish(node_a_id, first_message.SerializeToString()) + await asyncio.sleep(0.25) + + # generate and write next message + msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data " + str(i), next_msg_id_func()) + messages.append(msg) + + await floodsub_a.publish(node_a_id, msg.SerializeToString()) + await asyncio.sleep(0.25) + + # write first message again + await floodsub_a.publish(node_a_id, first_message.SerializeToString()) + await asyncio.sleep(0.25) + + # check the first five messages in queue + # should only see 1 first_message + for i in range(5): + # Check that the msg received by node_b is the same + # as the message sent by node_a + res_b = await qb.get() + assert res_b.SerializeToString() == messages[i].publish[0].SerializeToString() + + # the 6th message should be first_message + res_b = await qb.get() + assert res_b.SerializeToString() == first_message.publish[0].SerializeToString() + assert qb.empty() + + # Success, terminate pending tasks. + await cleanup() + + async def perform_test_from_obj(obj): """ Perform a floodsub test from a test obj.