py-libp2p/libp2p/peer/peerdata.py

48 lines
1.2 KiB
Python
Raw Normal View History

2019-07-31 15:00:12 -07:00
from typing import Any, Dict, List, Sequence
2019-07-27 16:27:01 +08:00
from multiaddr import Multiaddr
2018-10-20 17:11:44 -04:00
from .peerdata_interface import IPeerData
2019-01-09 21:38:56 +03:00
2018-10-20 17:11:44 -04:00
class PeerData(IPeerData):
2019-07-27 16:27:01 +08:00
metadata: Dict[Any, Any]
protocols: List[str]
addrs: List[Multiaddr]
def __init__(self) -> None:
2018-10-21 15:15:53 -04:00
self.metadata = {}
2018-10-20 17:11:44 -04:00
self.protocols = []
self.addrs = []
2019-07-27 16:27:01 +08:00
def get_protocols(self) -> List[str]:
2018-10-21 15:15:53 -04:00
return self.protocols
2018-10-20 17:11:44 -04:00
2019-07-27 16:27:01 +08:00
def add_protocols(self, protocols: Sequence[str]) -> None:
self.protocols.extend(list(protocols))
2018-10-20 17:11:44 -04:00
2019-07-27 16:27:01 +08:00
def set_protocols(self, protocols: Sequence[str]) -> None:
self.protocols = list(protocols)
2019-07-27 16:27:01 +08:00
def add_addrs(self, addrs: Sequence[Multiaddr]) -> None:
2019-07-30 23:41:28 +08:00
self.addrs.extend(addrs)
2018-10-20 17:11:44 -04:00
2019-07-27 16:27:01 +08:00
def get_addrs(self) -> List[Multiaddr]:
2018-10-21 15:15:53 -04:00
return self.addrs
2018-10-20 17:11:44 -04:00
2019-07-27 16:27:01 +08:00
def clear_addrs(self) -> None:
2018-10-21 15:15:53 -04:00
self.addrs = []
2018-10-20 17:11:44 -04:00
2019-07-30 15:31:02 +08:00
def put_metadata(self, key: str, val: Any) -> None:
2018-10-21 15:15:53 -04:00
self.metadata[key] = val
2018-10-20 17:11:44 -04:00
2019-07-30 15:31:02 +08:00
def get_metadata(self, key: str) -> Any:
2018-10-21 15:15:53 -04:00
if key in self.metadata:
2018-10-30 00:03:19 +01:00
return self.metadata[key]
raise PeerDataError("key not found")
2018-10-30 00:03:19 +01:00
2019-01-09 21:38:56 +03:00
2018-10-30 00:03:19 +01:00
class PeerDataError(KeyError):
"""Raised when a key is not found in peer metadata"""