9052e8f8bd
* Add handle_rpc call to pubsub * Scaffold gossipsub functions * Add timer * Implement most of mesh construction * Implement emit and handle * Implement fanout heartbeat * Refactor emit * some gossipsub cleanup and test * minor lint stuff, more to come * Implement publish * Fix comment * Modify pubsub/gossipsub so that floodsub tests pass using gossipsub router * Add floodsub tests to gossipsub * Handle case where select_from_minus, num_to_select > size(pool-minus) * Add topic membership * Implement handle ihave * Implement most of iwant * Add mcache.add and comments * Refactor handle_ihave * Implement stream write in handle_iwant * Implement gossip heartbeat * unresolved vars * initial mcache code * documenting mcache * writing test/debugging mcache * finished mcache test and debugged * Make gossipsub backward compatibility its own file * remove mcache prints * DEBUGGING * Add sender_peer_id to handle_rpc to get gossip test passing * Modify gossipsub to make fanout work * fanout maintenance test * debugging gsub GOSSIP * DEBUGGING * debugged sender seen cachce * adding lru, removing prints * pylint cleanup * Fix github comments in PR * minor floodsub possible bugfix
130 lines
2.5 KiB
Python
130 lines
2.5 KiB
Python
import pytest
|
|
from libp2p.pubsub.mcache import MessageCache
|
|
|
|
|
|
class Msg:
    """Minimal message stand-in used to exercise ``MessageCache`` in tests.

    Only the three attributes the test reads are provided: the topic list,
    the sequence number and the sender id. Tests build message ids as the
    pair ``(seqno, from_id)``.
    """

    def __init__(self, topicIDs, seqno, from_id):
        """Store the given fields unchanged.

        :param topicIDs: list of topic names the message belongs to
        :param seqno: sequence number of the message
        :param from_id: identifier of the sender
        """
        self.topicIDs = topicIDs
        # No trailing comma here: `seqno,` would silently wrap the value
        # in a one-element tuple instead of storing it as passed.
        self.seqno = seqno
        self.from_id = from_id
|
|
|
|
@pytest.mark.asyncio
async def test_mcache():
    """Check put/get/window/shift on a ``MessageCache(3, 5)``.

    Sixty messages on topic ``'test'`` are inserted in six batches of ten,
    with a ``shift()`` between consecutive batches. The assertions expect:

    * every message of the batches still held is returned by ``get``;
    * ``window('test')`` lists ids newest batch first, insertion order
      preserved inside each batch;
    * after five shifts the first batch is no longer retrievable, the
      cache holds 50 messages and the window spans 30 ids.
    """
    # Ported from:
    # https://github.com/libp2p/go-libp2p-pubsub
    # /blob/51b7501433411b5096cac2b4994a36a68515fc03/mcache_test.go

    def message_id(message):
        # Ids are looked up and compared as (seqno, from_id) pairs.
        return (message.seqno, message.from_id)

    mcache = MessageCache(3, 5)
    msgs = [Msg(["test"], seq, "test") for seq in range(60)]

    # --- first batch: messages 0..9 ---
    for stored in msgs[:10]:
        mcache.put(stored)

    for stored in msgs[:10]:
        # successful read
        assert mcache.get(message_id(stored)) == stored

    gids = mcache.window('test')
    assert len(gids) == 10

    for position, stored in enumerate(msgs[:10]):
        assert message_id(stored) == gids[position]

    # --- second batch: messages 10..19 ---
    mcache.shift()
    for stored in msgs[10:20]:
        mcache.put(stored)

    for stored in msgs[:20]:
        assert mcache.get(message_id(stored)) == stored

    gids = mcache.window('test')
    assert len(gids) == 20

    # The older batch is expected in the second half of the window ...
    for position, stored in enumerate(msgs[:10], 10):
        assert message_id(stored) == gids[position]

    # ... and the newer batch in the first half.
    for position, stored in enumerate(msgs[10:20]):
        assert message_id(stored) == gids[position]

    # --- remaining four batches, each preceded by a shift ---
    for batch_start in (20, 30, 40, 50):
        mcache.shift()
        for stored in msgs[batch_start:batch_start + 10]:
            mcache.put(stored)

    assert len(mcache.msgs) == 50

    for stored in msgs[:10]:
        # Should be evicted from cache
        assert not mcache.get(message_id(stored))

    for stored in msgs[10:]:
        assert mcache.get(message_id(stored)) == stored

    gids = mcache.window('test')
    assert len(gids) == 30

    # Window layout expected at the end: 50..59, then 40..49, then 30..39.
    for position, stored in enumerate(msgs[50:60]):
        assert message_id(stored) == gids[position]

    for position, stored in enumerate(msgs[40:50], 10):
        assert message_id(stored) == gids[position]

    for position, stored in enumerate(msgs[30:40], 20):
        assert message_id(stored) == gids[position]
|