From 65526a3319eb18d44edd0e70c7e34efc28ba6463 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sat, 30 Mar 2019 19:12:31 -0400 Subject: [PATCH] remove Message from dummy account --- tests/pubsub/dummy_account_node.py | 57 +++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index f328a6e..de78211 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,12 +1,12 @@ import asyncio +import uuid import multiaddr from libp2p import new_node -from libp2p.pubsub.message import create_message_talk +from libp2p.pubsub.pb import rpc_pb2_grpc +from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.message import MessageTalk -from libp2p.pubsub.message import generate_message_id SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] CRYPTO_TOPIC = "ethereum" @@ -53,16 +53,15 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - message_raw = await self.q.get() - message = create_message_talk(message_raw) - contents = message.data - - msg_comps = contents.split(",") - - if msg_comps[0] == "send": - self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) - elif msg_comps[0] == "set": - self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) + incoming = await self.q.get() + rpc_incoming = rpc_pb2.RPC() + rpc_incoming.ParseFromString(incoming) + for message in rpc_incoming.publish: + msg_comps = message.split(",") + if msg_comps[0] == "send": + self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) + elif msg_comps[0] == "set": + self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) async def setup_crypto_networking(self): """ @@ -82,8 +81,8 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) - msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) - await self.floodsub.publish(my_id, msg.to_str()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + await self.floodsub.publish(my_id, msg.SerializeToString()) async def publish_set_crypto(self, user, amount): """ @@ -93,8 +92,9 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "set," + user + "," + str(amount) - msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) - await self.floodsub.publish(my_id, msg.to_str()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + + await self.floodsub.publish(my_id, packet.SerializeToString()) def handle_send_crypto(self, source_user, dest_user, amount): """ @@ -132,3 +132,26 @@ class DummyAccountNode(): else: return -1 +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) + +def generate_RPC_packet(origin_id, topics, msg_content, msg_id): + packet = rpc_pb2.RPC() + packet.publish.extend([rpc_pb2.Message( + from_id=origin_id.encode('utf-8'), + seqno=msg_id.encode('utf-8'), + data=msg_content.encode('utf-8'), + topicIDs=topics.encode('utf-8')) + ]) + + for topic in topics: + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=True, + topicid = topic.encode('utf-8') + )]) + + return packet