reworked subscribe unsubsrcibe

This commit is contained in:
zixuanzh 2019-03-30 17:59:08 -04:00 committed by Stuckinaboot
parent 83de4f2972
commit c03ba881f2

View File

@ -61,7 +61,7 @@ class Pubsub():
""" """
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
""" """
print("MAKING HELLO PACKET")
packet = rpc_pb2.RPC() packet = rpc_pb2.RPC()
message = rpc_pb2.Message( message = rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'), from_id=str(self.host.get_id()).encode('utf-8'),
@ -74,7 +74,7 @@ class Pubsub():
return packet.SerializeToString() return packet.SerializeToString()
async def continously_read_stream(self, stream): async def continuously_read_stream(self, stream):
""" """
Read from input stream in an infinite loop. Process Read from input stream in an infinite loop. Process
messages from other nodes, which for now are considered MessageTalk messages from other nodes, which for now are considered MessageTalk
@ -88,16 +88,12 @@ class Pubsub():
while True: while True:
incoming = (await stream.read()) incoming = (await stream.read())
rpc_incoming = rpc_pb2.RPC() rpc_incoming = rpc_pb2.RPC()
print("JUST GOT STRING")
print(incoming)
rpc_incoming.ParseFromString(incoming) rpc_incoming.ParseFromString(incoming)
print("JUST GOT") print ("CONTINUOUSLY")
print (rpc_incoming) print (rpc_incoming)
if rpc_incoming.publish:
if hasattr(rpc_incoming, 'publish'):
# deal with "talk messages" # deal with "talk messages"
for msg in rpc_incoming.publish: for msg in rpc_incoming.publish:
# TODO check what from_id is in go? is it origin or peer
old_format = MessageTalk(peer_id, old_format = MessageTalk(peer_id,
msg.from_id, msg.from_id,
msg.topicIDs, msg.topicIDs,
@ -107,48 +103,16 @@ class Pubsub():
await self.handle_talk(old_format) await self.handle_talk(old_format)
await self.router.publish(peer_id, msg) await self.router.publish(peer_id, msg)
if hasattr(rpc_incoming, 'subscriptions'): if rpc_incoming.subscriptions:
# deal with "subscription messages" # deal with "subscription messages"
subs_map = {} subs_map = {}
for msg in rpc_incoming.subscriptions: for msg in rpc_incoming.subscriptions:
if msg.subscribe: if msg.subscribe:
subs_map[msg.topic_id] = "sub" subs_map[msg.topicid] = "sub"
else: else:
subs_map[msg.topic_id] = "unsub" subs_map[msg.topicid] = "unsub"
old_format = MessageSub(peer_id, peer_id, subs_map, generate_message_id())
self.handle_subscription(old_format)
# msg_type = msg_comps[0] self.handle_subscription(rpc_incoming)
# msg_sender = rpc_incoming.publish.from_id
# # msg_origin = msg_comps[2]
# msg_id = rpc_incoming.publish.seqno
# if msg_id not in self.seen_messages:
# # Do stuff with incoming unseen message
# should_publish = True
# if msg_type == "subscription":
# self.handle_subscription(incoming)
# # We don't need to relay the subscription to our
# # peers because a given node only needs its peers
# # to know that it is subscribed to the topic (doesn't
# # need everyone to know)
# should_publish = False
# elif msg_type == "talk":
# await self.handle_talk(incoming)
# Add message id to seen
# self.seen_messages.append(msg_id)
# Publish message using router's publish
# if should_publish:
# msg = create_message_talk(incoming)
# # Adjust raw_msg to that the message sender
# # is now our peer_id
# msg.from_id = str(self.host.get_id())
# await self.router.publish(msg_sender, msg.to_str())
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
@ -167,10 +131,10 @@ class Pubsub():
# Send hello packet # Send hello packet
hello = self.get_hello_packet() hello = self.get_hello_packet()
print(hello)
await stream.write(hello) await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream)) asyncio.ensure_future(self.continuously_read_stream(stream))
async def handle_peer_queue(self): async def handle_peer_queue(self):
""" """
@ -197,30 +161,30 @@ class Pubsub():
await stream.write(hello) await stream.write(hello)
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continously_read_stream(stream)) asyncio.ensure_future(self.continuously_read_stream(stream))
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
def handle_subscription(self, subscription): def handle_subscription(self, rpc_message):
""" """
Handle an incoming subscription message from a peer. Update internal Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as mapping to mark the peer as subscribed or unsubscribed to topics as
defined in the subscription message defined in the subscription message
:param subscription: raw data constituting a subscription message :param subscription: raw data constituting a subscription message
""" """
sub_msg = subscription for sub_msg in rpc_message.subscriptions:
if sub_msg.subs_map:
print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id)
for topic_id in sub_msg.subs_map:
# Look at each subscription in the msg individually # Look at each subscription in the msg individually
if sub_msg.subs_map[topic_id]: if sub_msg.subscribe:
if topic_id not in self.peer_topics: origin_id = rpc_message.publish[0].from_id
if sub_msg.topicid not in self.peer_topics:
# Create topic list if it did not yet exist # Create topic list if it did not yet exist
self.peer_topics[topic_id] = [sub_msg.origin_id] self.peer_topics[sub_msg.topicid] = origin_id
elif sub_msg.origin_id not in self.peer_topics[topic_id]: elif orgin_id not in self.peer_topics[sub_msg.topicid]:
# Add peer to topic # Add peer to topic
self.peer_topics[topic_id].append(sub_msg.origin_id) self.peer_topics[sub_msg.topicid].append(origin_id)
else: else:
# TODO: Remove peer from topic # TODO: Remove peer from topic
pass pass
@ -250,13 +214,18 @@ class Pubsub():
self.my_topics[topic_id] = asyncio.Queue() self.my_topics[topic_id] = asyncio.Queue()
# Create subscribe message # Create subscribe message
sub_msg = MessageSub( packet = rpc_pb2.RPC()
str(self.host.get_id()),\ packet.publish.extend([rpc_pb2.Message(
str(self.host.get_id()), {topic_id: True}, generate_message_id()\ from_id=str(self.host.get_id()).encode('utf-8'),
) seqno=str(generate_message_id()).encode('utf-8')
)])
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe = True,
topicid = topic_id.encode('utf-8')
)])
# Send out subscribe message to all peers # Send out subscribe message to all peers
await self.message_all_peers(sub_msg.to_str()) await self.message_all_peers(packet.SerializeToString())
# Tell router we are joining this topic # Tell router we are joining this topic
self.router.join(topic_id) self.router.join(topic_id)
@ -275,27 +244,31 @@ class Pubsub():
del self.my_topics[topic_id] del self.my_topics[topic_id]
# Create unsubscribe message # Create unsubscribe message
unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\ packet = rpc_pb2.RPC()
{topic_id: False}, generate_message_id()) packet.publish.extend([rpc_pb2.Message(
from_id=str(self.host.get_id()).encode('utf-8'),
seqno=str(generate_message_id()).encode('utf-8')
)])
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
subscribe = False,
topicid = topic_id.encode('utf-8')
)])
# Send out unsubscribe message to all peers # Send out unsubscribe message to all peers
await self.message_all_peers(unsub_msg.to_str()) await self.message_all_peers(packet.SerializeToString())
# Tell router we are leaving this topic # Tell router we are leaving this topic
self.router.leave(topic_id) self.router.leave(topic_id)
async def message_all_peers(self, raw_msg): async def message_all_peers(self, rpc_msg):
""" """
Broadcast a message to peers Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast :param raw_msg: raw contents of the message to broadcast
""" """
# Encode message for sending
encoded_msg = raw_msg.encode()
# Broadcast message # Broadcast message
for peer in self.peers: for peer in self.peers:
stream = self.peers[peer] stream = self.peers[peer]
# Write message to stream # Write message to stream
await stream.write(encoded_msg) await stream.write(rpc_msg)