diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 300ad71..e522a4b 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -217,6 +217,10 @@ class Swarm(INetwork): "fail to upgrade the connection to a secured connection" ) from error peer_id = secured_conn.get_remote_peer() + peer_ip, peer_port = writer.get_extra_info("peername") + peer_maddr = Multiaddr(f"/ip4/{peer_ip}/tcp/{peer_port}") + # TODO: Fix the ttl + self.peerstore.add_addr(peer_id, peer_maddr, 12345678) try: muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 52c4b55..a02f5c3 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -16,12 +16,12 @@ from typing import ( from lru import LRU -from libp2p.utils import encode_varint_prefixed from libp2p.exceptions import ValidationError from libp2p.host.host_interface import IHost from libp2p.network.stream.net_stream_interface import INetStream from libp2p.peer.id import ID from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -153,11 +153,14 @@ class Pubsub: peer_id = stream.mplex_conn.peer_id while True: - incoming: bytes = (await stream.read()) + print("!@# continuously_read_stream: waiting") + incoming: bytes = await read_varint_prefixed_bytes(stream) + print(f"!@# continuously_read_stream: incoming={incoming}") rpc_incoming: rpc_pb2.RPC = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) if rpc_incoming.publish: + print("!@# continuously_read_stream: publish") # deal with RPC.publish for msg in rpc_incoming.publish: if not self._is_subscribed_to_msg(msg): @@ -167,6 +170,7 @@ class Pubsub: asyncio.ensure_future(self.push_msg(msg_forwarder=peer_id, msg=msg)) if rpc_incoming.subscriptions: + print("!@# continuously_read_stream: subscriptions") # deal with RPC.subscriptions # We don't need to relay the subscription to our # peers because a given node only needs its peers @@ -179,6 +183,7 @@ class Pubsub: # This is necessary because `control` is an optional field in pb2. # Ref: https://developers.google.com/protocol-buffers/docs/reference/python-generated#singular-fields-proto2 # noqa: E501 if rpc_incoming.HasField("control"): + print("!@# continuously_read_stream: control") # Pass rpc to router so router could perform custom logic await self.router.handle_rpc(rpc_incoming, peer_id) @@ -221,20 +226,23 @@ class Pubsub: on one of the supported pubsub protocols. :param stream: newly created stream """ - # Add peer + await self.continuously_read_stream(stream) + + async def _handle_new_peer(self, peer_id: ID) -> None: + # Open a stream to peer on existing connection + # (we know connection exists since that's the only way + # an element gets added to peer_queue) + stream: INetStream = await self.host.new_stream(peer_id, self.protocols) + # Map peer to stream - peer_id: ID = stream.mplex_conn.peer_id self.peers[peer_id] = stream - self.router.add_peer(peer_id, stream.get_protocol()) # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.SerializeToString()) - - # Pass stream off to stream reader - asyncio.ensure_future(self.continuously_read_stream(stream)) - # Force context switch - await asyncio.sleep(0) + await stream.write(encode_varint_prefixed(hello.SerializeToString())) + # TODO: Check EOF in the future in the stream's lifetime. + # TODO: Check if the peer in black list. + self.router.add_peer(peer_id, stream.get_protocol()) async def handle_peer_queue(self) -> None: """ @@ -247,25 +255,9 @@ class Pubsub: peer_id: ID = await self.peer_queue.get() - # Open a stream to peer on existing connection - # (we know connection exists since that's the only way - # an element gets added to peer_queue) - stream: INetStream = await self.host.new_stream(peer_id, self.protocols) - # Add Peer - # Map peer to stream - self.peers[peer_id] = stream - self.router.add_peer(peer_id, stream.get_protocol()) - - # Send hello packet - hello = self.get_hello_packet() - await stream.write(hello.SerializeToString()) - - # TODO: Investigate whether this should be replaced by `handlePeerEOF` - # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/49274b0e8aecdf6cad59d768e5702ff00aa48488/comm.go#L80 # noqa: E501 - # Pass stream off to stream reader - asyncio.ensure_future(self.continuously_read_stream(stream)) + asyncio.ensure_future(self._handle_new_peer(peer_id)) # Force context switch await asyncio.sleep(0) @@ -366,7 +358,7 @@ class Pubsub: # Broadcast message for stream in self.peers.values(): # Write message to stream - await stream.write(raw_msg) + await stream.write(encode_varint_prefixed(raw_msg)) async def publish(self, topic_id: str, data: bytes) -> None: """ diff --git a/libp2p/pubsub/pubsub_notifee.py b/libp2p/pubsub/pubsub_notifee.py index 8878a27..6ecab1a 100644 --- a/libp2p/pubsub/pubsub_notifee.py +++ b/libp2p/pubsub/pubsub_notifee.py @@ -36,11 +36,7 @@ class PubsubNotifee(INotifee): :param network: network the connection was opened on :param conn: connection that was opened """ - - # Only add peer_id if we are initiator (otherwise we would end up - # with two pubsub streams between us and the peer) - if conn.initiator: - await self.initiator_peers_queue.put(conn.peer_id) + await self.initiator_peers_queue.put(conn.peer_id) async def disconnected(self, network: INetwork, conn: IMuxedConn) -> None: pass diff --git a/tests/interop/test_pubsub.py b/tests/interop/test_pubsub.py index bbb9b8d..5c7f70d 100644 --- a/tests/interop/test_pubsub.py +++ b/tests/interop/test_pubsub.py @@ -2,18 +2,31 @@ import asyncio import pytest +from libp2p.peer.id import ID + from .utils import connect - TOPIC = "TOPIC_0123" +DATA = b"DATA_0123" @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.asyncio -async def test_pubsub_peers(pubsubs_gsub, p2pds): +async def test_pubsub_subscribe(pubsubs_gsub, p2pds): # await connect(pubsubs_gsub[0].host, p2pds[0]) await connect(p2pds[0], pubsubs_gsub[0].host) + peers = await p2pds[0].control.pubsub_list_peers("") + assert pubsubs_gsub[0].host.get_id() in peers + # FIXME: + assert p2pds[0].peer_id in pubsubs_gsub[0].peers + sub = await pubsubs_gsub[0].subscribe(TOPIC) - await asyncio.sleep(1) - peers = await p2pds[0].control.pubsub_list_peers(TOPIC) - print(f"!@# peers={peers}") + peers_topic = await p2pds[0].control.pubsub_list_peers(TOPIC) + await asyncio.sleep(0.1) + assert pubsubs_gsub[0].host.get_id() in peers_topic + + await p2pds[0].control.pubsub_publish(TOPIC, DATA) + msg = await sub.get() + assert ID(msg.from_id) == p2pds[0].peer_id + assert msg.data == DATA + assert len(msg.topicIDs) == 1 and msg.topicIDs[0] == TOPIC diff --git a/tests/interop/utils.py b/tests/interop/utils.py index 43b93a6..d050606 100644 --- a/tests/interop/utils.py +++ b/tests/interop/utils.py @@ -4,8 +4,8 @@ from typing import Union from multiaddr import Multiaddr from libp2p.host.host_interface import IHost -from libp2p.peer.peerinfo import PeerInfo from libp2p.peer.id import ID +from libp2p.peer.peerinfo import PeerInfo from .daemon import Daemon diff --git a/tests/pubsub/configs.py b/tests/pubsub/configs.py index e073787..b205325 100644 --- a/tests/pubsub/configs.py +++ b/tests/pubsub/configs.py @@ -1,8 +1,6 @@ from typing import NamedTuple -from libp2p.pubsub import floodsub -from libp2p.pubsub import gossipsub - +from libp2p.pubsub import floodsub, gossipsub FLOODSUB_PROTOCOL_ID = floodsub.PROTOCOL_ID GOSSIPSUB_PROTOCOL_ID = gossipsub.PROTOCOL_ID