add simple lru test
This commit is contained in:
parent
d04798ce7c
commit
7a298adc33
|
@ -21,7 +21,7 @@ async def connect(node1, node2):
|
|||
await node1.connect(info)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simple_two_nodes_RPC():
|
||||
async def test_simple_two_nodes():
|
||||
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
|
||||
|
@ -58,6 +58,80 @@ async def test_simple_two_nodes_RPC():
|
|||
# Success, terminate pending tasks.
|
||||
await cleanup()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lru_cache_two_nodes():
|
||||
# two nodes with cache_size of 4
|
||||
# node_a send the following messages to node_b
|
||||
# [1, 1, 2, 1, 3, 1, 4, 1, 5, 1]
|
||||
# node_b should only receive the following
|
||||
# [1, 2, 3, 4, 5, 1]
|
||||
node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"])
|
||||
|
||||
await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0"))
|
||||
|
||||
supported_protocols = ["/floodsub/1.0.0"]
|
||||
|
||||
# initialize PubSub with a cache_size of 4
|
||||
floodsub_a = FloodSub(supported_protocols)
|
||||
pubsub_a = Pubsub(node_a, floodsub_a, "a", 4)
|
||||
floodsub_b = FloodSub(supported_protocols)
|
||||
pubsub_b = Pubsub(node_b, floodsub_b, "b", 4)
|
||||
|
||||
await connect(node_a, node_b)
|
||||
|
||||
await asyncio.sleep(0.25)
|
||||
qb = await pubsub_b.subscribe("my_topic")
|
||||
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
node_a_id = str(node_a.get_id())
|
||||
|
||||
# initialize message_id_generator
|
||||
# store first message
|
||||
next_msg_id_func = message_id_generator(0)
|
||||
first_message = generate_RPC_packet(node_a_id, ["my_topic"], "some data 1", next_msg_id_func())
|
||||
|
||||
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
print (first_message)
|
||||
|
||||
messages = [first_message]
|
||||
# for the next 5 messages
|
||||
for i in range(2, 6):
|
||||
# write first message
|
||||
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# generate and write next message
|
||||
msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data " + str(i), next_msg_id_func())
|
||||
messages.append(msg)
|
||||
|
||||
await floodsub_a.publish(node_a_id, msg.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# write first message again
|
||||
await floodsub_a.publish(node_a_id, first_message.SerializeToString())
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# check the first five messages in queue
|
||||
# should only see 1 first_message
|
||||
for i in range(5):
|
||||
# Check that the msg received by node_b is the same
|
||||
# as the message sent by node_a
|
||||
res_b = await qb.get()
|
||||
assert res_b.SerializeToString() == messages[i].publish[0].SerializeToString()
|
||||
|
||||
# the 6th message should be first_message
|
||||
res_b = await qb.get()
|
||||
assert res_b.SerializeToString() == first_message.publish[0].SerializeToString()
|
||||
assert qb.empty()
|
||||
|
||||
# Success, terminate pending tasks.
|
||||
await cleanup()
|
||||
|
||||
|
||||
async def perform_test_from_obj(obj):
|
||||
"""
|
||||
Perform a floodsub test from a test obj.
|
||||
|
|
Loading…
Reference in New Issue
Block a user