reworked subscribe unsubsrcibe
This commit is contained in:
parent
75c12d4aed
commit
598b6b257f
@ -61,7 +61,7 @@ class Pubsub():
|
||||
"""
|
||||
Generate subscription message with all topics we are subscribed to
|
||||
"""
|
||||
print("MAKING HELLO PACKET")
|
||||
|
||||
packet = rpc_pb2.RPC()
|
||||
message = rpc_pb2.Message(
|
||||
from_id=str(self.host.get_id()).encode('utf-8'),
|
||||
@ -74,7 +74,7 @@ class Pubsub():
|
||||
|
||||
return packet.SerializeToString()
|
||||
|
||||
async def continously_read_stream(self, stream):
|
||||
async def continuously_read_stream(self, stream):
|
||||
"""
|
||||
Read from input stream in an infinite loop. Process
|
||||
messages from other nodes, which for now are considered MessageTalk
|
||||
@ -88,16 +88,12 @@ class Pubsub():
|
||||
while True:
|
||||
incoming = (await stream.read())
|
||||
rpc_incoming = rpc_pb2.RPC()
|
||||
print("JUST GOT STRING")
|
||||
print(incoming)
|
||||
rpc_incoming.ParseFromString(incoming)
|
||||
print("JUST GOT")
|
||||
print ("CONTINUOUSLY")
|
||||
print (rpc_incoming)
|
||||
|
||||
if hasattr(rpc_incoming, 'publish'):
|
||||
if rpc_incoming.publish:
|
||||
# deal with "talk messages"
|
||||
for msg in rpc_incoming.publish:
|
||||
# TODO check what from_id is in go? is it origin or peer
|
||||
old_format = MessageTalk(peer_id,
|
||||
msg.from_id,
|
||||
msg.topicIDs,
|
||||
@ -107,48 +103,16 @@ class Pubsub():
|
||||
await self.handle_talk(old_format)
|
||||
await self.router.publish(peer_id, msg)
|
||||
|
||||
if hasattr(rpc_incoming, 'subscriptions'):
|
||||
if rpc_incoming.subscriptions:
|
||||
# deal with "subscription messages"
|
||||
subs_map = {}
|
||||
for msg in rpc_incoming.subscriptions:
|
||||
if msg.subscribe:
|
||||
subs_map[msg.topic_id] = "sub"
|
||||
subs_map[msg.topicid] = "sub"
|
||||
else:
|
||||
subs_map[msg.topic_id] = "unsub"
|
||||
old_format = MessageSub(peer_id, peer_id, subs_map, generate_message_id())
|
||||
self.handle_subscription(old_format)
|
||||
subs_map[msg.topicid] = "unsub"
|
||||
|
||||
# msg_type = msg_comps[0]
|
||||
|
||||
# msg_sender = rpc_incoming.publish.from_id
|
||||
# # msg_origin = msg_comps[2]
|
||||
# msg_id = rpc_incoming.publish.seqno
|
||||
# if msg_id not in self.seen_messages:
|
||||
# # Do stuff with incoming unseen message
|
||||
# should_publish = True
|
||||
# if msg_type == "subscription":
|
||||
# self.handle_subscription(incoming)
|
||||
|
||||
# # We don't need to relay the subscription to our
|
||||
# # peers because a given node only needs its peers
|
||||
# # to know that it is subscribed to the topic (doesn't
|
||||
# # need everyone to know)
|
||||
# should_publish = False
|
||||
# elif msg_type == "talk":
|
||||
# await self.handle_talk(incoming)
|
||||
|
||||
# Add message id to seen
|
||||
# self.seen_messages.append(msg_id)
|
||||
|
||||
# Publish message using router's publish
|
||||
# if should_publish:
|
||||
# msg = create_message_talk(incoming)
|
||||
|
||||
# # Adjust raw_msg to that the message sender
|
||||
# # is now our peer_id
|
||||
# msg.from_id = str(self.host.get_id())
|
||||
|
||||
# await self.router.publish(msg_sender, msg.to_str())
|
||||
self.handle_subscription(rpc_incoming)
|
||||
|
||||
# Force context switch
|
||||
await asyncio.sleep(0)
|
||||
@ -167,10 +131,10 @@ class Pubsub():
|
||||
|
||||
# Send hello packet
|
||||
hello = self.get_hello_packet()
|
||||
print(hello)
|
||||
|
||||
await stream.write(hello)
|
||||
# Pass stream off to stream reader
|
||||
asyncio.ensure_future(self.continously_read_stream(stream))
|
||||
asyncio.ensure_future(self.continuously_read_stream(stream))
|
||||
|
||||
async def handle_peer_queue(self):
|
||||
"""
|
||||
@ -197,30 +161,30 @@ class Pubsub():
|
||||
await stream.write(hello)
|
||||
|
||||
# Pass stream off to stream reader
|
||||
asyncio.ensure_future(self.continously_read_stream(stream))
|
||||
asyncio.ensure_future(self.continuously_read_stream(stream))
|
||||
|
||||
# Force context switch
|
||||
await asyncio.sleep(0)
|
||||
|
||||
def handle_subscription(self, subscription):
|
||||
def handle_subscription(self, rpc_message):
|
||||
"""
|
||||
Handle an incoming subscription message from a peer. Update internal
|
||||
mapping to mark the peer as subscribed or unsubscribed to topics as
|
||||
defined in the subscription message
|
||||
:param subscription: raw data constituting a subscription message
|
||||
"""
|
||||
sub_msg = subscription
|
||||
if sub_msg.subs_map:
|
||||
print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id)
|
||||
for topic_id in sub_msg.subs_map:
|
||||
for sub_msg in rpc_message.subscriptions:
|
||||
|
||||
# Look at each subscription in the msg individually
|
||||
if sub_msg.subs_map[topic_id]:
|
||||
if topic_id not in self.peer_topics:
|
||||
if sub_msg.subscribe:
|
||||
origin_id = rpc_message.publish[0].from_id
|
||||
|
||||
if sub_msg.topicid not in self.peer_topics:
|
||||
# Create topic list if it did not yet exist
|
||||
self.peer_topics[topic_id] = [sub_msg.origin_id]
|
||||
elif sub_msg.origin_id not in self.peer_topics[topic_id]:
|
||||
self.peer_topics[sub_msg.topicid] = origin_id
|
||||
elif orgin_id not in self.peer_topics[sub_msg.topicid]:
|
||||
# Add peer to topic
|
||||
self.peer_topics[topic_id].append(sub_msg.origin_id)
|
||||
self.peer_topics[sub_msg.topicid].append(origin_id)
|
||||
else:
|
||||
# TODO: Remove peer from topic
|
||||
pass
|
||||
@ -250,13 +214,18 @@ class Pubsub():
|
||||
self.my_topics[topic_id] = asyncio.Queue()
|
||||
|
||||
# Create subscribe message
|
||||
sub_msg = MessageSub(
|
||||
str(self.host.get_id()),\
|
||||
str(self.host.get_id()), {topic_id: True}, generate_message_id()\
|
||||
)
|
||||
packet = rpc_pb2.RPC()
|
||||
packet.publish.extend([rpc_pb2.Message(
|
||||
from_id=str(self.host.get_id()).encode('utf-8'),
|
||||
seqno=str(generate_message_id()).encode('utf-8')
|
||||
)])
|
||||
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
|
||||
subscribe = True,
|
||||
topicid = topic_id.encode('utf-8')
|
||||
)])
|
||||
|
||||
# Send out subscribe message to all peers
|
||||
await self.message_all_peers(sub_msg.to_str())
|
||||
await self.message_all_peers(packet.SerializeToString())
|
||||
|
||||
# Tell router we are joining this topic
|
||||
self.router.join(topic_id)
|
||||
@ -275,27 +244,31 @@ class Pubsub():
|
||||
del self.my_topics[topic_id]
|
||||
|
||||
# Create unsubscribe message
|
||||
unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\
|
||||
{topic_id: False}, generate_message_id())
|
||||
packet = rpc_pb2.RPC()
|
||||
packet.publish.extend([rpc_pb2.Message(
|
||||
from_id=str(self.host.get_id()).encode('utf-8'),
|
||||
seqno=str(generate_message_id()).encode('utf-8')
|
||||
)])
|
||||
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(
|
||||
subscribe = False,
|
||||
topicid = topic_id.encode('utf-8')
|
||||
)])
|
||||
|
||||
# Send out unsubscribe message to all peers
|
||||
await self.message_all_peers(unsub_msg.to_str())
|
||||
await self.message_all_peers(packet.SerializeToString())
|
||||
|
||||
# Tell router we are leaving this topic
|
||||
self.router.leave(topic_id)
|
||||
|
||||
async def message_all_peers(self, raw_msg):
|
||||
async def message_all_peers(self, rpc_msg):
|
||||
"""
|
||||
Broadcast a message to peers
|
||||
:param raw_msg: raw contents of the message to broadcast
|
||||
"""
|
||||
|
||||
# Encode message for sending
|
||||
encoded_msg = raw_msg.encode()
|
||||
|
||||
# Broadcast message
|
||||
for peer in self.peers:
|
||||
stream = self.peers[peer]
|
||||
|
||||
# Write message to stream
|
||||
await stream.write(encoded_msg)
|
||||
await stream.write(rpc_msg)
|
||||
|
Loading…
x
Reference in New Issue
Block a user