Add type hints to mcache.py
This commit is contained in:
parent
63014eeaae
commit
3549f2ff8b
|
@ -1,11 +1,25 @@
|
|||
from typing import (
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from .pb import rpc_pb2
|
||||
|
||||
|
||||
class MessageCache:
|
||||
|
||||
class CacheEntry:
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
||||
mid: Tuple[bytes, bytes]
|
||||
topics: List[str]
|
||||
|
||||
"""
|
||||
A logical representation of an entry in the mcache's _history_.
|
||||
"""
|
||||
def __init__(self, mid, topics):
|
||||
def __init__(self, mid: Tuple[bytes, bytes], topics: List[str]) -> None:
|
||||
"""
|
||||
Constructor.
|
||||
:param mid: (seqno, from_id) of the msg
|
||||
|
@ -14,7 +28,14 @@ class MessageCache:
|
|||
self.mid = mid
|
||||
self.topics = topics
|
||||
|
||||
def __init__(self, window_size, history_size):
|
||||
window_size: int
|
||||
history_size: int
|
||||
|
||||
msgs: Dict[Tuple[bytes, bytes], rpc_pb2.Message]
|
||||
|
||||
history = List[List[CacheEntry]]
|
||||
|
||||
def __init__(self, window_size: int, history_size: int) -> None:
|
||||
"""
|
||||
Constructor.
|
||||
:param window_size: Size of the window desired.
|
||||
|
@ -34,12 +55,12 @@ class MessageCache:
|
|||
for _ in range(history_size):
|
||||
self.history.append([])
|
||||
|
||||
def put(self, msg):
|
||||
def put(self, msg: rpc_pb2.Message) -> None:
|
||||
"""
|
||||
Put a message into the mcache.
|
||||
:param msg: The rpc message to put in. Should contain seqno and from_id
|
||||
"""
|
||||
mid = (msg.seqno, msg.from_id)
|
||||
mid: Tuple[bytes, bytes] = (msg.seqno, msg.from_id)
|
||||
self.msgs[mid] = msg
|
||||
|
||||
if not self.history[0]:
|
||||
|
@ -47,7 +68,7 @@ class MessageCache:
|
|||
|
||||
self.history[0].append(self.CacheEntry(mid, msg.topicIDs))
|
||||
|
||||
def get(self, mid):
|
||||
def get(self, mid: Tuple[bytes, bytes]) -> Optional[rpc_pb2.Message]:
|
||||
"""
|
||||
Get a message from the mcache.
|
||||
:param mid: (seqno, from_id) of the message to get.
|
||||
|
@ -58,13 +79,13 @@ class MessageCache:
|
|||
|
||||
return None
|
||||
|
||||
def window(self, topic):
|
||||
def window(self, topic: str) -> List[Tuple[bytes, bytes]]:
|
||||
"""
|
||||
Get the window for this topic.
|
||||
:param topic: Topic whose message ids we desire.
|
||||
:return: List of mids in the current window.
|
||||
"""
|
||||
mids = []
|
||||
mids: List[Tuple[bytes, bytes]] = []
|
||||
|
||||
for entries_list in self.history[: self.window_size]:
|
||||
for entry in entries_list:
|
||||
|
@ -74,16 +95,16 @@ class MessageCache:
|
|||
|
||||
return mids
|
||||
|
||||
def shift(self):
|
||||
def shift(self) -> None:
|
||||
"""
|
||||
Shift the window over by 1 position, dropping the last element of the history.
|
||||
"""
|
||||
last_entries = self.history[len(self.history) - 1]
|
||||
last_entries: List[CacheEntry] = self.history[len(self.history) - 1]
|
||||
|
||||
for entry in last_entries:
|
||||
del self.msgs[entry.mid]
|
||||
|
||||
i = len(self.history) - 2
|
||||
i: int = len(self.history) - 2
|
||||
|
||||
while i >= 0:
|
||||
self.history[i + 1] = self.history[i]
|
||||
|
|
Loading…
Reference in New Issue
Block a user