py-libp2p/libp2p/pubsub/mcache.py
Robert Zajac 9052e8f8bd
The Gossipsub PR (#162)
* Add handle_rpc call to pubsub

* Scaffold gossipsub functions

* Add timer

* Implement most of mesh construction

* Implement emit and handle

* Implement fanout heartbeat

* Refactor emit

* some gossipsub cleanup and test

* minor lint stuff, more to come

* Implement publish

* Fix comment

* Modify pubsub/gossipsub so that floodsub tests pass using gossipsub router

* Add floodsub tests to gossipsub

* Handle case where select_from_minus, num_to_select > size(pool-minus)

* Add topic membership

* Implement handle ihave

* Implement most of iwant

* Add mcache.add and comments

* Refactor handle_ihave

* Implement stream write in handle_iwant

* Implement gossip heartbeat

* unresolved vars

* initial mcache code

* documenting mcache

* writing test/debugging mcache

* finished mcache test and debugged

* Make gossipsub backward compatibility its own file

* remove mcache prints

* DEBUGGING

* Add sender_peer_id to handle_rpc to get gossip test passing

* Modify gossipsub to make fanout work

* fanout maintenance test

* debugging gsub GOSSIP

* DEBUGGING

* debugged sender seen cachce

* adding lru, removing prints

* pylint cleanup

* Fix github comments in PR

* minor floodsub possible bugfix
2019-05-06 23:44:13 -04:00

93 lines
2.6 KiB
Python

class MessageCache:
class CacheEntry:
# pylint: disable=too-few-public-methods
"""
A logical representation of an entry in the mcache's _history_.
"""
def __init__(self, mid, topics):
"""
Constructor.
:param mid: (seqno, from_id) of the msg
:param topics: list of topics this message was sent on
"""
self.mid = mid
self.topics = topics
def __init__(self, window_size, history_size):
"""
Constructor.
:param window_size: Size of the window desired.
:param history_size: Size of the history desired.
:return: the MessageCache
"""
self.window_size = window_size
self.history_size = history_size
# (seqno, from_id) -> rpc message
self.msgs = dict()
# max length of history_size. each item is a list of CacheEntry.
# messages lost upon shift().
self.history = []
for _ in range(history_size):
self.history.append([])
def put(self, msg):
"""
Put a message into the mcache.
:param msg: The rpc message to put in. Should contain seqno and from_id
"""
mid = (msg.seqno, msg.from_id)
self.msgs[mid] = msg
if not self.history[0]:
self.history[0] = []
self.history[0].append(self.CacheEntry(mid, msg.topicIDs))
def get(self, mid):
"""
Get a message from the mcache.
:param mid: (seqno, from_id) of the message to get.
:return: The rpc message associated with this mid
"""
if mid in self.msgs:
return self.msgs[mid]
return None
def window(self, topic):
"""
Get the window for this topic.
:param topic: Topic whose message ids we desire.
:return: List of mids in the current window.
"""
mids = []
for entries_list in self.history[: self.window_size]:
for entry in entries_list:
for entry_topic in entry.topics:
if entry_topic == topic:
mids.append(entry.mid)
return mids
def shift(self):
"""
Shift the window over by 1 position, dropping the last element of the history.
"""
last_entries = self.history[len(self.history) - 1]
for entry in last_entries:
del self.msgs[entry.mid]
i = len(self.history) - 2
while i >= 0:
self.history[i + 1] = self.history[i]
i -= 1
self.history[0] = []