From 218bdb42c412ed459f291f835d1aa8f7c62c106f Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 23 Jul 2019 16:10:14 +0800 Subject: [PATCH] Add basic functionalities of `publish` --- libp2p/pubsub/pubsub.py | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index fa04d2b..5f8390f 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,6 +1,7 @@ # pylint: disable=no-name-in-module import asyncio import time +<<<<<<< HEAD from typing import ( Any, Dict, @@ -9,6 +10,8 @@ from typing import ( Tuple, ) +======= +>>>>>>> Add basic functionalities of `publish` from typing import ( Any, Dict, @@ -331,15 +334,37 @@ class Pubsub: def list_peers(self, topic_id: str) -> Tuple[ID]: return - def publish(self, topic_id: str, data: bytes) -> None: - # TODO: Create pb message + async def publish(self, topic_id: str, data: bytes) -> None: + msg = rpc_pb2.Message( + data=data, + topicIDs=[topic_id], + from_id=self.host.get_id().to_bytes(), + seqno=self._next_seqno(), + ) # TODO: Sign with our signing key - # TODO: `p.pushMsg(p.host.ID(), msg)` + self.push_msg(self.host.get_id(), msg) + + async def push_msg(self, src: ID, msg: rpc_pb2.Message): # TODO: - Check if the source is in the blacklist. If yes, reject. # TODO: - Check if the `from` is in the blacklist. If yes, reject. - # TODO: - Check if the message is seen. If yes, reject it. + # TODO: - Check if signing is required and if so signature should be attached. + if self._is_msg_seen(msg): + return # TODO: - Validate the message. If failed, reject it. - # TODO: - Mark as seen and `publishMessage` - # TODO: - Notify the subscribers - # TODO: - Router.Publish - return + self._mark_msg_seen(msg) + await self.handle_talk(msg) + await self.router.publish(src, msg) + + def _next_seqno(self) -> bytes: + self.counter += 1 + return self.counter.to_bytes(8, 'big') + + def _is_msg_seen(self, msg: rpc_pb2.Message) -> bool: + msg_id = get_msg_id(msg) + return msg_id in self.seen_messages + + def _mark_msg_seen(self, msg: rpc_pb2.Message) -> None: + msg_id = get_msg_id(msg) + # FIXME: Mapping `msg_id` to `1` is quite awkward. Should investigate if there is a + # more appropriate way. + self.seen_messages[msg_id] = 1