reworked subscribe unsubsrcibe

This commit is contained in:
zixuanzh 2019-03-30 17:59:08 -04:00
parent bf17f424b3
commit aec783b843

View File

@ -61,7 +61,7 @@ class Pubsub():
"""
Generate subscription message with all topics we are subscribed to
"""
print("MAKING HELLO PACKET")
packet = rpc_pb2.RPC()
message = rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'),
@ -74,7 +74,7 @@ class Pubsub():
return packet.SerializeToString()
async def continously_read_stream(self, stream):
async def continuously_read_stream(self, stream):
"""
Read from input stream in an infinite loop. Process
messages from other nodes, which for now are considered MessageTalk
@ -88,16 +88,12 @@ class Pubsub():
while True:
incoming = (await stream.read())
rpc_incoming = rpc_pb2.RPC()
print("JUST GOT STRING")
print(incoming)
rpc_incoming.ParseFromString(incoming)
print("JUST GOT")
print ("CONTINUOUSLY")
print (rpc_incoming)
if hasattr(rpc_incoming, 'publish'):
if rpc_incoming.publish:
# deal with "talk messages"
for msg in rpc_incoming.publish:
# TODO check what from_id is in go? is it origin or peer
old_format = MessageTalk(peer_id,
msg.from_id,
msg.topicIDs,
@ -107,48 +103,16 @@ class Pubsub():
await self.handle_talk(old_format)
await self.router.publish(peer_id, msg)
if hasattr(rpc_incoming, 'subscriptions'):
if rpc_incoming.subscriptions:
# deal with "subscription messages"
subs_map = {}
for msg in rpc_incoming.subscriptions:
if msg.subscribe:
subs_map[msg.topic_id] = "sub"
subs_map[msg.topicid] = "sub"
else:
subs_map[msg.topic_id] = "unsub"
old_format = MessageSub(peer_id, peer_id, subs_map, generate_message_id())
self.handle_subscription(old_format)
subs_map[msg.topicid] = "unsub"
# msg_type = msg_comps[0]
# msg_sender = rpc_incoming.publish.from_id
# # msg_origin = msg_comps[2]
# msg_id = rpc_incoming.publish.seqno
# if msg_id not in self.seen_messages:
# # Do stuff with incoming unseen message
# should_publish = True
# if msg_type == "subscription":
# self.handle_subscription(incoming)
# # We don't need to relay the subscription to our
# # peers because a given node only needs its peers
# # to know that it is subscribed to the topic (doesn't
# # need everyone to know)
# should_publish = False
# elif msg_type == "talk":
# await self.handle_talk(incoming)
# Add message id to seen
# self.seen_messages.append(msg_id)
# Publish message using router's publish
# if should_publish:
# msg = create_message_talk(incoming)
# # Adjust raw_msg to that the message sender
# # is now our peer_id
# msg.from_id = str(self.host.get_id())
# await self.router.publish(msg_sender, msg.to_str())
self.handle_subscription(rpc_incoming)
# Force context switch
await asyncio.sleep(0)
@ -167,10 +131,10 @@ class Pubsub():
# Send hello packet
hello = self.get_hello_packet()
print(hello)
await stream.write(hello)
# Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream))
asyncio.ensure_future(self.continuously_read_stream(stream))
async def handle_peer_queue(self):
"""
@ -197,30 +161,30 @@ class Pubsub():
await stream.write(hello)
# Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream))
asyncio.ensure_future(self.continuously_read_stream(stream))
# Force context switch
await asyncio.sleep(0)
def handle_subscription(self, subscription):
def handle_subscription(self, rpc_message):
"""
Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as
defined in the subscription message
:param subscription: raw data constituting a subscription message
"""
sub_msg = subscription
if sub_msg.subs_map:
print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id)
for topic_id in sub_msg.subs_map:
for sub_msg in rpc_message.subscriptions:
# Look at each subscription in the msg individually
if sub_msg.subs_map[topic_id]:
if topic_id not in self.peer_topics:
if sub_msg.subscribe:
origin_id = rpc_message.publish[0].from_id
if sub_msg.topicid not in self.peer_topics:
# Create topic list if it did not yet exist
self.peer_topics[topic_id] = [sub_msg.origin_id]
elif sub_msg.origin_id not in self.peer_topics[topic_id]:
self.peer_topics[sub_msg.topicid] = origin_id
elif orgin_id not in self.peer_topics[sub_msg.topicid]:
# Add peer to topic
self.peer_topics[topic_id].append(sub_msg.origin_id)
self.peer_topics[sub_msg.topicid].append(origin_id)
else:
# TODO: Remove peer from topic
pass
@ -250,13 +214,18 @@ class Pubsub():
self.my_topics[topic_id] = asyncio.Queue()
# Create subscribe message
sub_msg = MessageSub(
str(self.host.get_id()),\
str(self.host.get_id()), {topic_id: True}, generate_message_id()\
)
packet = rpc_pb2.RPC()
packet.publish.extend([rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'),
seqno=str(generate_message_id()).encode('utf-8')
)])
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe = True,
topicid = topic_id.encode('utf-8')
)])
# Send out subscribe message to all peers
await self.message_all_peers(sub_msg.to_str())
await self.message_all_peers(packet.SerializeToString())
# Tell router we are joining this topic
self.router.join(topic_id)
@ -275,27 +244,31 @@ class Pubsub():
del self.my_topics[topic_id]
# Create unsubscribe message
unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\
{topic_id: False}, generate_message_id())
packet = rpc_pb2.RPC()
packet.publish.extend([rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'),
seqno=str(generate_message_id()).encode('utf-8')
)])
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe = False,
topicid = topic_id.encode('utf-8')
)])
# Send out unsubscribe message to all peers
await self.message_all_peers(unsub_msg.to_str())
await self.message_all_peers(packet.SerializeToString())
# Tell router we are leaving this topic
self.router.leave(topic_id)
async def message_all_peers(self, raw_msg):
async def message_all_peers(self, rpc_msg):
"""
Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast
"""
# Encode message for sending
encoded_msg = raw_msg.encode()
# Broadcast message
for peer in self.peers:
stream = self.peers[peer]
# Write message to stream
await stream.write(encoded_msg)
await stream.write(rpc_msg)