py-libp2p/libp2p/peer/peerinfo.py

71 lines
2.0 KiB
Python
Raw Normal View History

import json
2019-10-16 01:02:03 +08:00
from typing import Any, List, Sequence
2019-07-27 16:27:01 +08:00
import multiaddr
2019-01-10 02:38:56 +08:00
from .id import ID
class PeerInfo:
    """
    A peer ID paired with the list of multiaddrs recorded for that peer.

    Instances compare equal when both the peer ID and the address list are
    equal. Because ``__eq__`` is defined without ``__hash__``, Python makes
    instances unhashable.
    """

    peer_id: ID
    addrs: List[multiaddr.Multiaddr]

    def __init__(self, peer_id: ID, addrs: Sequence[multiaddr.Multiaddr]) -> None:
        """
        :param peer_id: ID of the peer
        :param addrs: addresses of the peer; copied into a new list
        """
        self.peer_id = peer_id
        # Copy so the instance owns its own list, independent of the caller's.
        self.addrs = list(addrs)

    def to_string(self) -> str:
        """
        Serialize to a JSON array of the form ``[peer_id, [addr, ...]]``.

        :return: JSON text holding ``peer_id.to_string()`` and each address
            converted with ``str``
        """
        return json.dumps(
            [self.peer_id.to_string(), [str(addr) for addr in self.addrs]]
        )

    def __eq__(self, other: Any) -> bool:
        """Return True only for a ``PeerInfo`` with equal ``peer_id`` and ``addrs``."""
        return (
            isinstance(other, PeerInfo)
            and self.peer_id == other.peer_id
            and self.addrs == other.addrs
        )

    @classmethod
    def info_from_string(cls, info: str) -> "PeerInfo":
        """
        Build an instance from the JSON text produced by ``to_string``.

        :param info: JSON array ``[peer_id, [addr, ...]]``
        :return: an instance of ``cls`` (so subclasses get their own type)
        """
        # NOTE(review): assumes ``ID.to_string()`` emits the base58 form that
        # ``ID.from_base58`` accepts - confirm against the ``ID`` class.
        peer_id, raw_addrs = json.loads(info)
        return cls(
            ID.from_base58(peer_id),
            [multiaddr.Multiaddr(raw_addr) for raw_addr in raw_addrs],
        )
def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
    """
    Build a ``PeerInfo`` from a multiaddr whose last component is ``/p2p/<id>``.

    :param addr: multiaddr ending in a ``P_P2P`` component
    :return: ``PeerInfo`` with the parsed peer ID and a single-element
        address list
    :raises InvalidAddrError: if ``addr`` is ``None``, is not a ``Multiaddr``,
        splits into no parts, or its last part is not ``P_P2P``
    """
    # Check for ``None`` explicitly: a plain truthiness test would report any
    # falsy value as "None", hiding the more accurate errors raised below.
    if addr is None:
        raise InvalidAddrError("`addr` should not be `None`")

    if not isinstance(addr, multiaddr.Multiaddr):
        raise InvalidAddrError(f"`addr`={addr} should be of type `Multiaddr`")

    parts = addr.split()
    if not parts:
        raise InvalidAddrError(
            f"`parts`={parts} should at least have a protocol `P_P2P`"
        )

    # The peer ID must be carried by the final component of the address.
    p2p_part = parts[-1]
    last_protocol_code = p2p_part.protocols()[0].code
    if last_protocol_code != multiaddr.protocols.P_P2P:
        raise InvalidAddrError(
            f"The last protocol should be `P_P2P` instead of `{last_protocol_code}`"
        )

    # make sure the /p2p value parses as a peer.ID
    peer_id_str: str = p2p_part.value_for_protocol(multiaddr.protocols.P_P2P)
    peer_id: ID = ID.from_base58(peer_id_str)

    # we might have received just an / p2p part, which means there's no addr.
    # NOTE(review): in that case ``addr`` is left untouched, so the returned
    # address list contains the /p2p component itself - confirm this is the
    # intended result rather than an empty list.
    if len(parts) > 1:
        # Drop the trailing /p2p component, keeping only the transport part.
        addr = multiaddr.Multiaddr.join(*parts[:-1])

    return PeerInfo(peer_id, [addr])
class InvalidAddrError(ValueError):
    """Raised when a multiaddr cannot be converted into a ``PeerInfo``."""