Reflect PR feedback

* Rename `src` to `msg_forwarder` in pubsub/floodsub/gossipsub
* Rename Variables
* Sort imports
* Clean up
This commit is contained in:
mhchia 2019-07-29 12:09:35 +08:00
parent 74d831d4e2
commit f02d38c0ee
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
12 changed files with 54 additions and 55 deletions

View File

@ -51,7 +51,7 @@ class FloodSub(IPubsubRouter):
:param rpc: rpc message :param rpc: rpc message
""" """
async def publish(self, src: ID, pubsub_msg: rpc_pb2.Message) -> None: async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
""" """
Invoked to forward a new message that has been validated. Invoked to forward a new message that has been validated.
This is where the "flooding" part of floodsub happens This is where the "flooding" part of floodsub happens
@ -62,13 +62,13 @@ class FloodSub(IPubsubRouter):
so that seen messages are not further forwarded. so that seen messages are not further forwarded.
It also never forwards a message back to the source It also never forwards a message back to the source
or the peer that forwarded the message. or the peer that forwarded the message.
:param src: peer ID of the peer who forwards the message to us :param msg_forwarder: peer ID of the peer who forwards the message to us
:param pubsub_msg: pubsub message in protobuf. :param pubsub_msg: pubsub message in protobuf.
""" """
peers_gen = self._get_peers_to_send( peers_gen = self._get_peers_to_send(
pubsub_msg.topicIDs, pubsub_msg.topicIDs,
src=src, msg_forwarder=msg_forwarder,
origin=ID(pubsub_msg.from_id), origin=ID(pubsub_msg.from_id),
) )
rpc_msg = rpc_pb2.RPC( rpc_msg = rpc_pb2.RPC(
@ -98,11 +98,11 @@ class FloodSub(IPubsubRouter):
def _get_peers_to_send( def _get_peers_to_send(
self, self,
topic_ids: Iterable[str], topic_ids: Iterable[str],
src: ID, msg_forwarder: ID,
origin: ID) -> Iterable[ID]: origin: ID) -> Iterable[ID]:
""" """
Get the eligible peers to send the data to. Get the eligible peers to send the data to.
:param src: peer ID of the peer who forwards the message to us. :param msg_forwarder: peer ID of the peer who forwards the message to us.
:param origin: peer id of the peer the message originate from. :param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to. :return: a generator of the peer ids who we send data to.
""" """
@ -111,7 +111,7 @@ class FloodSub(IPubsubRouter):
continue continue
for peer_id_str in self.pubsub.peer_topics[topic]: for peer_id_str in self.pubsub.peer_topics[topic]:
peer_id = id_b58_decode(peer_id_str) peer_id = id_b58_decode(peer_id_str)
if peer_id in (src, origin): if peer_id in (msg_forwarder, origin):
continue continue
# FIXME: Should change `self.pubsub.peers` to Dict[PeerID, ...] # FIXME: Should change `self.pubsub.peers` to Dict[PeerID, ...]
if str(peer_id) not in self.pubsub.peers: if str(peer_id) not in self.pubsub.peers:

View File

@ -119,7 +119,7 @@ class GossipSub(IPubsubRouter):
for prune in control_message.prune: for prune in control_message.prune:
await self.handle_prune(prune, sender_peer_id) await self.handle_prune(prune, sender_peer_id)
async def publish(self, src: ID, pubsub_msg: rpc_pb2.Message) -> None: async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
# pylint: disable=too-many-locals # pylint: disable=too-many-locals
""" """
Invoked to forward a new message that has been validated. Invoked to forward a new message that has been validated.
@ -128,7 +128,7 @@ class GossipSub(IPubsubRouter):
peers_gen = self._get_peers_to_send( peers_gen = self._get_peers_to_send(
pubsub_msg.topicIDs, pubsub_msg.topicIDs,
src=src, msg_forwarder=msg_forwarder,
origin=ID(pubsub_msg.from_id), origin=ID(pubsub_msg.from_id),
) )
rpc_msg = rpc_pb2.RPC( rpc_msg = rpc_pb2.RPC(
@ -144,11 +144,11 @@ class GossipSub(IPubsubRouter):
def _get_peers_to_send( def _get_peers_to_send(
self, self,
topic_ids: Iterable[str], topic_ids: Iterable[str],
src: ID, msg_forwarder: ID,
origin: ID) -> Iterable[ID]: origin: ID) -> Iterable[ID]:
""" """
Get the eligible peers to send the data to. Get the eligible peers to send the data to.
:param src: the peer id of the peer who forwards the message to me. :param msg_forwarder: the peer id of the peer who forwards the message to me.
:param origin: peer id of the peer the message originate from. :param origin: peer id of the peer the message originate from.
:return: a generator of the peer ids who we send data to. :return: a generator of the peer ids who we send data to.
""" """
@ -167,10 +167,10 @@ class GossipSub(IPubsubRouter):
# gossipsub peers # gossipsub peers
# FIXME: Change `str` to `ID` # FIXME: Change `str` to `ID`
gossipsub_peers: List[str] = None in_topic_gossipsub_peers: List[str] = None
# TODO: Do we need to check `topic in self.pubsub.my_topics`? # TODO: Do we need to check `topic in self.pubsub.my_topics`?
if topic in self.mesh: if topic in self.mesh:
gossipsub_peers = self.mesh[topic] in_topic_gossipsub_peers = self.mesh[topic]
else: else:
# TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not # TODO(robzajac): Is topic DEFINITELY supposed to be in fanout if we are not
# subscribed? # subscribed?
@ -185,11 +185,11 @@ class GossipSub(IPubsubRouter):
self.degree, self.degree,
[], [],
) )
gossipsub_peers = self.fanout[topic] in_topic_gossipsub_peers = self.fanout[topic]
for peer_id_str in gossipsub_peers: for peer_id_str in in_topic_gossipsub_peers:
send_to.add(id_b58_decode(peer_id_str)) send_to.add(id_b58_decode(peer_id_str))
# Excludes `src` and `origin` # Excludes `msg_forwarder` and `origin`
yield from send_to.difference([src, origin]) yield from send_to.difference([msg_forwarder, origin])
async def join(self, topic): async def join(self, topic):
# Note: the comments here are the near-exact algorithm description from the spec # Note: the comments here are the near-exact algorithm description from the spec

View File

@ -142,7 +142,7 @@ class Pubsub:
continue continue
# TODO(mhchia): This will block this read_stream loop until all data are pushed. # TODO(mhchia): This will block this read_stream loop until all data are pushed.
# Should investigate further if this is an issue. # Should investigate further if this is an issue.
await self.push_msg(src=peer_id, msg=msg) await self.push_msg(msg_forwarder=peer_id, msg=msg)
if rpc_incoming.subscriptions: if rpc_incoming.subscriptions:
# deal with RPC.subscriptions # deal with RPC.subscriptions
@ -331,10 +331,10 @@ class Pubsub:
await self.push_msg(self.host.get_id(), msg) await self.push_msg(self.host.get_id(), msg)
async def push_msg(self, src: ID, msg: rpc_pb2.Message) -> None: async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
""" """
Push a pubsub message to others. Push a pubsub message to others.
:param src: the peer who forward us the message. :param msg_forwarder: the peer who forward us the message.
:param msg: the message we are going to push out. :param msg: the message we are going to push out.
""" """
# TODO: - Check if the `source` is in the blacklist. If yes, reject. # TODO: - Check if the `source` is in the blacklist. If yes, reject.
@ -350,7 +350,7 @@ class Pubsub:
self._mark_msg_seen(msg) self._mark_msg_seen(msg)
await self.handle_talk(msg) await self.handle_talk(msg)
await self.router.publish(src, msg) await self.router.publish(msg_forwarder, msg)
def _next_seqno(self) -> bytes: def _next_seqno(self) -> bytes:
""" """

View File

@ -42,10 +42,10 @@ class IPubsubRouter(ABC):
""" """
@abstractmethod @abstractmethod
async def publish(self, src, pubsub_msg) -> None: async def publish(self, msg_forwarder, pubsub_msg):
""" """
Invoked to forward a new message that has been validated Invoked to forward a new message that has been validated
:param src: peer_id of message sender :param msg_forwarder: peer_id of message sender
:param pubsub_msg: pubsub message to forward :param pubsub_msg: pubsub message to forward
""" """

View File

@ -2,6 +2,6 @@ import multiaddr
FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0" FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0"
SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] GOSSIPSUB_PROTOCOL_ID = "/gossipsub/1.0.0"
LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0") LISTEN_MADDR = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")

View File

@ -5,13 +5,14 @@ import multiaddr
from libp2p import new_node from libp2p import new_node
from libp2p.host.host_interface import IHost from libp2p.host.host_interface import IHost
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pubsub import Pubsub
from .utils import message_id_generator, generate_RPC_packet from .configs import FLOODSUB_PROTOCOL_ID
from .utils import message_id_generator
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
CRYPTO_TOPIC = "ethereum" CRYPTO_TOPIC = "ethereum"
# Message format: # Message format:

View File

@ -2,8 +2,6 @@ import asyncio
import pytest import pytest
import multiaddr
from libp2p import new_node from libp2p import new_node
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
@ -13,8 +11,12 @@ from tests.utils import (
connect, connect,
) )
from .configs import (
FLOODSUB_PROTOCOL_ID,
LISTEN_MADDR,
)
FLOODSUB_PROTOCOL_ID = "/floodsub/1.0.0"
SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID] SUPPORTED_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
FLOODSUB_PROTOCOL_TEST_CASES = [ FLOODSUB_PROTOCOL_TEST_CASES = [
@ -349,7 +351,6 @@ async def perform_test_from_obj(obj, router_factory):
or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior or B as a neighbor of A once. Do NOT list both A: ["B"] and B:["A"] as the behavior
is undefined (even if it may work) is undefined (even if it may work)
""" """
listen_maddr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")
# Step 1) Create graph # Step 1) Create graph
adj_list = obj["adj_list"] adj_list = obj["adj_list"]
@ -357,8 +358,8 @@ async def perform_test_from_obj(obj, router_factory):
pubsub_map = {} pubsub_map = {}
async def add_node(node_id: str) -> None: async def add_node(node_id: str) -> None:
node = await new_node(transport_opt=[str(listen_maddr)]) node = await new_node(transport_opt=[str(LISTEN_MADDR)])
await node.get_network().listen(listen_maddr) await node.get_network().listen(LISTEN_MADDR)
node_map[node_id] = node node_map[node_id] = node
pubsub_router = router_factory(protocols=obj["supported_protocols"]) pubsub_router = router_factory(protocols=obj["supported_protocols"])
pubsub = Pubsub(node, pubsub_router, ID(node_id.encode())) pubsub = Pubsub(node, pubsub_router, ID(node_id.encode()))
@ -417,7 +418,7 @@ async def perform_test_from_obj(obj, router_factory):
node_id = msg["node_id"] node_id = msg["node_id"]
# Publish message # Publish message
# FIXME: Should be single RPC package with several topics # TODO: Should be single RPC package with several topics
for topic in topics: for topic in topics:
tasks_publish.append( tasks_publish.append(
pubsub_map[node_id].publish( pubsub_map[node_id].publish(
@ -426,23 +427,22 @@ async def perform_test_from_obj(obj, router_factory):
) )
) )
# For each topic in topics, add topic, msg_talk tuple to ordered test list # For each topic in topics, add (topic, node_id, data) tuple to ordered test list
# TODO: Update message sender to be correct message sender before
# adding msg_talk to this list
for topic in topics: for topic in topics:
topics_in_msgs_ordered.append((topic, data)) topics_in_msgs_ordered.append((topic, node_id, data))
# Allow time for publishing before continuing # Allow time for publishing before continuing
await asyncio.gather(*tasks_publish, asyncio.sleep(2)) await asyncio.gather(*tasks_publish, asyncio.sleep(2))
# Step 4) Check that all messages were received correctly. # Step 4) Check that all messages were received correctly.
# TODO: Check message sender too for topic, origin_node_id, data in topics_in_msgs_ordered:
for topic, data in topics_in_msgs_ordered:
# Look at each node in each topic # Look at each node in each topic
for node_id in topic_map[topic]: for node_id in topic_map[topic]:
# Get message from subscription queue # Get message from subscription queue
msg = await queues_map[node_id][topic].get() msg = await queues_map[node_id][topic].get()
assert data == msg.data assert data == msg.data
# Check the message origin
assert node_map[origin_node_id].get_id().to_bytes() == msg.from_id
# Success, terminate pending tasks. # Success, terminate pending tasks.
await cleanup() await cleanup()

View File

@ -1,19 +1,13 @@
import asyncio import asyncio
from threading import Thread from threading import Thread
import multiaddr
import pytest import pytest
from libp2p import new_node
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub
from tests.utils import ( from tests.utils import (
cleanup, cleanup,
connect, connect,
) )
from .dummy_account_node import DummyAccountNode from .dummy_account_node import DummyAccountNode
# pylint: disable=too-many-locals # pylint: disable=too-many-locals

View File

@ -8,6 +8,7 @@ from tests.utils import (
connect, connect,
) )
from .configs import GOSSIPSUB_PROTOCOL_ID
from .utils import ( from .utils import (
create_libp2p_hosts, create_libp2p_hosts,
create_pubsub_and_gossipsub_instances, create_pubsub_and_gossipsub_instances,
@ -16,7 +17,7 @@ from .utils import (
) )
SUPPORTED_PROTOCOLS = ["/gossipsub/1.0.0"] SUPPORTED_PROTOCOLS = [GOSSIPSUB_PROTOCOL_ID]
@pytest.mark.asyncio @pytest.mark.asyncio
@ -51,7 +52,7 @@ async def test_join():
# Central node publish to the topic so that this topic # Central node publish to the topic so that this topic
# is added to central node's fanout # is added to central node's fanout
# publish from the randomly chosen host # publish from the randomly chosen host
await pubsubs[central_node_index].publish(topic, b"") await pubsubs[central_node_index].publish(topic, b"data")
# Check that the gossipsub of central node has fanout for the topic # Check that the gossipsub of central node has fanout for the topic
assert topic in gossipsubs[central_node_index].fanout assert topic in gossipsubs[central_node_index].fanout

View File

@ -1,14 +1,17 @@
import pytest import pytest
from libp2p.pubsub.mcache import MessageCache from libp2p.pubsub.mcache import MessageCache
class Msg: class Msg:
def __init__(self, topicIDs, seqno, from_id): def __init__(self, topicIDs, seqno, from_id):
# pylint: disable=invalid-name
self.topicIDs = topicIDs self.topicIDs = topicIDs
self.seqno = seqno, self.seqno = seqno
self.from_id = from_id self.from_id = from_id
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_mcache(): async def test_mcache():
# Ported from: # Ported from:

View File

@ -5,7 +5,10 @@ from libp2p import new_node
from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.floodsub import FloodSub from libp2p.pubsub.floodsub import FloodSub
SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] from .configs import FLOODSUB_PROTOCOL_ID
SUPPORTED_PUBSUB_PROTOCOLS = [FLOODSUB_PROTOCOL_ID]
TESTING_TOPIC = "TEST_SUBSCRIBE" TESTING_TOPIC = "TEST_SUBSCRIBE"

View File

@ -1,19 +1,16 @@
import asyncio import asyncio
import random
import struct import struct
from typing import ( from typing import (
Sequence, Sequence,
) )
import uuid
import multiaddr import multiaddr
from libp2p import new_node from libp2p import new_node
from libp2p.pubsub.pb import rpc_pb2
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.pubsub.pubsub import Pubsub
from libp2p.pubsub.gossipsub import GossipSub from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.pb import rpc_pb2
from libp2p.pubsub.pubsub import Pubsub
from tests.utils import connect from tests.utils import connect