From 47643a67c61609b47b71052ab82f0d1d23a5e483 Mon Sep 17 00:00:00 2001 From: NIC619 Date: Tue, 6 Aug 2019 12:32:18 +0800 Subject: [PATCH] Apply PR feedback --- libp2p/pubsub/pubsub.py | 30 ++++++++++++++++++------------ libp2p/pubsub/validators.py | 2 +- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 0dcee67..dd51be2 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,7 +1,7 @@ import asyncio -from collections import namedtuple +import logging import time -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Iterable, List, Tuple, Union +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, NamedTuple, Tuple, Union from lru import LRU @@ -17,17 +17,20 @@ if TYPE_CHECKING: from .pubsub_router_interface import IPubsubRouter +log = logging.getLogger(__name__) + + def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]: # NOTE: `string(from, seqno)` in Go return (msg.seqno, msg.from_id) -TopicValidator = namedtuple("TopicValidator", ["validator", "is_async"]) - - ValidatorFn = Union[Callable[[ID, rpc_pb2.Message], bool], Awaitable[bool]] +TopicValidator = NamedTuple("TopicValidator", (("validator", ValidatorFn), ("is_async", bool))) + + class Pubsub: host: IHost @@ -181,14 +184,14 @@ class Pubsub: if topic in self.topic_validators: del self.topic_validators[topic] - def get_msg_validators(self, msg: rpc_pb2.Message) -> Iterable[TopicValidator]: + def get_msg_validators(self, msg: rpc_pb2.Message) -> Tuple[TopicValidator, ...]: """ Get all validators corresponding to the topics in the message. :param msg: the message published to the topic """ - for topic in msg.topicIDs: - if topic in self.topic_validators: - yield self.topic_validators[topic] + return ( + self.topic_validators[topic] for topic in msg.topicIDs if topic in self.topic_validators + ) async def stream_handler(self, stream: INetStream) -> None: """ @@ -399,15 +402,18 @@ class Pubsub: # TODO: Check if signing is required and if so signature should be attached. + # If the message is processed before, return(i.e., don't further process the message). if self._is_msg_seen(msg): return # TODO: - Validate the message. If failed, reject it. # Validate the signature of the message # FIXME: `signature_validator` is currently a stub. - if not signature_validator(msg.key, msg.SerializeToString()): + if not signature_validator(msg.key, msg.SerializeToString(), msg.singature): + log.debug(f"Signature validation failed for msg={msg}") return - # Validate the message with registered topic validators + # Validate the message with registered topic validators. + # If the validation failed, return(i.e., don't further process the message). is_validation_passed = await self.validate_msg(msg_forwarder, msg) if not is_validation_passed: return @@ -436,4 +442,4 @@ class Pubsub: def _is_subscribed_to_msg(self, msg: rpc_pb2.Message) -> bool: if not self.my_topics: return False - return any([topic in self.my_topics for topic in msg.topicIDs]) + return any(topic in self.my_topics for topic in msg.topicIDs) diff --git a/libp2p/pubsub/validators.py b/libp2p/pubsub/validators.py index e575980..4d99ede 100644 --- a/libp2p/pubsub/validators.py +++ b/libp2p/pubsub/validators.py @@ -1,5 +1,5 @@ # FIXME: Replace the type of `pubkey` with a custom type `Pubkey` -def signature_validator(pubkey: bytes, msg: bytes) -> bool: +def signature_validator(pubkey: bytes, msg: bytes, sig: bytes) -> bool: """ Verify the message against the given public key. :param pubkey: the public key which signs the message.