rewrote get_hello_packet

This commit is contained in:
zixuanzh 2019-03-28 15:25:33 -04:00 committed by Stuckinaboot
parent 2b41818561
commit b297aa9576

View File

@ -1,5 +1,7 @@
import asyncio
from .pb import rpc_pb2_grpc
from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee
from .message import MessageSub
from .message import create_message_talk, create_message_sub
@ -7,37 +9,6 @@ from. message import generate_message_id
class Pubsub():
"""
For now, because I'm on a plane and don't have access to the go repo/protobuf stuff,
this is going to be the message format for the two types: subscription and talk
subscription indicates subscribing or unsubscribing from a topic
talk is sending a message on topic(s)
subscription format:
subscription
'from'
<one of 'sub', 'unsub'>:'topicid'
<one of 'sub', 'unsub'>:'topicid'
...
Ex.
subscription
msg_sender_peer_id
origin_peer_id
sub:topic1
sub:topic2
unsub:fav_topic
talk format:
talk
'from'
'origin'
[topic_ids comma-delimited]
'data'
Ex.
talk
msg_sender_peer_id
origin_peer_id
topic1,topics_are_cool,foo
I like tacos
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, host, router, my_id):
@ -90,34 +61,37 @@ class Pubsub():
"""
Generate subscription message with all topics we are subscribed to
"""
subs_map = {}
for topic in self.my_topics:
subs_map[topic] = True
sub_msg = MessageSub(
str(self.host.get_id()),\
str(self.host.get_id()), subs_map, generate_message_id()\
packet = rpc_pb2.RPC()
message = rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'),
seqno=str(generate_message_id()).encode('utf-8')
)
return sub_msg.to_str()
packet.publish.extend([message])
for topic_id in self.my_topics:
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe=True, topicid=topic_id)])
return packet.SerializeToString()
async def continously_read_stream(self, stream):
"""
Read from input stream in an infinite loop. Process
messages from other nodes, which for now are considered MessageTalk
and MessageSub messages.
TODO: Handle RPC messages instead of my Aspyn's own custom message format
:param stream: stream to continously read from
"""
while True:
incoming = (await stream.read()).decode()
msg_comps = incoming.split('\n')
msg_type = msg_comps[0]
incoming = (await stream.read())
rpc_incoming = rpc_pb2.RPC()
rpc_incoming.ParseFromString(incoming)
print (rpc_incoming.publish)
msg_sender = msg_comps[1]
# msg_type = msg_comps[0]
msg_sender = rpc_incoming.publish.from_id
# msg_origin = msg_comps[2]
msg_id = msg_comps[3]
print("HIT ME1")
msg_id = rpc_incoming.publish.seqno
if msg_id not in self.seen_messages:
print("HIT ME")
# Do stuff with incoming unseen message
should_publish = True
if msg_type == "subscription":
@ -161,7 +135,7 @@ class Pubsub():
# Send hello packet
hello = self.get_hello_packet()
await stream.write(hello.encode())
await stream.write(hello)
# Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream))
@ -187,7 +161,7 @@ class Pubsub():
# Send hello packet
hello = self.get_hello_packet()
await stream.write(hello.encode())
await stream.write(hello)
# Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream))