rewrote get_hello_packet

This commit is contained in:
zixuanzh 2019-03-28 15:25:33 -04:00
parent 81d121a029
commit 3cfeccaf17

View File

@ -1,5 +1,7 @@
import asyncio import asyncio
from .pb import rpc_pb2_grpc
from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .message import MessageSub from .message import MessageSub
from .message import create_message_talk, create_message_sub from .message import create_message_talk, create_message_sub
@ -7,37 +9,6 @@ from. message import generate_message_id
class Pubsub(): class Pubsub():
"""
For now, because I'm on a plane and don't have access to the go repo/protobuf stuff,
this is going to be the message format for the two types: subscription and talk
subscription indicates subscribing or unsubscribing from a topic
talk is sending a message on topic(s)
subscription format:
subscription
'from'
<one of 'sub', 'unsub'>:'topicid'
<one of 'sub', 'unsub'>:'topicid'
...
Ex.
subscription
msg_sender_peer_id
origin_peer_id
sub:topic1
sub:topic2
unsub:fav_topic
talk format:
talk
'from'
'origin'
[topic_ids comma-delimited]
'data'
Ex.
talk
msg_sender_peer_id
origin_peer_id
topic1,topics_are_cool,foo
I like tacos
"""
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
def __init__(self, host, router, my_id): def __init__(self, host, router, my_id):
@ -90,34 +61,37 @@ class Pubsub():
""" """
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
""" """
subs_map = {} packet = rpc_pb2.RPC()
for topic in self.my_topics: message = rpc_pb2.Message(
subs_map[topic] = True from_id=str(self.host.get_id()).encode('utf-8'),
sub_msg = MessageSub( seqno=str(generate_message_id()).encode('utf-8')
str(self.host.get_id()),\
str(self.host.get_id()), subs_map, generate_message_id()\
) )
return sub_msg.to_str() packet.publish.extend([message])
for topic_id in self.my_topics:
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe=True, topicid=topic_id)])
return packet.SerializeToString()
async def continously_read_stream(self, stream): async def continously_read_stream(self, stream):
""" """
Read from input stream in an infinite loop. Process Read from input stream in an infinite loop. Process
messages from other nodes, which for now are considered MessageTalk messages from other nodes, which for now are considered MessageTalk
and MessageSub messages. and MessageSub messages.
TODO: Handle RPC messages instead of my Aspyn's own custom message format
:param stream: stream to continously read from :param stream: stream to continously read from
""" """
while True: while True:
incoming = (await stream.read()).decode() incoming = (await stream.read())
msg_comps = incoming.split('\n') rpc_incoming = rpc_pb2.RPC()
msg_type = msg_comps[0] rpc_incoming.ParseFromString(incoming)
print (rpc_incoming.publish)
msg_sender = msg_comps[1] # msg_type = msg_comps[0]
msg_sender = rpc_incoming.publish.from_id
# msg_origin = msg_comps[2] # msg_origin = msg_comps[2]
msg_id = msg_comps[3] msg_id = rpc_incoming.publish.seqno
print("HIT ME1")
if msg_id not in self.seen_messages: if msg_id not in self.seen_messages:
print("HIT ME")
# Do stuff with incoming unseen message # Do stuff with incoming unseen message
should_publish = True should_publish = True
if msg_type == "subscription": if msg_type == "subscription":
@ -161,7 +135,7 @@ class Pubsub():
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello = self.get_hello_packet()
await stream.write(hello.encode()) await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream)) asyncio.ensure_future(self.continously_read_stream(stream))
@ -187,7 +161,7 @@ class Pubsub():
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello = self.get_hello_packet()
await stream.write(hello.encode()) await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream)) asyncio.ensure_future(self.continously_read_stream(stream))