Add helper functions

This commit is contained in:
mhchia 2019-09-02 21:01:13 +08:00
parent a883816881
commit 3717dc9adf
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
5 changed files with 60 additions and 26 deletions

View File

@ -16,6 +16,7 @@ from typing import (
from lru import LRU from lru import LRU
from libp2p.utils import encode_varint_prefixed
from libp2p.exceptions import ValidationError from libp2p.exceptions import ValidationError
from libp2p.host.host_interface import IHost from libp2p.host.host_interface import IHost
from libp2p.network.stream.net_stream_interface import INetStream from libp2p.network.stream.net_stream_interface import INetStream
@ -131,7 +132,7 @@ class Pubsub:
# Call handle peer to keep waiting for updates to peer queue # Call handle peer to keep waiting for updates to peer queue
asyncio.ensure_future(self.handle_peer_queue()) asyncio.ensure_future(self.handle_peer_queue())
def get_hello_packet(self) -> bytes: def get_hello_packet(self) -> rpc_pb2.RPC:
""" """
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics only send hello packet if we have subscribed topics
@ -141,7 +142,7 @@ class Pubsub:
packet.subscriptions.extend( packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
) )
return packet.SerializeToString() return packet
async def continuously_read_stream(self, stream: INetStream) -> None: async def continuously_read_stream(self, stream: INetStream) -> None:
""" """
@ -227,9 +228,9 @@ class Pubsub:
self.router.add_peer(peer_id, stream.get_protocol()) self.router.add_peer(peer_id, stream.get_protocol())
# Send hello packet # Send hello packet
hello: bytes = self.get_hello_packet() hello = self.get_hello_packet()
await stream.write(hello.SerializeToString())
await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continuously_read_stream(stream)) asyncio.ensure_future(self.continuously_read_stream(stream))
# Force context switch # Force context switch
@ -257,8 +258,8 @@ class Pubsub:
self.router.add_peer(peer_id, stream.get_protocol()) self.router.add_peer(peer_id, stream.get_protocol())
# Send hello packet # Send hello packet
hello: bytes = self.get_hello_packet() hello = self.get_hello_packet()
await stream.write(hello) await stream.write(hello.SerializeToString())
# TODO: Investigate whether this should be replaced by `handlePeerEOF` # TODO: Investigate whether this should be replaced by `handlePeerEOF`
# Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/49274b0e8aecdf6cad59d768e5702ff00aa48488/comm.go#L80 # noqa: E501 # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/49274b0e8aecdf6cad59d768e5702ff00aa48488/comm.go#L80 # noqa: E501

View File

@ -1,7 +1,19 @@
import asyncio
import pytest import pytest
from .utils import connect
TOPIC = "TOPIC_0123"
@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.parametrize("num_hosts", (1,))
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_gossipsub(pubsubs_gsub, p2pds): async def test_pubsub_peers(pubsubs_gsub, p2pds):
pass # await connect(pubsubs_gsub[0].host, p2pds[0])
await connect(p2pds[0], pubsubs_gsub[0].host)
sub = await pubsubs_gsub[0].subscribe(TOPIC)
await asyncio.sleep(1)
peers = await p2pds[0].control.pubsub_list_peers(TOPIC)
print(f"!@# peers={peers}")

View File

@ -5,12 +5,35 @@ from multiaddr import Multiaddr
from libp2p.host.host_interface import IHost from libp2p.host.host_interface import IHost
from libp2p.peer.peerinfo import PeerInfo from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.id import ID
from .daemon import Daemon from .daemon import Daemon
TDaemonOrHost = Union[IHost, Daemon] TDaemonOrHost = Union[IHost, Daemon]
def _get_peer_info(node: TDaemonOrHost) -> PeerInfo:
peer_info: PeerInfo
if isinstance(node, Daemon):
peer_info = node.peer_info
else: # isinstance(node, IHost)
peer_id = node.get_id()
maddrs = [
node.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}"))
]
peer_info = PeerInfo(peer_id, maddrs)
return peer_info
async def _is_peer(peer_id: ID, node: TDaemonOrHost) -> bool:
if isinstance(node, Daemon):
pinfos = await node.control.list_peers()
peers = tuple([pinfo.peer_id for pinfo in pinfos])
return peer_id in peers
else: # isinstance(node, IHost)
return peer_id in node.get_network().connections
async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None: async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None:
# Type check # Type check
err_msg = ( err_msg = (
@ -21,20 +44,15 @@ async def connect(a: TDaemonOrHost, b: TDaemonOrHost) -> None:
[isinstance(node, IHost) or isinstance(node, Daemon) for node in (a, b)] [isinstance(node, IHost) or isinstance(node, Daemon) for node in (a, b)]
), err_msg ), err_msg
# TODO: Get peer info b_peer_info = _get_peer_info(b)
peer_info: PeerInfo
if isinstance(b, Daemon):
peer_info = b.peer_info
else: # isinstance(b, IHost)
peer_id = b.get_id()
maddrs = [
b.get_addrs()[0].decapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}"))
]
peer_info = PeerInfo(peer_id, maddrs)
# TODO: connect to peer info
if isinstance(a, Daemon): if isinstance(a, Daemon):
await a.control.connect(peer_info.peer_id, peer_info.addrs) await a.control.connect(b_peer_info.peer_id, b_peer_info.addrs)
else: # isinstance(b, IHost) else: # isinstance(b, IHost)
await a.connect(peer_info) await a.connect(b_peer_info)
# Allow additional sleep for both side to establish the connection. # Allow additional sleep for both side to establish the connection.
await asyncio.sleep(0.01) await asyncio.sleep(0.1)
a_peer_info = _get_peer_info(a)
assert await _is_peer(b_peer_info.peer_id, a)
assert await _is_peer(a_peer_info.peer_id, b)

View File

@ -1,7 +1,11 @@
from typing import NamedTuple from typing import NamedTuple
FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" from libp2p.pubsub import floodsub
GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0" from libp2p.pubsub import gossipsub
FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID
GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID
class GossipsubParams(NamedTuple): class GossipsubParams(NamedTuple):

View File

@ -70,8 +70,7 @@ async def test_peers_subscribe(pubsubs_fsub):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_hello_packet(pubsubs_fsub): async def test_get_hello_packet(pubsubs_fsub):
def _get_hello_packet_topic_ids(): def _get_hello_packet_topic_ids():
packet = rpc_pb2.RPC() packet = pubsubs_fsub[0].get_hello_packet()
packet.ParseFromString(pubsubs_fsub[0].get_hello_packet())
return tuple(sub.topicid for sub in packet.subscriptions) return tuple(sub.topicid for sub in packet.subscriptions)
# Test: No subscription, so there should not be any topic ids in the hello packet. # Test: No subscription, so there should not be any topic ids in the hello packet.