Add basic functionalities of publish

This commit is contained in:
mhchia 2019-07-23 16:10:14 +08:00
parent 8f9e5a28ff
commit 218bdb42c4
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A

View File

@ -1,6 +1,7 @@
# pylint: disable=no-name-in-module
import asyncio
import time
<<<<<<< HEAD
from typing import (
Any,
Dict,
@ -9,6 +10,8 @@ from typing import (
Tuple,
)
=======
>>>>>>> Add basic functionalities of `publish`
from typing import (
Any,
Dict,
@ -331,15 +334,37 @@ class Pubsub:
def list_peers(self, topic_id: str) -> Tuple[ID]:
return
def publish(self, topic_id: str, data: bytes) -> None:
# TODO: Create pb message
async def publish(self, topic_id: str, data: bytes) -> None:
msg = rpc_pb2.Message(
data=data,
topicIDs=[topic_id],
from_id=self.host.get_id().to_bytes(),
seqno=self._next_seqno(),
)
# TODO: Sign with our signing key
# TODO: `p.pushMsg(p.host.ID(), msg)`
self.push_msg(self.host.get_id(), msg)
async def push_msg(self, src: ID, msg: rpc_pb2.Message):
# TODO: - Check if the source is in the blacklist. If yes, reject.
# TODO: - Check if the `from` is in the blacklist. If yes, reject.
# TODO: - Check if the message is seen. If yes, reject it.
# TODO: - Check if signing is required and if so signature should be attached.
if self._is_msg_seen(msg):
return
# TODO: - Validate the message. If failed, reject it.
# TODO: - Mark as seen and `publishMessage`
# TODO: - Notify the subscribers
# TODO: - Router.Publish
return
self._mark_msg_seen(msg)
await self.handle_talk(msg)
await self.router.publish(src, msg)
def _next_seqno(self) -> bytes:
self.counter += 1
return self.counter.to_bytes(8, 'big')
def _is_msg_seen(self, msg: rpc_pb2.Message) -> bool:
msg_id = get_msg_id(msg)
return msg_id in self.seen_messages
def _mark_msg_seen(self, msg: rpc_pb2.Message) -> None:
msg_id = get_msg_id(msg)
# FIXME: Mapping `msg_id` to `1` is quite awkward. Should investigate if there is a
# more appropriate way.
self.seen_messages[msg_id] = 1