From cb6b76d3b55bd6af97eac5907ba0cba5dfc5e5df Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Sun, 24 Mar 2019 15:08:36 -0400 Subject: [PATCH 01/19] added rpc.proto from go repo --- rpc.proto | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 rpc.proto diff --git a/rpc.proto b/rpc.proto new file mode 100644 index 0000000..2ae2ef2 --- /dev/null +++ b/rpc.proto @@ -0,0 +1,78 @@ +// Borrowed from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto + +syntax = "proto2"; + +package pubsub.pb; + +message RPC { + repeated SubOpts subscriptions = 1; + repeated Message publish = 2; + + message SubOpts { + optional bool subscribe = 1; // subscribe or unsubcribe + optional string topicid = 2; + } + + optional ControlMessage control = 3; +} + +message Message { + optional bytes from = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topicIDs = 4; + optional bytes signature = 5; + optional bytes key = 6; +} + +message ControlMessage { + repeated ControlIHave ihave = 1; + repeated ControlIWant iwant = 2; + repeated ControlGraft graft = 3; + repeated ControlPrune prune = 4; +} + +message ControlIHave { + optional string topicID = 1; + repeated string messageIDs = 2; +} + +message ControlIWant { + repeated string messageIDs = 1; +} + +message ControlGraft { + optional string topicID = 1; +} + +message ControlPrune { + optional string topicID = 1; +} + +message TopicDescriptor { + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 3; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; // root keys to trust + + enum AuthMode { + NONE = 0; // no authentication, anyone can publish + KEY = 1; // only messages signed by keys in the topic descriptor are accepted + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) + + enum EncMode { + NONE = 0; // no encryption, anyone can read + SHAREDKEY = 1; // messages are encrypted with shared key + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } +} \ No newline at end of file From 041e0fbe3453e1eb17143820dc48298adae44e61 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Thu, 28 Mar 2019 09:41:38 -0400 Subject: [PATCH 02/19] add pubsub proto --- libp2p/pubsub/pb/rpc.proto | 76 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 libp2p/pubsub/pb/rpc.proto diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto new file mode 100644 index 0000000..764b033 --- /dev/null +++ b/libp2p/pubsub/pb/rpc.proto @@ -0,0 +1,76 @@ +syntax = "proto2"; + +package pubsub.pb; + +message RPC { + repeated SubOpts subscriptions = 1; + repeated Message publish = 2; + + message SubOpts { + optional bool subscribe = 1; // subscribe or unsubcribe + optional string topicid = 2; + } + + optional ControlMessage control = 3; +} + +message Message { + optional bytes from = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topicIDs = 4; + optional bytes signature = 5; + optional bytes key = 6; +} + +message ControlMessage { + repeated ControlIHave ihave = 1; + repeated ControlIWant iwant = 2; + repeated ControlGraft graft = 3; + repeated ControlPrune prune = 4; +} + +message ControlIHave { + optional string topicID = 1; + repeated string messageIDs = 2; +} + +message ControlIWant { + repeated string messageIDs = 1; +} + +message ControlGraft { + optional string topicID = 1; +} + +message ControlPrune { + optional string topicID = 1; +} + +message TopicDescriptor { + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 3; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; // root keys to trust + + enum AuthMode { + NONE = 0; // no authentication, anyone can publish + KEY = 1; // only messages signed by keys in the topic descriptor are accepted + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) + + enum EncMode { + NONE = 0; // no encryption, anyone can read + SHAREDKEY = 1; // messages are encrypted with shared key + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } +} \ No newline at end of file From 1e8d93fcf6777b62297c0d524a7dd1bae933ca20 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Thu, 28 Mar 2019 09:55:14 -0400 Subject: [PATCH 03/19] add generated rpc code --- libp2p/pubsub/pb/rpc_pb2.py | 638 +++++++++++++++++++++++++++++++ libp2p/pubsub/pb/rpc_pb2_grpc.py | 3 + 2 files changed, 641 insertions(+) create mode 100644 libp2p/pubsub/pb/rpc_pb2.py create mode 100644 libp2p/pubsub/pb/rpc_pb2_grpc.py diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py new file mode 100644 index 0000000..57035ed --- /dev/null +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -0,0 +1,638 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: rpc.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='rpc.proto', + package='pubsub.pb', + syntax='proto2', + serialized_options=None, + serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"f\n\x07Message\x12\x0c\n\x04\x66rom\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +) + + + +_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE = _descriptor.EnumDescriptor( + name='AuthMode', + full_name='pubsub.pb.TopicDescriptor.AuthOpts.AuthMode', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='NONE', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='KEY', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='WOT', index=2, number=2, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=865, + serialized_end=903, +) +_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE) + +_TOPICDESCRIPTOR_ENCOPTS_ENCMODE = _descriptor.EnumDescriptor( + name='EncMode', + full_name='pubsub.pb.TopicDescriptor.EncOpts.EncMode', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='NONE', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SHAREDKEY', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='WOT', index=2, number=2, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=994, + serialized_end=1037, +) +_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE) + + +_RPC_SUBOPTS = _descriptor.Descriptor( + name='SubOpts', + full_name='pubsub.pb.RPC.SubOpts', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='subscribe', full_name='pubsub.pb.RPC.SubOpts.subscribe', index=0, + number=1, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='topicid', full_name='pubsub.pb.RPC.SubOpts.topicid', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=160, + serialized_end=205, +) + +_RPC = _descriptor.Descriptor( + name='RPC', + full_name='pubsub.pb.RPC', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='subscriptions', full_name='pubsub.pb.RPC.subscriptions', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='publish', full_name='pubsub.pb.RPC.publish', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='control', full_name='pubsub.pb.RPC.control', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_RPC_SUBOPTS, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=25, + serialized_end=205, +) + + +_MESSAGE = _descriptor.Descriptor( + name='Message', + full_name='pubsub.pb.Message', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='from', full_name='pubsub.pb.Message.from', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='data', full_name='pubsub.pb.Message.data', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='seqno', full_name='pubsub.pb.Message.seqno', index=2, + number=3, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='topicIDs', full_name='pubsub.pb.Message.topicIDs', index=3, + number=4, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='signature', full_name='pubsub.pb.Message.signature', index=4, + number=5, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='key', full_name='pubsub.pb.Message.key', index=5, + number=6, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=207, + serialized_end=309, +) + + +_CONTROLMESSAGE = _descriptor.Descriptor( + name='ControlMessage', + full_name='pubsub.pb.ControlMessage', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='ihave', full_name='pubsub.pb.ControlMessage.ihave', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='iwant', full_name='pubsub.pb.ControlMessage.iwant', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='graft', full_name='pubsub.pb.ControlMessage.graft', index=2, + number=3, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='prune', full_name='pubsub.pb.ControlMessage.prune', index=3, + number=4, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=312, + serialized_end=488, +) + + +_CONTROLIHAVE = _descriptor.Descriptor( + name='ControlIHave', + full_name='pubsub.pb.ControlIHave', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='topicID', full_name='pubsub.pb.ControlIHave.topicID', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='messageIDs', full_name='pubsub.pb.ControlIHave.messageIDs', index=1, + number=2, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=490, + serialized_end=541, +) + + +_CONTROLIWANT = _descriptor.Descriptor( + name='ControlIWant', + full_name='pubsub.pb.ControlIWant', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='messageIDs', full_name='pubsub.pb.ControlIWant.messageIDs', index=0, + number=1, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=543, + serialized_end=577, +) + + +_CONTROLGRAFT = _descriptor.Descriptor( + name='ControlGraft', + full_name='pubsub.pb.ControlGraft', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='topicID', full_name='pubsub.pb.ControlGraft.topicID', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=579, + serialized_end=610, +) + + +_CONTROLPRUNE = _descriptor.Descriptor( + name='ControlPrune', + full_name='pubsub.pb.ControlPrune', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='topicID', full_name='pubsub.pb.ControlPrune.topicID', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=612, + serialized_end=643, +) + + +_TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor( + name='AuthOpts', + full_name='pubsub.pb.TopicDescriptor.AuthOpts', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='mode', full_name='pubsub.pb.TopicDescriptor.AuthOpts.mode', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='keys', full_name='pubsub.pb.TopicDescriptor.AuthOpts.keys', index=1, + number=2, type=12, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE, + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=779, + serialized_end=903, +) + +_TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( + name='EncOpts', + full_name='pubsub.pb.TopicDescriptor.EncOpts', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='mode', full_name='pubsub.pb.TopicDescriptor.EncOpts.mode', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='keyHashes', full_name='pubsub.pb.TopicDescriptor.EncOpts.keyHashes', index=1, + number=2, type=12, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _TOPICDESCRIPTOR_ENCOPTS_ENCMODE, + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=906, + serialized_end=1037, +) + +_TOPICDESCRIPTOR = _descriptor.Descriptor( + name='TopicDescriptor', + full_name='pubsub.pb.TopicDescriptor', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='pubsub.pb.TopicDescriptor.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='auth', full_name='pubsub.pb.TopicDescriptor.auth', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='enc', full_name='pubsub.pb.TopicDescriptor.enc', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_TOPICDESCRIPTOR_AUTHOPTS, _TOPICDESCRIPTOR_ENCOPTS, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=646, + serialized_end=1037, +) + +_RPC_SUBOPTS.containing_type = _RPC +_RPC.fields_by_name['subscriptions'].message_type = _RPC_SUBOPTS +_RPC.fields_by_name['publish'].message_type = _MESSAGE +_RPC.fields_by_name['control'].message_type = _CONTROLMESSAGE +_CONTROLMESSAGE.fields_by_name['ihave'].message_type = _CONTROLIHAVE +_CONTROLMESSAGE.fields_by_name['iwant'].message_type = _CONTROLIWANT +_CONTROLMESSAGE.fields_by_name['graft'].message_type = _CONTROLGRAFT +_CONTROLMESSAGE.fields_by_name['prune'].message_type = _CONTROLPRUNE +_TOPICDESCRIPTOR_AUTHOPTS.fields_by_name['mode'].enum_type = _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE +_TOPICDESCRIPTOR_AUTHOPTS.containing_type = _TOPICDESCRIPTOR +_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE.containing_type = _TOPICDESCRIPTOR_AUTHOPTS +_TOPICDESCRIPTOR_ENCOPTS.fields_by_name['mode'].enum_type = _TOPICDESCRIPTOR_ENCOPTS_ENCMODE +_TOPICDESCRIPTOR_ENCOPTS.containing_type = _TOPICDESCRIPTOR +_TOPICDESCRIPTOR_ENCOPTS_ENCMODE.containing_type = _TOPICDESCRIPTOR_ENCOPTS +_TOPICDESCRIPTOR.fields_by_name['auth'].message_type = _TOPICDESCRIPTOR_AUTHOPTS +_TOPICDESCRIPTOR.fields_by_name['enc'].message_type = _TOPICDESCRIPTOR_ENCOPTS +DESCRIPTOR.message_types_by_name['RPC'] = _RPC +DESCRIPTOR.message_types_by_name['Message'] = _MESSAGE +DESCRIPTOR.message_types_by_name['ControlMessage'] = _CONTROLMESSAGE +DESCRIPTOR.message_types_by_name['ControlIHave'] = _CONTROLIHAVE +DESCRIPTOR.message_types_by_name['ControlIWant'] = _CONTROLIWANT +DESCRIPTOR.message_types_by_name['ControlGraft'] = _CONTROLGRAFT +DESCRIPTOR.message_types_by_name['ControlPrune'] = _CONTROLPRUNE +DESCRIPTOR.message_types_by_name['TopicDescriptor'] = _TOPICDESCRIPTOR +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +RPC = _reflection.GeneratedProtocolMessageType('RPC', (_message.Message,), dict( + + SubOpts = _reflection.GeneratedProtocolMessageType('SubOpts', (_message.Message,), dict( + DESCRIPTOR = _RPC_SUBOPTS, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.RPC.SubOpts) + )) + , + DESCRIPTOR = _RPC, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.RPC) + )) +_sym_db.RegisterMessage(RPC) +_sym_db.RegisterMessage(RPC.SubOpts) + +Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict( + DESCRIPTOR = _MESSAGE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.Message) + )) +_sym_db.RegisterMessage(Message) + +ControlMessage = _reflection.GeneratedProtocolMessageType('ControlMessage', (_message.Message,), dict( + DESCRIPTOR = _CONTROLMESSAGE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlMessage) + )) +_sym_db.RegisterMessage(ControlMessage) + +ControlIHave = _reflection.GeneratedProtocolMessageType('ControlIHave', (_message.Message,), dict( + DESCRIPTOR = _CONTROLIHAVE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIHave) + )) +_sym_db.RegisterMessage(ControlIHave) + +ControlIWant = _reflection.GeneratedProtocolMessageType('ControlIWant', (_message.Message,), dict( + DESCRIPTOR = _CONTROLIWANT, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIWant) + )) +_sym_db.RegisterMessage(ControlIWant) + +ControlGraft = _reflection.GeneratedProtocolMessageType('ControlGraft', (_message.Message,), dict( + DESCRIPTOR = _CONTROLGRAFT, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlGraft) + )) +_sym_db.RegisterMessage(ControlGraft) + +ControlPrune = _reflection.GeneratedProtocolMessageType('ControlPrune', (_message.Message,), dict( + DESCRIPTOR = _CONTROLPRUNE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.ControlPrune) + )) +_sym_db.RegisterMessage(ControlPrune) + +TopicDescriptor = _reflection.GeneratedProtocolMessageType('TopicDescriptor', (_message.Message,), dict( + + AuthOpts = _reflection.GeneratedProtocolMessageType('AuthOpts', (_message.Message,), dict( + DESCRIPTOR = _TOPICDESCRIPTOR_AUTHOPTS, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.AuthOpts) + )) + , + + EncOpts = _reflection.GeneratedProtocolMessageType('EncOpts', (_message.Message,), dict( + DESCRIPTOR = _TOPICDESCRIPTOR_ENCOPTS, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.EncOpts) + )) + , + DESCRIPTOR = _TOPICDESCRIPTOR, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor) + )) +_sym_db.RegisterMessage(TopicDescriptor) +_sym_db.RegisterMessage(TopicDescriptor.AuthOpts) +_sym_db.RegisterMessage(TopicDescriptor.EncOpts) + + +# @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pb/rpc_pb2_grpc.py b/libp2p/pubsub/pb/rpc_pb2_grpc.py new file mode 100644 index 0000000..a894352 --- /dev/null +++ b/libp2p/pubsub/pb/rpc_pb2_grpc.py @@ -0,0 +1,3 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + From 81d121a0296739902ece55711ecc351fea7fad12 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Thu, 28 Mar 2019 15:23:56 -0400 Subject: [PATCH 04/19] update from to from_id in proto --- libp2p/pubsub/pb/rpc.proto | 2 +- libp2p/pubsub/pb/rpc_pb2.py | 46 ++++++++++++++++++------------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index 764b033..f07da20 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -15,7 +15,7 @@ message RPC { } message Message { - optional bytes from = 1; + optional bytes from_id = 1; optional bytes data = 2; optional bytes seqno = 3; repeated string topicIDs = 4; diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index 57035ed..d315782 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -19,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( package='pubsub.pb', syntax='proto2', serialized_options=None, - serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"f\n\x07Message\x12\x0c\n\x04\x66rom\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') + serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') ) @@ -45,8 +45,8 @@ _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=865, - serialized_end=903, + serialized_start=868, + serialized_end=906, ) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE) @@ -71,8 +71,8 @@ _TOPICDESCRIPTOR_ENCOPTS_ENCMODE = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=994, - serialized_end=1037, + serialized_start=997, + serialized_end=1040, ) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE) @@ -167,7 +167,7 @@ _MESSAGE = _descriptor.Descriptor( containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='from', full_name='pubsub.pb.Message.from', index=0, + name='from_id', full_name='pubsub.pb.Message.from_id', index=0, number=1, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, @@ -221,7 +221,7 @@ _MESSAGE = _descriptor.Descriptor( oneofs=[ ], serialized_start=207, - serialized_end=309, + serialized_end=312, ) @@ -272,8 +272,8 @@ _CONTROLMESSAGE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=312, - serialized_end=488, + serialized_start=315, + serialized_end=491, ) @@ -310,8 +310,8 @@ _CONTROLIHAVE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=490, - serialized_end=541, + serialized_start=493, + serialized_end=544, ) @@ -341,8 +341,8 @@ _CONTROLIWANT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=543, - serialized_end=577, + serialized_start=546, + serialized_end=580, ) @@ -372,8 +372,8 @@ _CONTROLGRAFT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=579, - serialized_end=610, + serialized_start=582, + serialized_end=613, ) @@ -403,8 +403,8 @@ _CONTROLPRUNE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=612, - serialized_end=643, + serialized_start=615, + serialized_end=646, ) @@ -442,8 +442,8 @@ _TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=779, - serialized_end=903, + serialized_start=782, + serialized_end=906, ) _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( @@ -480,8 +480,8 @@ _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=906, - serialized_end=1037, + serialized_start=909, + serialized_end=1040, ) _TOPICDESCRIPTOR = _descriptor.Descriptor( @@ -524,8 +524,8 @@ _TOPICDESCRIPTOR = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=646, - serialized_end=1037, + serialized_start=649, + serialized_end=1040, ) _RPC_SUBOPTS.containing_type = _RPC From 3cfeccaf17b1b6eb43ea052bb646b573afa3680a Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Thu, 28 Mar 2019 15:25:33 -0400 Subject: [PATCH 05/19] rewrote get_hello_packet --- libp2p/pubsub/pubsub.py | 74 +++++++++++++---------------------------- 1 file changed, 24 insertions(+), 50 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 9bd072f..17ecdd5 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,5 +1,7 @@ import asyncio +from .pb import rpc_pb2_grpc +from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee from .message import MessageSub from .message import create_message_talk, create_message_sub @@ -7,37 +9,6 @@ from. message import generate_message_id class Pubsub(): - """ - For now, because I'm on a plane and don't have access to the go repo/protobuf stuff, - this is going to be the message format for the two types: subscription and talk - subscription indicates subscribing or unsubscribing from a topic - talk is sending a message on topic(s) - subscription format: - subscription - 'from' - :'topicid' - :'topicid' - ... - Ex. - subscription - msg_sender_peer_id - origin_peer_id - sub:topic1 - sub:topic2 - unsub:fav_topic - talk format: - talk - 'from' - 'origin' - [topic_ids comma-delimited] - 'data' - Ex. - talk - msg_sender_peer_id - origin_peer_id - topic1,topics_are_cool,foo - I like tacos - """ # pylint: disable=too-many-instance-attributes def __init__(self, host, router, my_id): @@ -90,34 +61,37 @@ class Pubsub(): """ Generate subscription message with all topics we are subscribed to """ - subs_map = {} - for topic in self.my_topics: - subs_map[topic] = True - sub_msg = MessageSub( - str(self.host.get_id()),\ - str(self.host.get_id()), subs_map, generate_message_id()\ - ) - return sub_msg.to_str() + packet = rpc_pb2.RPC() + message = rpc_pb2.Message( + from_id=str(self.host.get_id()).encode('utf-8'), + seqno=str(generate_message_id()).encode('utf-8') + ) + packet.publish.extend([message]) + for topic_id in self.my_topics: + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=True, topicid=topic_id)]) + + return packet.SerializeToString() async def continously_read_stream(self, stream): """ Read from input stream in an infinite loop. Process messages from other nodes, which for now are considered MessageTalk and MessageSub messages. - TODO: Handle RPC messages instead of my Aspyn's own custom message format :param stream: stream to continously read from """ while True: - incoming = (await stream.read()).decode() - msg_comps = incoming.split('\n') - msg_type = msg_comps[0] + incoming = (await stream.read()) + rpc_incoming = rpc_pb2.RPC() + rpc_incoming.ParseFromString(incoming) + print (rpc_incoming.publish) - msg_sender = msg_comps[1] + # msg_type = msg_comps[0] + + msg_sender = rpc_incoming.publish.from_id # msg_origin = msg_comps[2] - msg_id = msg_comps[3] - print("HIT ME1") + msg_id = rpc_incoming.publish.seqno if msg_id not in self.seen_messages: - print("HIT ME") # Do stuff with incoming unseen message should_publish = True if msg_type == "subscription": @@ -161,7 +135,7 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.encode()) + await stream.write(hello) # Pass stream off to stream reader asyncio.ensure_future(self.continously_read_stream(stream)) @@ -187,7 +161,7 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - await stream.write(hello.encode()) + await stream.write(hello) # Pass stream off to stream reader asyncio.ensure_future(self.continously_read_stream(stream)) @@ -291,4 +265,4 @@ class Pubsub(): stream = self.peers[peer] # Write message to stream - await stream.write(encoded_msg) + await stream.write(encoded_msg) \ No newline at end of file From a81628f99d605db35ecb8bca6c778970d824874f Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Thu, 28 Mar 2019 15:42:55 -0400 Subject: [PATCH 06/19] update dependencies --- requirements_dev.txt | 2 ++ setup.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/requirements_dev.txt b/requirements_dev.txt index 4bf18f6..e3d8c0e 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -3,3 +3,5 @@ codecov pytest-cov pytest-asyncio pylint +grpcio +grpcio-tools diff --git a/setup.py b/setup.py index abaa1c7..c44a276 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,8 @@ setuptools.setup( "base58", "pymultihash", "multiaddr", + "grpcio", + "grpcio-tools" ], packages=["libp2p"], zip_safe=False, From bf17f424b38bc5704f99996e35123564c7466c4c Mon Sep 17 00:00:00 2001 From: Alex Haynes Date: Fri, 29 Mar 2019 16:23:30 -0400 Subject: [PATCH 07/19] RPC conversion progress --- libp2p/pubsub/floodsub.py | 41 ++++++++++++++----- libp2p/pubsub/pubsub.py | 85 +++++++++++++++++++++++++++------------ 2 files changed, 91 insertions(+), 35 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 68e542c..1cea17e 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,4 +1,6 @@ from .pubsub_router_interface import IPubsubRouter +from .pb import rpc_pb2 +from .message import MessageSub, MessageTalk from .message import create_message_talk class FloodSub(IPubsubRouter): @@ -56,28 +58,49 @@ class FloodSub(IPubsubRouter): """ # Encode message - encoded_msg = message.encode() + # encoded_msg = message.encode() + + if isinstance(message, str): + msg_talk = create_message_talk(message) + message = rpc_pb2.Message( + from_id=str(msg_talk.origin_id).encode('utf-8'), + seqno=str(msg_talk.message_id).encode('utf-8'), + topicIDs=msg_talk.topics, + data=msg_talk.data.encode() + + ) + packet = rpc_pb2.RPC() + print("YEET") + print(type(message)) + packet.publish.extend([message]) + + # Get message sender, origin, and topics - msg_talk = create_message_talk(message) + # msg_talk = create_message_talk(message) msg_sender = str(sender_peer_id) - msg_origin = msg_talk.origin_id - topics = msg_talk.topics + # msg_origin = msg_talk.origin_id + # topics = msg_talk.topics # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message - if msg_sender == msg_origin and msg_sender == str(self.pubsub.host.get_id()): - await self.pubsub.handle_talk(message) + if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): + old_format = MessageTalk(sender_peer_id, + message.from_id, + message.topicIDs, + message.data, + message.seqno) + await self.pubsub.handle_talk(old_format) # Deliver to self and peers - for topic in topics: + for topic in message.topicIDs: if topic in self.pubsub.peer_topics: for peer_id_in_topic in self.pubsub.peer_topics[topic]: # Forward to all known peers in the topic that are not the # message sender and are not the message origin - if peer_id_in_topic not in (msg_sender, msg_origin): + if peer_id_in_topic not in (msg_sender, message.from_id): stream = self.pubsub.peers[peer_id_in_topic] - await stream.write(encoded_msg) + await stream.write(packet.SerializeToString()) else: # Implies publish did not write print("publish did not write") diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 17ecdd5..b1ee363 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -3,7 +3,7 @@ import asyncio from .pb import rpc_pb2_grpc from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .message import MessageSub +from .message import MessageSub, MessageTalk from .message import create_message_talk, create_message_sub from. message import generate_message_id @@ -61,6 +61,7 @@ class Pubsub(): """ Generate subscription message with all topics we are subscribed to """ + print("MAKING HELLO PACKET") packet = rpc_pb2.RPC() message = rpc_pb2.Message( from_id=str(self.host.get_id()).encode('utf-8'), @@ -80,43 +81,74 @@ class Pubsub(): and MessageSub messages. :param stream: stream to continously read from """ + + # TODO check on types here + peer_id = stream.mplex_conn.peer_id + while True: incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() + print("JUST GOT STRING") + print(incoming) rpc_incoming.ParseFromString(incoming) - print (rpc_incoming.publish) + print("JUST GOT") + print (rpc_incoming) + + if hasattr(rpc_incoming, 'publish'): + # deal with "talk messages" + for msg in rpc_incoming.publish: + # TODO check what from_id is in go? is it origin or peer + old_format = MessageTalk(peer_id, + msg.from_id, + msg.topicIDs, + msg.data, + msg.seqno) + self.seen_messages.append(msg.seqno) + await self.handle_talk(old_format) + await self.router.publish(peer_id, msg) + + if hasattr(rpc_incoming, 'subscriptions'): + # deal with "subscription messages" + subs_map = {} + for msg in rpc_incoming.subscriptions: + if msg.subscribe: + subs_map[msg.topic_id] = "sub" + else: + subs_map[msg.topic_id] = "unsub" + old_format = MessageSub(peer_id, peer_id, subs_map, generate_message_id()) + self.handle_subscription(old_format) # msg_type = msg_comps[0] - msg_sender = rpc_incoming.publish.from_id - # msg_origin = msg_comps[2] - msg_id = rpc_incoming.publish.seqno - if msg_id not in self.seen_messages: - # Do stuff with incoming unseen message - should_publish = True - if msg_type == "subscription": - self.handle_subscription(incoming) + # msg_sender = rpc_incoming.publish.from_id + # # msg_origin = msg_comps[2] + # msg_id = rpc_incoming.publish.seqno + # if msg_id not in self.seen_messages: + # # Do stuff with incoming unseen message + # should_publish = True + # if msg_type == "subscription": + # self.handle_subscription(incoming) - # We don't need to relay the subscription to our - # peers because a given node only needs its peers - # to know that it is subscribed to the topic (doesn't - # need everyone to know) - should_publish = False - elif msg_type == "talk": - await self.handle_talk(incoming) + # # We don't need to relay the subscription to our + # # peers because a given node only needs its peers + # # to know that it is subscribed to the topic (doesn't + # # need everyone to know) + # should_publish = False + # elif msg_type == "talk": + # await self.handle_talk(incoming) # Add message id to seen - self.seen_messages.append(msg_id) + # self.seen_messages.append(msg_id) # Publish message using router's publish - if should_publish: - msg = create_message_talk(incoming) + # if should_publish: + # msg = create_message_talk(incoming) - # Adjust raw_msg to that the message sender - # is now our peer_id - msg.from_id = str(self.host.get_id()) + # # Adjust raw_msg to that the message sender + # # is now our peer_id + # msg.from_id = str(self.host.get_id()) - await self.router.publish(msg_sender, msg.to_str()) + # await self.router.publish(msg_sender, msg.to_str()) # Force context switch await asyncio.sleep(0) @@ -135,6 +167,7 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() + print(hello) await stream.write(hello) # Pass stream off to stream reader asyncio.ensure_future(self.continously_read_stream(stream)) @@ -176,7 +209,7 @@ class Pubsub(): defined in the subscription message :param subscription: raw data constituting a subscription message """ - sub_msg = create_message_sub(subscription) + sub_msg = subscription if sub_msg.subs_map: print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id) for topic_id in sub_msg.subs_map: @@ -198,7 +231,7 @@ class Pubsub(): custom message that is published on a given topic(s) :param talk: raw data constituting a talk message """ - msg = create_message_talk(talk) + msg = talk # Check if this message has any topics that we are subscribed to for topic in msg.topics: From aec783b843ccd2cea79bee289953d2cea0d77498 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sat, 30 Mar 2019 17:59:08 -0400 Subject: [PATCH 08/19] reworked subscribe unsubsrcibe --- libp2p/pubsub/pubsub.py | 113 +++++++++++++++------------------------- 1 file changed, 43 insertions(+), 70 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index b1ee363..85abacb 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -61,7 +61,7 @@ class Pubsub(): """ Generate subscription message with all topics we are subscribed to """ - print("MAKING HELLO PACKET") + packet = rpc_pb2.RPC() message = rpc_pb2.Message( from_id=str(self.host.get_id()).encode('utf-8'), @@ -74,7 +74,7 @@ class Pubsub(): return packet.SerializeToString() - async def continously_read_stream(self, stream): + async def continuously_read_stream(self, stream): """ Read from input stream in an infinite loop. Process messages from other nodes, which for now are considered MessageTalk @@ -88,16 +88,12 @@ class Pubsub(): while True: incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() - print("JUST GOT STRING") - print(incoming) rpc_incoming.ParseFromString(incoming) - print("JUST GOT") + print ("CONTINUOUSLY") print (rpc_incoming) - - if hasattr(rpc_incoming, 'publish'): + if rpc_incoming.publish: # deal with "talk messages" for msg in rpc_incoming.publish: - # TODO check what from_id is in go? is it origin or peer old_format = MessageTalk(peer_id, msg.from_id, msg.topicIDs, @@ -107,48 +103,16 @@ class Pubsub(): await self.handle_talk(old_format) await self.router.publish(peer_id, msg) - if hasattr(rpc_incoming, 'subscriptions'): + if rpc_incoming.subscriptions: # deal with "subscription messages" subs_map = {} for msg in rpc_incoming.subscriptions: if msg.subscribe: - subs_map[msg.topic_id] = "sub" + subs_map[msg.topicid] = "sub" else: - subs_map[msg.topic_id] = "unsub" - old_format = MessageSub(peer_id, peer_id, subs_map, generate_message_id()) - self.handle_subscription(old_format) + subs_map[msg.topicid] = "unsub" - # msg_type = msg_comps[0] - - # msg_sender = rpc_incoming.publish.from_id - # # msg_origin = msg_comps[2] - # msg_id = rpc_incoming.publish.seqno - # if msg_id not in self.seen_messages: - # # Do stuff with incoming unseen message - # should_publish = True - # if msg_type == "subscription": - # self.handle_subscription(incoming) - - # # We don't need to relay the subscription to our - # # peers because a given node only needs its peers - # # to know that it is subscribed to the topic (doesn't - # # need everyone to know) - # should_publish = False - # elif msg_type == "talk": - # await self.handle_talk(incoming) - - # Add message id to seen - # self.seen_messages.append(msg_id) - - # Publish message using router's publish - # if should_publish: - # msg = create_message_talk(incoming) - - # # Adjust raw_msg to that the message sender - # # is now our peer_id - # msg.from_id = str(self.host.get_id()) - - # await self.router.publish(msg_sender, msg.to_str()) + self.handle_subscription(rpc_incoming) # Force context switch await asyncio.sleep(0) @@ -167,10 +131,10 @@ class Pubsub(): # Send hello packet hello = self.get_hello_packet() - print(hello) + await stream.write(hello) # Pass stream off to stream reader - asyncio.ensure_future(self.continously_read_stream(stream)) + asyncio.ensure_future(self.continuously_read_stream(stream)) async def handle_peer_queue(self): """ @@ -197,30 +161,30 @@ class Pubsub(): await stream.write(hello) # Pass stream off to stream reader - asyncio.ensure_future(self.continously_read_stream(stream)) + asyncio.ensure_future(self.continuously_read_stream(stream)) # Force context switch await asyncio.sleep(0) - def handle_subscription(self, subscription): + def handle_subscription(self, rpc_message): """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message :param subscription: raw data constituting a subscription message """ - sub_msg = subscription - if sub_msg.subs_map: - print("handle_subscription my_id: " + self.my_id + ", subber: " + sub_msg.origin_id) - for topic_id in sub_msg.subs_map: + for sub_msg in rpc_message.subscriptions: + # Look at each subscription in the msg individually - if sub_msg.subs_map[topic_id]: - if topic_id not in self.peer_topics: + if sub_msg.subscribe: + origin_id = rpc_message.publish[0].from_id + + if sub_msg.topicid not in self.peer_topics: # Create topic list if it did not yet exist - self.peer_topics[topic_id] = [sub_msg.origin_id] - elif sub_msg.origin_id not in self.peer_topics[topic_id]: + self.peer_topics[sub_msg.topicid] = origin_id + elif orgin_id not in self.peer_topics[sub_msg.topicid]: # Add peer to topic - self.peer_topics[topic_id].append(sub_msg.origin_id) + self.peer_topics[sub_msg.topicid].append(origin_id) else: # TODO: Remove peer from topic pass @@ -250,13 +214,18 @@ class Pubsub(): self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message - sub_msg = MessageSub( - str(self.host.get_id()),\ - str(self.host.get_id()), {topic_id: True}, generate_message_id()\ - ) + packet = rpc_pb2.RPC() + packet.publish.extend([rpc_pb2.Message( + from_id=str(self.host.get_id()).encode('utf-8'), + seqno=str(generate_message_id()).encode('utf-8') + )]) + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe = True, + topicid = topic_id.encode('utf-8') + )]) # Send out subscribe message to all peers - await self.message_all_peers(sub_msg.to_str()) + await self.message_all_peers(packet.SerializeToString()) # Tell router we are joining this topic self.router.join(topic_id) @@ -275,27 +244,31 @@ class Pubsub(): del self.my_topics[topic_id] # Create unsubscribe message - unsub_msg = MessageSub(str(self.host.get_id()), str(self.host.get_id()),\ - {topic_id: False}, generate_message_id()) + packet = rpc_pb2.RPC() + packet.publish.extend([rpc_pb2.Message( + from_id=str(self.host.get_id()).encode('utf-8'), + seqno=str(generate_message_id()).encode('utf-8') + )]) + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe = False, + topicid = topic_id.encode('utf-8') + )]) # Send out unsubscribe message to all peers - await self.message_all_peers(unsub_msg.to_str()) + await self.message_all_peers(packet.SerializeToString()) # Tell router we are leaving this topic self.router.leave(topic_id) - async def message_all_peers(self, raw_msg): + async def message_all_peers(self, rpc_msg): """ Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast """ - # Encode message for sending - encoded_msg = raw_msg.encode() - # Broadcast message for peer in self.peers: stream = self.peers[peer] # Write message to stream - await stream.write(encoded_msg) \ No newline at end of file + await stream.write(rpc_msg) From f5af4b9016d791599ff61465b7e699d82774679a Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sat, 30 Mar 2019 18:49:50 -0400 Subject: [PATCH 09/19] remove message.py --- libp2p/pubsub/floodsub.py | 31 +--------- libp2p/pubsub/message.py | 118 -------------------------------------- libp2p/pubsub/pubsub.py | 28 +++++---- 3 files changed, 15 insertions(+), 162 deletions(-) delete mode 100644 libp2p/pubsub/message.py diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 1cea17e..dd4dfc1 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,7 +1,6 @@ from .pubsub_router_interface import IPubsubRouter from .pb import rpc_pb2 -from .message import MessageSub, MessageTalk -from .message import create_message_talk + class FloodSub(IPubsubRouter): @@ -57,40 +56,14 @@ class FloodSub(IPubsubRouter): :param message: message to forward """ - # Encode message - # encoded_msg = message.encode() - - if isinstance(message, str): - msg_talk = create_message_talk(message) - message = rpc_pb2.Message( - from_id=str(msg_talk.origin_id).encode('utf-8'), - seqno=str(msg_talk.message_id).encode('utf-8'), - topicIDs=msg_talk.topics, - data=msg_talk.data.encode() - - ) packet = rpc_pb2.RPC() - print("YEET") - print(type(message)) packet.publish.extend([message]) - - - - # Get message sender, origin, and topics - # msg_talk = create_message_talk(message) msg_sender = str(sender_peer_id) - # msg_origin = msg_talk.origin_id - # topics = msg_talk.topics # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): - old_format = MessageTalk(sender_peer_id, - message.from_id, - message.topicIDs, - message.data, - message.seqno) - await self.pubsub.handle_talk(old_format) + await self.pubsub.handle_talk(sender_peer_id, message) # Deliver to self and peers for topic in message.topicIDs: diff --git a/libp2p/pubsub/message.py b/libp2p/pubsub/message.py deleted file mode 100644 index 2f839dc..0000000 --- a/libp2p/pubsub/message.py +++ /dev/null @@ -1,118 +0,0 @@ -import uuid - - -class MessageTalk(): - - """ - Object to make parsing talk messages easier, where talk messages are - defined as custom messages published to a set of topics - """ - # pylint: disable=too-few-public-methods - def __init__(self, from_id, origin_id, topics, data, message_id): - # pylint: disable=too-many-arguments - self.msg_type = "talk" - self.from_id = from_id - self.origin_id = origin_id - self.topics = topics - self.data = data - self.message_id = message_id - - def to_str(self): - """ - Convert to string - :return: MessageTalk object in string representation - """ - out = self.msg_type + '\n' - out += self.from_id + '\n' - out += self.origin_id + '\n' - out += self.message_id + '\n' - for i in range(len(self.topics)): - out += self.topics[i] - if i < len(self.topics) - 1: - out += ',' - out += '\n' + self.data - return out - - -class MessageSub(): - """ - Object to make parsing subscription messages easier, where subscription - messages are defined as indicating the topics a node wishes to subscribe to - or unsubscribe from - """ - # pylint: disable=too-few-public-methods - def __init__(self, from_id, origin_id, subs_map, message_id): - self.msg_type = "subscription" - self.from_id = from_id - self.origin_id = origin_id - self.subs_map = subs_map - self.message_id = message_id - - def to_str(self): - """ - Convert to string - :return: MessageSub object in string representation - """ - out = self.msg_type + '\n' - out += self.from_id + '\n' - out += self.origin_id + '\n' - out += self.message_id - - if self.subs_map: - out += '\n' - - keys = list(self.subs_map) - - for i, topic in enumerate(keys): - sub = self.subs_map[topic] - if sub: - out += "sub:" - else: - out += "unsub:" - out += topic - if i < len(keys) - 1: - out += '\n' - - return out - -def create_message_talk(msg_talk_as_str): - """ - Create a MessageTalk object from a MessageTalk string representation - :param msg_talk_as_str: a MessageTalk object in its string representation - :return: MessageTalk object - """ - msg_comps = msg_talk_as_str.split('\n') - from_id = msg_comps[1] - origin_id = msg_comps[2] - message_id = msg_comps[3] - topics = msg_comps[4].split(',') - data = msg_comps[5] - return MessageTalk(from_id, origin_id, topics, data, message_id) - -def create_message_sub(msg_sub_as_str): - """ - Create a MessageSub object from a MessageSub string representation - :param msg_talk_as_str: a MessageSub object in its string representation - :return: MessageSub object - """ - msg_comps = msg_sub_as_str.split('\n') - from_id = msg_comps[1] - origin_id = msg_comps[2] - message_id = msg_comps[3] - - subs_map = {} - for i in range(4, len(msg_comps)): - sub_comps = msg_comps[i].split(":") - topic = sub_comps[1] - if sub_comps[0] == "sub": - subs_map[topic] = True - else: - subs_map[topic] = False - return MessageSub(from_id, origin_id, subs_map, message_id) - -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 85abacb..976e6b4 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,11 +1,9 @@ import asyncio +import uuid from .pb import rpc_pb2_grpc from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee -from .message import MessageSub, MessageTalk -from .message import create_message_talk, create_message_sub -from. message import generate_message_id class Pubsub(): @@ -77,8 +75,7 @@ class Pubsub(): async def continuously_read_stream(self, stream): """ Read from input stream in an infinite loop. Process - messages from other nodes, which for now are considered MessageTalk - and MessageSub messages. + messages from other nodes :param stream: stream to continously read from """ @@ -94,13 +91,8 @@ class Pubsub(): if rpc_incoming.publish: # deal with "talk messages" for msg in rpc_incoming.publish: - old_format = MessageTalk(peer_id, - msg.from_id, - msg.topicIDs, - msg.data, - msg.seqno) self.seen_messages.append(msg.seqno) - await self.handle_talk(old_format) + await self.handle_talk(peer_id, msg) await self.router.publish(peer_id, msg) if rpc_incoming.subscriptions: @@ -189,21 +181,20 @@ class Pubsub(): # TODO: Remove peer from topic pass - async def handle_talk(self, talk): + async def handle_talk(self, peer_id, publish_message): """ Handle incoming Talk message from a peer. A Talk message contains some custom message that is published on a given topic(s) :param talk: raw data constituting a talk message """ - msg = talk # Check if this message has any topics that we are subscribed to - for topic in msg.topics: + for topic in publish_message.topicIDs: if topic in self.my_topics: # we are subscribed to a topic this message was sent for, # so add message to the subscription output queue # for each topic - await self.my_topics[topic].put(talk) + await self.my_topics[topic].put(publish_message) async def subscribe(self, topic_id): """ @@ -272,3 +263,10 @@ class Pubsub(): # Write message to stream await stream.write(rpc_msg) + +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) \ No newline at end of file From ec7bc45a5805faa95d2a389541c246b6412661d7 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sat, 30 Mar 2019 19:12:31 -0400 Subject: [PATCH 10/19] remove Message from dummy account --- tests/pubsub/dummy_account_node.py | 57 +++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index f328a6e..de78211 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,12 +1,12 @@ import asyncio +import uuid import multiaddr from libp2p import new_node -from libp2p.pubsub.message import create_message_talk +from libp2p.pubsub.pb import rpc_pb2_grpc +from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.message import MessageTalk -from libp2p.pubsub.message import generate_message_id SUPPORTED_PUBSUB_PROTOCOLS = ["/floodsub/1.0.0"] CRYPTO_TOPIC = "ethereum" @@ -53,16 +53,15 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - message_raw = await self.q.get() - message = create_message_talk(message_raw) - contents = message.data - - msg_comps = contents.split(",") - - if msg_comps[0] == "send": - self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) - elif msg_comps[0] == "set": - self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) + incoming = await self.q.get() + rpc_incoming = rpc_pb2.RPC() + rpc_incoming.ParseFromString(incoming) + for message in rpc_incoming.publish: + msg_comps = message.split(",") + if msg_comps[0] == "send": + self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) + elif msg_comps[0] == "set": + self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) async def setup_crypto_networking(self): """ @@ -82,8 +81,8 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) - msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) - await self.floodsub.publish(my_id, msg.to_str()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + await self.floodsub.publish(my_id, msg.SerializeToString()) async def publish_set_crypto(self, user, amount): """ @@ -93,8 +92,9 @@ class DummyAccountNode(): """ my_id = str(self.libp2p_node.get_id()) msg_contents = "set," + user + "," + str(amount) - msg = MessageTalk(my_id, my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) - await self.floodsub.publish(my_id, msg.to_str()) + packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) + + await self.floodsub.publish(my_id, packet.SerializeToString()) def handle_send_crypto(self, source_user, dest_user, amount): """ @@ -132,3 +132,26 @@ class DummyAccountNode(): else: return -1 +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) + +def generate_RPC_packet(origin_id, topics, msg_content, msg_id): + packet = rpc_pb2.RPC() + packet.publish.extend([rpc_pb2.Message( + from_id=origin_id.encode('utf-8'), + seqno=msg_id.encode('utf-8'), + data=msg_content.encode('utf-8'), + topicIDs=topics.encode('utf-8')) + ]) + + for topic in topics: + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=True, + topicid = topic.encode('utf-8') + )]) + + return packet From 971dbe1a968f7df704bab20bb4264015a3307731 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sat, 30 Mar 2019 19:30:58 -0400 Subject: [PATCH 11/19] fix encoding issue --- libp2p/pubsub/floodsub.py | 3 +++ tests/pubsub/dummy_account_node.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index dd4dfc1..8b9ba1c 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -57,6 +57,9 @@ class FloodSub(IPubsubRouter): """ packet = rpc_pb2.RPC() + print ("IN FLOOODSUB PUBLISH") + print (message) + print ("++++++++++++++++") packet.publish.extend([message]) msg_sender = str(sender_peer_id) diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index de78211..930dc57 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -141,17 +141,21 @@ def generate_message_id(): def generate_RPC_packet(origin_id, topics, msg_content, msg_id): packet = rpc_pb2.RPC() - packet.publish.extend([rpc_pb2.Message( + message = rpc_pb2.Message( from_id=origin_id.encode('utf-8'), seqno=msg_id.encode('utf-8'), - data=msg_content.encode('utf-8'), - topicIDs=topics.encode('utf-8')) - ]) + data=msg_content.encode('utf-8') + ) for topic in topics: + message.topicIDs.extend([topic.encode('utf-8')]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe=True, topicid = topic.encode('utf-8') )]) + packet.publish.extend([message]) + print ("HEYHO") + print (packet) + return packet From 89a19a9213aff1bae5d7dfb8408e4eae8f8aa30f Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 31 Mar 2019 22:16:28 -0400 Subject: [PATCH 12/19] reworked floodsub logic --- libp2p/pubsub/floodsub.py | 46 +++++++++++++--------- libp2p/pubsub/pubsub.py | 83 ++++++++++++++++++++++----------------- 2 files changed, 74 insertions(+), 55 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 8b9ba1c..d4736d0 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -41,7 +41,7 @@ class FloodSub(IPubsubRouter): :param rpc: rpc message """ - async def publish(self, sender_peer_id, message): + async def publish(self, sender_peer_id, rpc_message): """ Invoked to forward a new message that has been validated. This is where the "flooding" part of floodsub happens @@ -53,33 +53,41 @@ class FloodSub(IPubsubRouter): It also never forwards a message back to the source or the peer that forwarded the message. :param sender_peer_id: peer_id of message sender - :param message: message to forward + :param rpc_message: pubsub message in RPC string format """ packet = rpc_pb2.RPC() + packet.ParseFromString(rpc_message) print ("IN FLOOODSUB PUBLISH") - print (message) + print (packet) print ("++++++++++++++++") - packet.publish.extend([message]) msg_sender = str(sender_peer_id) - # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message - if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): - await self.pubsub.handle_talk(sender_peer_id, message) + for message in packet.publish: + if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): + await self.pubsub.handle_talk(sender_peer_id, message) - # Deliver to self and peers - for topic in message.topicIDs: - if topic in self.pubsub.peer_topics: - for peer_id_in_topic in self.pubsub.peer_topics[topic]: - # Forward to all known peers in the topic that are not the - # message sender and are not the message origin - if peer_id_in_topic not in (msg_sender, message.from_id): - stream = self.pubsub.peers[peer_id_in_topic] - await stream.write(packet.SerializeToString()) - else: - # Implies publish did not write - print("publish did not write") + + print ("OHOHOHOH") + print (self.pubsub.peer_topics) + print ("UUUJUJUJ") + print (self.pubsub.peers) + print ("********") + # Deliver to self and peers + for topic in message.topicIDs: + if topic in self.pubsub.peer_topics: + for peer_id_in_topic in self.pubsub.peer_topics[topic]: + # Forward to all known peers in the topic that are not the + # message sender and are not the message origin + print ("PEERID") + print (peer_id_in_topic) + if peer_id_in_topic not in (msg_sender, message.from_id): + stream = self.pubsub.peers[peer_id_in_topic] + await stream.write(packet.SerializeToString()) + else: + # Implies publish did not write + print("publish did not write") def join(self, topic): """ diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 976e6b4..7705dfb 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -80,31 +80,43 @@ class Pubsub(): """ # TODO check on types here - peer_id = stream.mplex_conn.peer_id + peer_id = str(stream.mplex_conn.peer_id) while True: incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) - print ("CONTINUOUSLY") + + print ("IN PUBSUB CONTINUOUSLY READ") print (rpc_incoming) + print ("###########################") + + should_publish = True + if rpc_incoming.publish: - # deal with "talk messages" - for msg in rpc_incoming.publish: - self.seen_messages.append(msg.seqno) - await self.handle_talk(peer_id, msg) - await self.router.publish(peer_id, msg) + # deal with RPC.publish + for message in rpc_incoming.publish: + self.seen_messages.append(message.seqno) + await self.handle_talk(peer_id, message) + if rpc_incoming.subscriptions: - # deal with "subscription messages" - subs_map = {} - for msg in rpc_incoming.subscriptions: - if msg.subscribe: - subs_map[msg.topicid] = "sub" - else: - subs_map[msg.topicid] = "unsub" + # deal with RPC.subscriptions + # We don't need to relay the subscription to our + # peers because a given node only needs its peers + # to know that it is subscribed to the topic (doesn't + # need everyone to know) + should_publish = False - self.handle_subscription(rpc_incoming) + # TODO check that peer_id is the same as origin_id + from_id = str(rpc_incoming.publish[0].from_id.decode('utf-8')) + for message in rpc_incoming.subscriptions: + if message.subscribe: + self.handle_subscription(from_id, message) + + if should_publish: + # relay message to peers with router + await self.router.publish(peer_id, incoming) # Force context switch await asyncio.sleep(0) @@ -136,7 +148,10 @@ class Pubsub(): pubsub protocols we support """ while True: + print ("PUBSUB HANDLE PEER QUEUE") peer_id = await self.peer_queue.get() + print (peer_id) + print ("++++++++++++++++++++++++") # Open a stream to peer on existing connection # (we know connection exists since that's the only way @@ -158,34 +173,30 @@ class Pubsub(): # Force context switch await asyncio.sleep(0) - def handle_subscription(self, rpc_message): + def handle_subscription(self, peer_id, sub_message): """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as defined in the subscription message - :param subscription: raw data constituting a subscription message + :param origin_id: id of the peer who subscribe to the message + :param sub_message: RPC.SubOpts """ - for sub_msg in rpc_message.subscriptions: - - # Look at each subscription in the msg individually - if sub_msg.subscribe: - origin_id = rpc_message.publish[0].from_id - - if sub_msg.topicid not in self.peer_topics: - # Create topic list if it did not yet exist - self.peer_topics[sub_msg.topicid] = origin_id - elif orgin_id not in self.peer_topics[sub_msg.topicid]: - # Add peer to topic - self.peer_topics[sub_msg.topicid].append(origin_id) - else: - # TODO: Remove peer from topic - pass + # TODO verify logic here + if sub_message.subscribe: + if sub_message.topicid not in self.peer_topics: + self.peer_topics[sub_message.topicid] = [peer_id] + elif peer_id not in self.peer_topics[sub_message.topicid]: + # Add peer to topic + self.peer_topics[sub_message.topicid].append(peer_id) + else: + # TODO: Remove peer from topic + pass async def handle_talk(self, peer_id, publish_message): """ - Handle incoming Talk message from a peer. A Talk message contains some - custom message that is published on a given topic(s) - :param talk: raw data constituting a talk message + Put incoming message from a peer onto my blocking queue + :param peer_id: peer id whom forwarded this message + :param talk: RPC.Message format """ # Check if this message has any topics that we are subscribed to @@ -269,4 +280,4 @@ def generate_message_id(): Generate a unique message id :return: messgae id """ - return str(uuid.uuid1()) \ No newline at end of file + return str(uuid.uuid1()) From de6bc011f01ce506d40c5a22e1c1e2008ad93670 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Sun, 31 Mar 2019 22:16:49 -0400 Subject: [PATCH 13/19] update dummy account node --- tests/pubsub/dummy_account_node.py | 32 +++++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 930dc57..0773533 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -37,6 +37,7 @@ class DummyAccountNode(): We use create as this serves as a factory function and allows us to use async await, unlike the init function """ + print ("**DUMMY** CREATE ACCOUNT NODE") self = DummyAccountNode() libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) @@ -54,14 +55,17 @@ class DummyAccountNode(): """ while True: incoming = await self.q.get() - rpc_incoming = rpc_pb2.RPC() - rpc_incoming.ParseFromString(incoming) - for message in rpc_incoming.publish: - msg_comps = message.split(",") - if msg_comps[0] == "send": - self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) - elif msg_comps[0] == "set": - self.handle_set_crypto_for_user(msg_comps[1], int(msg_comps[2])) + print ("**DUMMY** HANDLE INCOMING") + print (incoming) + print ("========================") + + msg_comps = incoming.data.decode('utf-8').split(",") + print (msg_comps) + print ("--------") + if msg_comps[0] == "send": + self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) + elif msg_comps[0] == "set": + self.handle_set_crypto(msg_comps[1], int(msg_comps[2])) async def setup_crypto_networking(self): """ @@ -103,6 +107,7 @@ class DummyAccountNode(): :param dest_user: user to send crypto to :param amount: amount of crypto to send """ + print ("**DUMMY** IN HANDLE SEND CRYPTO") if source_user in self.balances: self.balances[source_user] -= amount else: @@ -113,12 +118,15 @@ class DummyAccountNode(): else: self.balances[dest_user] = amount - def handle_set_crypto_for_user(self, dest_user, amount): + def handle_set_crypto(self, dest_user, amount): """ Handle incoming set_crypto message :param dest_user: user to set crypto for :param amount: amount of crypto """ + print ("**DUMMY** IN HANDLE SET CRYPTO") + print (type (dest_user)) + print (amount) self.balances[dest_user] = amount def get_balance(self, user): @@ -127,6 +135,9 @@ class DummyAccountNode(): :param user: user to get balance for :return: balance of user """ + print ("GET BALACNCE") + print (user) + print (self.balances) if user in self.balances: return self.balances[user] else: @@ -155,7 +166,4 @@ def generate_RPC_packet(origin_id, topics, msg_content, msg_id): )]) packet.publish.extend([message]) - print ("HEYHO") - print (packet) - return packet From 2e5e7e3c10da71178506ed8445fcfe06d387c01d Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Mon, 1 Apr 2019 15:04:20 -0400 Subject: [PATCH 14/19] remove message from test dummy --- tests/pubsub/dummy_account_node.py | 2 +- tests/pubsub/test_dummyaccount_demo.py | 148 ++++++++++++------------- 2 files changed, 74 insertions(+), 76 deletions(-) diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 0773533..eaca614 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -125,7 +125,7 @@ class DummyAccountNode(): :param amount: amount of crypto """ print ("**DUMMY** IN HANDLE SET CRYPTO") - print (type (dest_user)) + print (dest_user) print (amount) self.balances[dest_user] = amount diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index a071fa6..9b3a149 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -8,8 +8,6 @@ from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.message import MessageTalk -from libp2p.pubsub.message import create_message_talk from dummy_account_node import DummyAccountNode # pylint: disable=too-many-locals @@ -66,7 +64,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): await action_func(dummy_nodes) # Allow time for action function to be performed (i.e. messages to propogate) - await asyncio.sleep(0.25) + await asyncio.sleep(1) # Perform assertion function for dummy_node in dummy_nodes: @@ -88,102 +86,102 @@ async def test_simple_two_nodes(): await perform_test(num_nodes, adj_map, action_func, assertion_func) -@pytest.mark.asyncio -async def test_simple_three_nodes_line_topography(): - num_nodes = 3 - adj_map = {0: [1], 1: [2]} +# @pytest.mark.asyncio +# async def test_simple_three_nodes_line_topography(): +# num_nodes = 3 +# adj_map = {0: [1], 1: [2]} - async def action_func(dummy_nodes): - await dummy_nodes[0].publish_set_crypto("aspyn", 10) +# async def action_func(dummy_nodes): +# await dummy_nodes[0].publish_set_crypto("aspyn", 10) - def assertion_func(dummy_node): - assert dummy_node.get_balance("aspyn") == 10 +# def assertion_func(dummy_node): +# assert dummy_node.get_balance("aspyn") == 10 - await perform_test(num_nodes, adj_map, action_func, assertion_func) +# await perform_test(num_nodes, adj_map, action_func, assertion_func) -@pytest.mark.asyncio -async def test_simple_three_nodes_triangle_topography(): - num_nodes = 3 - adj_map = {0: [1, 2], 1: [2]} +# @pytest.mark.asyncio +# async def test_simple_three_nodes_triangle_topography(): +# num_nodes = 3 +# adj_map = {0: [1, 2], 1: [2]} - async def action_func(dummy_nodes): - await dummy_nodes[0].publish_set_crypto("aspyn", 20) +# async def action_func(dummy_nodes): +# await dummy_nodes[0].publish_set_crypto("aspyn", 20) - def assertion_func(dummy_node): - assert dummy_node.get_balance("aspyn") == 20 +# def assertion_func(dummy_node): +# assert dummy_node.get_balance("aspyn") == 20 - await perform_test(num_nodes, adj_map, action_func, assertion_func) +# await perform_test(num_nodes, adj_map, action_func, assertion_func) -@pytest.mark.asyncio -async def test_simple_seven_nodes_tree_topography(): - num_nodes = 7 - adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} +# @pytest.mark.asyncio +# async def test_simple_seven_nodes_tree_topography(): +# num_nodes = 7 +# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} - async def action_func(dummy_nodes): - await dummy_nodes[0].publish_set_crypto("aspyn", 20) +# async def action_func(dummy_nodes): +# await dummy_nodes[0].publish_set_crypto("aspyn", 20) - def assertion_func(dummy_node): - assert dummy_node.get_balance("aspyn") == 20 +# def assertion_func(dummy_node): +# assert dummy_node.get_balance("aspyn") == 20 - await perform_test(num_nodes, adj_map, action_func, assertion_func) +# await perform_test(num_nodes, adj_map, action_func, assertion_func) -@pytest.mark.asyncio -async def test_set_then_send_from_root_seven_nodes_tree_topography(): - num_nodes = 7 - adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} +# @pytest.mark.asyncio +# async def test_set_then_send_from_root_seven_nodes_tree_topography(): +# num_nodes = 7 +# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} - async def action_func(dummy_nodes): - await dummy_nodes[0].publish_set_crypto("aspyn", 20) - await asyncio.sleep(0.25) - await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5) +# async def action_func(dummy_nodes): +# await dummy_nodes[0].publish_set_crypto("aspyn", 20) +# await asyncio.sleep(0.25) +# await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5) - def assertion_func(dummy_node): - assert dummy_node.get_balance("aspyn") == 15 - assert dummy_node.get_balance("alex") == 5 +# def assertion_func(dummy_node): +# assert dummy_node.get_balance("aspyn") == 15 +# assert dummy_node.get_balance("alex") == 5 - await perform_test(num_nodes, adj_map, action_func, assertion_func) +# await perform_test(num_nodes, adj_map, action_func, assertion_func) -@pytest.mark.asyncio -async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): - num_nodes = 7 - adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} +# @pytest.mark.asyncio +# async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): +# num_nodes = 7 +# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} - async def action_func(dummy_nodes): - await dummy_nodes[6].publish_set_crypto("aspyn", 20) - await asyncio.sleep(0.25) - await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5) +# async def action_func(dummy_nodes): +# await dummy_nodes[6].publish_set_crypto("aspyn", 20) +# await asyncio.sleep(0.25) +# await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5) - def assertion_func(dummy_node): - assert dummy_node.get_balance("aspyn") == 15 - assert dummy_node.get_balance("alex") == 5 +# def assertion_func(dummy_node): +# assert dummy_node.get_balance("aspyn") == 15 +# assert dummy_node.get_balance("alex") == 5 - await perform_test(num_nodes, adj_map, action_func, assertion_func) +# await perform_test(num_nodes, adj_map, action_func, assertion_func) -@pytest.mark.asyncio -async def test_simple_five_nodes_ring_topography(): - num_nodes = 5 - adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} +# @pytest.mark.asyncio +# async def test_simple_five_nodes_ring_topography(): +# num_nodes = 5 +# adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} - async def action_func(dummy_nodes): - await dummy_nodes[0].publish_set_crypto("aspyn", 20) +# async def action_func(dummy_nodes): +# await dummy_nodes[0].publish_set_crypto("aspyn", 20) - def assertion_func(dummy_node): - assert dummy_node.get_balance("aspyn") == 20 +# def assertion_func(dummy_node): +# assert dummy_node.get_balance("aspyn") == 20 - await perform_test(num_nodes, adj_map, action_func, assertion_func) +# await perform_test(num_nodes, adj_map, action_func, assertion_func) -@pytest.mark.asyncio -async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): - num_nodes = 5 - adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} +# @pytest.mark.asyncio +# async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): +# num_nodes = 5 +# adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} - async def action_func(dummy_nodes): - await dummy_nodes[0].publish_set_crypto("alex", 20) - await asyncio.sleep(0.25) - await dummy_nodes[3].publish_send_crypto("alex", "rob", 12) +# async def action_func(dummy_nodes): +# await dummy_nodes[0].publish_set_crypto("alex", 20) +# await asyncio.sleep(0.25) +# await dummy_nodes[3].publish_send_crypto("alex", "rob", 12) - def assertion_func(dummy_node): - assert dummy_node.get_balance("alex") == 8 - assert dummy_node.get_balance("rob") == 12 +# def assertion_func(dummy_node): +# assert dummy_node.get_balance("alex") == 8 +# assert dummy_node.get_balance("rob") == 12 - await perform_test(num_nodes, adj_map, action_func, assertion_func) +# await perform_test(num_nodes, adj_map, action_func, assertion_func) From 6eb070b78e53c4bcdab6d82bf073bbba86523517 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Mon, 1 Apr 2019 16:23:20 -0400 Subject: [PATCH 15/19] fix all tests --- libp2p/pubsub/floodsub.py | 18 ++- libp2p/pubsub/pubsub.py | 62 ++++++----- tests/pubsub/dummy_account_node.py | 4 +- tests/pubsub/test_dummyaccount_demo.py | 144 ++++++++++++------------ tests/pubsub/test_floodsub.py | 147 ++++++++++++++++--------- 5 files changed, 221 insertions(+), 154 deletions(-) diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index d4736d0..305607e 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -65,7 +65,16 @@ class FloodSub(IPubsubRouter): # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message for message in packet.publish: - if msg_sender == message.from_id and msg_sender == str(self.pubsub.host.get_id()): + decoded_from_id = message.from_id.decode('utf-8') + + print ("MESSAGE SENDER") + print (msg_sender) + print ("FROM ID") + print (message.from_id) + print (str(self.pubsub.host.get_id())) + + + if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): await self.pubsub.handle_talk(sender_peer_id, message) @@ -82,9 +91,12 @@ class FloodSub(IPubsubRouter): # message sender and are not the message origin print ("PEERID") print (peer_id_in_topic) - if peer_id_in_topic not in (msg_sender, message.from_id): + if peer_id_in_topic not in (msg_sender, decoded_from_id): stream = self.pubsub.peers[peer_id_in_topic] - await stream.write(packet.SerializeToString()) + # create new packet with just publish message + new_packet = rpc_pb2.RPC() + new_packet.publish.extend([message]) + await stream.write(new_packet.SerializeToString()) else: # Implies publish did not write print("publish did not write") diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 7705dfb..c7bfe06 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -58,17 +58,19 @@ class Pubsub(): def get_hello_packet(self): """ Generate subscription message with all topics we are subscribed to + only send hello packet if we have subscribed topics """ - packet = rpc_pb2.RPC() - message = rpc_pb2.Message( - from_id=str(self.host.get_id()).encode('utf-8'), - seqno=str(generate_message_id()).encode('utf-8') - ) - packet.publish.extend([message]) - for topic_id in self.my_topics: - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe=True, topicid=topic_id)]) + if self.my_topics: + for topic_id in self.my_topics: + packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + subscribe=True, topicid=topic_id)]) + + # message = rpc_pb2.Message( + # from_id=str(self.host.get_id()).encode('utf-8'), + # seqno=str(generate_message_id()).encode('utf-8') + # ) + # packet.publish.extend([message]) return packet.SerializeToString() @@ -80,9 +82,11 @@ class Pubsub(): """ # TODO check on types here + print ("++++++++++ASPYN+++++++++++++++++") peer_id = str(stream.mplex_conn.peer_id) while True: + print ("HIT ME") incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) @@ -91,13 +95,15 @@ class Pubsub(): print (rpc_incoming) print ("###########################") - should_publish = True + should_publish = False if rpc_incoming.publish: # deal with RPC.publish for message in rpc_incoming.publish: - self.seen_messages.append(message.seqno) - await self.handle_talk(peer_id, message) + if message.seqno not in self.seen_messages: + should_publish = True + self.seen_messages.append(message.seqno) + await self.handle_talk(peer_id, message) if rpc_incoming.subscriptions: @@ -106,13 +112,9 @@ class Pubsub(): # peers because a given node only needs its peers # to know that it is subscribed to the topic (doesn't # need everyone to know) - should_publish = False - - # TODO check that peer_id is the same as origin_id - from_id = str(rpc_incoming.publish[0].from_id.decode('utf-8')) for message in rpc_incoming.subscriptions: if message.subscribe: - self.handle_subscription(from_id, message) + self.handle_subscription(peer_id, message) if should_publish: # relay message to peers with router @@ -182,6 +184,8 @@ class Pubsub(): :param sub_message: RPC.SubOpts """ # TODO verify logic here + + if sub_message.subscribe: if sub_message.topicid not in self.peer_topics: self.peer_topics[sub_message.topicid] = [peer_id] @@ -213,19 +217,23 @@ class Pubsub(): :param topic_id: topic_id to subscribe to """ # Map topic_id to blocking queue + + print ("**PUBSUB** in SUBSCRIBE") self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message packet = rpc_pb2.RPC() - packet.publish.extend([rpc_pb2.Message( - from_id=str(self.host.get_id()).encode('utf-8'), - seqno=str(generate_message_id()).encode('utf-8') - )]) + # packet.publish.extend([rpc_pb2.Message( + # from_id=str(self.host.get_id()).encode('utf-8'), + # seqno=str(generate_message_id()).encode('utf-8') + # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe = True, topicid = topic_id.encode('utf-8') )]) - + print (packet) + print ("**PUBSUB** PEEERS") + print (self.peers) # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -247,10 +255,10 @@ class Pubsub(): # Create unsubscribe message packet = rpc_pb2.RPC() - packet.publish.extend([rpc_pb2.Message( - from_id=str(self.host.get_id()).encode('utf-8'), - seqno=str(generate_message_id()).encode('utf-8') - )]) + # packet.publish.extend([rpc_pb2.Message( + # from_id=str(self.host.get_id()).encode('utf-8'), + # seqno=str(generate_message_id()).encode('utf-8') + # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe = False, topicid = topic_id.encode('utf-8') @@ -267,6 +275,8 @@ class Pubsub(): Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast """ + print ("**PUBSU** IN MESSAGE ALL PEERS") + print (rpc_msg) # Broadcast message for peer in self.peers: diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index eaca614..1951fc6 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -86,7 +86,7 @@ class DummyAccountNode(): my_id = str(self.libp2p_node.get_id()) msg_contents = "send," + source_user + "," + dest_user + "," + str(amount) packet = generate_RPC_packet(my_id, [CRYPTO_TOPIC], msg_contents, generate_message_id()) - await self.floodsub.publish(my_id, msg.SerializeToString()) + await self.floodsub.publish(my_id, packet.SerializeToString()) async def publish_set_crypto(self, user, amount): """ @@ -128,6 +128,8 @@ class DummyAccountNode(): print (dest_user) print (amount) self.balances[dest_user] = amount + print (self.balances) + print ("^^ balance") def get_balance(self, user): """ diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index 9b3a149..1f08c8d 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -86,102 +86,102 @@ async def test_simple_two_nodes(): await perform_test(num_nodes, adj_map, action_func, assertion_func) -# @pytest.mark.asyncio -# async def test_simple_three_nodes_line_topography(): -# num_nodes = 3 -# adj_map = {0: [1], 1: [2]} +@pytest.mark.asyncio +async def test_simple_three_nodes_line_topography(): + num_nodes = 3 + adj_map = {0: [1], 1: [2]} -# async def action_func(dummy_nodes): -# await dummy_nodes[0].publish_set_crypto("aspyn", 10) + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 10) -# def assertion_func(dummy_node): -# assert dummy_node.get_balance("aspyn") == 10 + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 10 -# await perform_test(num_nodes, adj_map, action_func, assertion_func) + await perform_test(num_nodes, adj_map, action_func, assertion_func) -# @pytest.mark.asyncio -# async def test_simple_three_nodes_triangle_topography(): -# num_nodes = 3 -# adj_map = {0: [1, 2], 1: [2]} +@pytest.mark.asyncio +async def test_simple_three_nodes_triangle_topography(): + num_nodes = 3 + adj_map = {0: [1, 2], 1: [2]} -# async def action_func(dummy_nodes): -# await dummy_nodes[0].publish_set_crypto("aspyn", 20) + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) -# def assertion_func(dummy_node): -# assert dummy_node.get_balance("aspyn") == 20 + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 20 -# await perform_test(num_nodes, adj_map, action_func, assertion_func) + await perform_test(num_nodes, adj_map, action_func, assertion_func) -# @pytest.mark.asyncio -# async def test_simple_seven_nodes_tree_topography(): -# num_nodes = 7 -# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} +@pytest.mark.asyncio +async def test_simple_seven_nodes_tree_topography(): + num_nodes = 7 + adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} -# async def action_func(dummy_nodes): -# await dummy_nodes[0].publish_set_crypto("aspyn", 20) + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) -# def assertion_func(dummy_node): -# assert dummy_node.get_balance("aspyn") == 20 + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 20 -# await perform_test(num_nodes, adj_map, action_func, assertion_func) + await perform_test(num_nodes, adj_map, action_func, assertion_func) -# @pytest.mark.asyncio -# async def test_set_then_send_from_root_seven_nodes_tree_topography(): -# num_nodes = 7 -# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} +@pytest.mark.asyncio +async def test_set_then_send_from_root_seven_nodes_tree_topography(): + num_nodes = 7 + adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} -# async def action_func(dummy_nodes): -# await dummy_nodes[0].publish_set_crypto("aspyn", 20) -# await asyncio.sleep(0.25) -# await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5) + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) + await asyncio.sleep(0.25) + await dummy_nodes[0].publish_send_crypto("aspyn", "alex", 5) -# def assertion_func(dummy_node): -# assert dummy_node.get_balance("aspyn") == 15 -# assert dummy_node.get_balance("alex") == 5 + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 15 + assert dummy_node.get_balance("alex") == 5 -# await perform_test(num_nodes, adj_map, action_func, assertion_func) + await perform_test(num_nodes, adj_map, action_func, assertion_func) -# @pytest.mark.asyncio -# async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): -# num_nodes = 7 -# adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} +@pytest.mark.asyncio +async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography(): + num_nodes = 7 + adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]} -# async def action_func(dummy_nodes): -# await dummy_nodes[6].publish_set_crypto("aspyn", 20) -# await asyncio.sleep(0.25) -# await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5) + async def action_func(dummy_nodes): + await dummy_nodes[6].publish_set_crypto("aspyn", 20) + await asyncio.sleep(0.25) + await dummy_nodes[4].publish_send_crypto("aspyn", "alex", 5) -# def assertion_func(dummy_node): -# assert dummy_node.get_balance("aspyn") == 15 -# assert dummy_node.get_balance("alex") == 5 + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 15 + assert dummy_node.get_balance("alex") == 5 -# await perform_test(num_nodes, adj_map, action_func, assertion_func) + await perform_test(num_nodes, adj_map, action_func, assertion_func) -# @pytest.mark.asyncio -# async def test_simple_five_nodes_ring_topography(): -# num_nodes = 5 -# adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} +@pytest.mark.asyncio +async def test_simple_five_nodes_ring_topography(): + num_nodes = 5 + adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} -# async def action_func(dummy_nodes): -# await dummy_nodes[0].publish_set_crypto("aspyn", 20) + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("aspyn", 20) -# def assertion_func(dummy_node): -# assert dummy_node.get_balance("aspyn") == 20 + def assertion_func(dummy_node): + assert dummy_node.get_balance("aspyn") == 20 -# await perform_test(num_nodes, adj_map, action_func, assertion_func) + await perform_test(num_nodes, adj_map, action_func, assertion_func) -# @pytest.mark.asyncio -# async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): -# num_nodes = 5 -# adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} +@pytest.mark.asyncio +async def test_set_then_send_from_diff_nodes_five_nodes_ring_topography(): + num_nodes = 5 + adj_map = {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]} -# async def action_func(dummy_nodes): -# await dummy_nodes[0].publish_set_crypto("alex", 20) -# await asyncio.sleep(0.25) -# await dummy_nodes[3].publish_send_crypto("alex", "rob", 12) + async def action_func(dummy_nodes): + await dummy_nodes[0].publish_set_crypto("alex", 20) + await asyncio.sleep(0.25) + await dummy_nodes[3].publish_send_crypto("alex", "rob", 12) -# def assertion_func(dummy_node): -# assert dummy_node.get_balance("alex") == 8 -# assert dummy_node.get_balance("rob") == 12 + def assertion_func(dummy_node): + assert dummy_node.get_balance("alex") == 8 + assert dummy_node.get_balance("rob") == 12 -# await perform_test(num_nodes, adj_map, action_func, assertion_func) + await perform_test(num_nodes, adj_map, action_func, assertion_func) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index f4a5826..c4c2681 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -1,15 +1,14 @@ import asyncio +import uuid import multiaddr import pytest from tests.utils import cleanup from libp2p import new_node from libp2p.peer.peerinfo import info_from_p2p_addr +from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub -from libp2p.pubsub.message import MessageTalk -from libp2p.pubsub.message import create_message_talk -from libp2p.pubsub.message import generate_message_id # pylint: disable=too-many-locals @@ -22,7 +21,7 @@ async def connect(node1, node2): await node1.connect(info) @pytest.mark.asyncio -async def test_simple_two_nodes(): +async def test_simple_two_nodes_RPC(): node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) @@ -45,71 +44,77 @@ async def test_simple_two_nodes(): node_a_id = str(node_a.get_id()) - msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) - - await floodsub_a.publish(node_a.get_id(), msg.to_str()) - + msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id()) + await floodsub_a.publish(node_a_id, msg.SerializeToString()) + print ("MESSAGE B") + print (msg.SerializeToString()) + print ("=========") await asyncio.sleep(0.25) res_b = await qb.get() + print ("RES B") + print (res_b) + print ("-----") # Check that the msg received by node_b is the same # as the message sent by node_a - assert res_b == msg.to_str() + assert res_b.SerializeToString() == msg.publish[0].SerializeToString() + + # Success, terminate pending tasks. await cleanup() -@pytest.mark.asyncio -async def test_simple_three_nodes(): - # Want to pass message from A -> B -> C - node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) +# @pytest.mark.asyncio +# async def test_simple_three_nodes(): +# # Want to pass message from A -> B -> C +# node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) +# node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) +# node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) +# await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) +# await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) +# await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - supported_protocols = ["/floodsub/1.0.0"] +# supported_protocols = ["/floodsub/1.0.0"] - floodsub_a = FloodSub(supported_protocols) - pubsub_a = Pubsub(node_a, floodsub_a, "a") - floodsub_b = FloodSub(supported_protocols) - pubsub_b = Pubsub(node_b, floodsub_b, "b") - floodsub_c = FloodSub(supported_protocols) - pubsub_c = Pubsub(node_c, floodsub_c, "c") +# floodsub_a = FloodSub(supported_protocols) +# pubsub_a = Pubsub(node_a, floodsub_a, "a") +# floodsub_b = FloodSub(supported_protocols) +# pubsub_b = Pubsub(node_b, floodsub_b, "b") +# floodsub_c = FloodSub(supported_protocols) +# pubsub_c = Pubsub(node_c, floodsub_c, "c") - await connect(node_a, node_b) - await connect(node_b, node_c) +# await connect(node_a, node_b) +# await connect(node_b, node_c) - await asyncio.sleep(0.25) - qb = await pubsub_b.subscribe("my_topic") - qc = await pubsub_c.subscribe("my_topic") - await asyncio.sleep(0.25) +# await asyncio.sleep(0.25) +# qb = await pubsub_b.subscribe("my_topic") +# qc = await pubsub_c.subscribe("my_topic") +# await asyncio.sleep(0.25) - node_a_id = str(node_a.get_id()) +# node_a_id = str(node_a.get_id()) - msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) +# msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) - await floodsub_a.publish(node_a.get_id(), msg.to_str()) +# await floodsub_a.publish(node_a.get_id(), msg.to_str()) - await asyncio.sleep(0.25) - res_b = await qb.get() - res_c = await qc.get() +# await asyncio.sleep(0.25) +# res_b = await qb.get() +# res_c = await qc.get() - # Check that the msg received by node_b is the same - # as the message sent by node_a - assert res_b == msg.to_str() +# # Check that the msg received by node_b is the same +# # as the message sent by node_a +# assert res_b == msg.to_str() - # res_c should match original msg but with b as sender - node_b_id = str(node_b.get_id()) - msg.from_id = node_b_id +# # res_c should match original msg but with b as sender +# node_b_id = str(node_b.get_id()) +# msg.from_id = node_b_id - assert res_c == msg.to_str() +# assert res_c == msg.to_str() - # Success, terminate pending tasks. - await cleanup() +# # Success, terminate pending tasks. +# await cleanup() async def perform_test_from_obj(obj): """ @@ -237,11 +242,14 @@ async def perform_test_from_obj(obj): actual_node_id = str(node_map[node_id].get_id()) # Create correctly formatted message - msg_talk = MessageTalk(actual_node_id, actual_node_id, topics, data, generate_message_id()) + msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id()) + print ("**TEST FLOODSUB** MESSAGE TALK") + print (msg_talk) # Publish message # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) - tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()))) + tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\ + actual_node_id, msg_talk.SerializeToString()))) # For each topic in topics, add topic, msg_talk tuple to ordered test list # TODO: Update message sender to be correct message sender before @@ -261,12 +269,20 @@ async def perform_test_from_obj(obj): for node_id in topic_map[topic]: # Get message from subscription queue msg_on_node_str = await queues_map[node_id][topic].get() - msg_on_node = create_message_talk(msg_on_node_str) + + print ("MESSAGE ON NODE STR") + print (msg_on_node_str) + + print ("ACTUAL MESSSSAGE") + print (actual_msg) + + assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString() + # msg_on_node = create_message_talk(msg_on_node_str) # Perform checks - assert actual_msg.origin_id == msg_on_node.origin_id - assert actual_msg.topics == msg_on_node.topics - assert actual_msg.data == msg_on_node.data + # assert actual_msg.origin_id == msg_on_node.origin_id + # assert actual_msg.topics == msg_on_node.topics + # assert actual_msg.data == msg_on_node.data # Success, terminate pending tasks. await cleanup() @@ -484,3 +500,30 @@ async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): ] } await perform_test_from_obj(test_obj) + +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) + +def generate_RPC_packet(origin_id, topics, msg_content, msg_id): + packet = rpc_pb2.RPC() + message = rpc_pb2.Message( + from_id=origin_id.encode('utf-8'), + seqno=msg_id.encode('utf-8'), + data=msg_content.encode('utf-8'), + ) + for topic in topics: + message.topicIDs.extend([topic.encode('utf-8')]) + + # for topic in topics: + # message.topicIDs.extend([topic.encode('utf-8')]) + # packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( + # subscribe=True, + # topicid = topic.encode('utf-8') + # )]) + + packet.publish.extend([message]) + return packet From 41d1aae55b8a2f65212254ee712b6ca2083c1885 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Mon, 1 Apr 2019 16:55:44 -0400 Subject: [PATCH 16/19] clean up --- libp2p/pubsub/floodsub.py | 25 +----- libp2p/pubsub/pubsub.py | 57 +++---------- libp2p/pubsub/pubsub_router_interface.py | 4 +- tests/pubsub/dummy_account_node.py | 47 +---------- tests/pubsub/test_floodsub.py | 103 +---------------------- tests/pubsub/utils.py | 30 +++++++ 6 files changed, 54 insertions(+), 212 deletions(-) create mode 100644 tests/pubsub/utils.py diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index 305607e..a4f2e86 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -1,8 +1,9 @@ -from .pubsub_router_interface import IPubsubRouter from .pb import rpc_pb2 +from .pubsub_router_interface import IPubsubRouter class FloodSub(IPubsubRouter): + # pylint: disable=no-member def __init__(self, protocols): self.protocols = protocols @@ -55,42 +56,22 @@ class FloodSub(IPubsubRouter): :param sender_peer_id: peer_id of message sender :param rpc_message: pubsub message in RPC string format """ - packet = rpc_pb2.RPC() packet.ParseFromString(rpc_message) - print ("IN FLOOODSUB PUBLISH") - print (packet) - print ("++++++++++++++++") msg_sender = str(sender_peer_id) # Deliver to self if self was origin # Note: handle_talk checks if self is subscribed to topics in message for message in packet.publish: decoded_from_id = message.from_id.decode('utf-8') - - print ("MESSAGE SENDER") - print (msg_sender) - print ("FROM ID") - print (message.from_id) - print (str(self.pubsub.host.get_id())) - - if msg_sender == decoded_from_id and msg_sender == str(self.pubsub.host.get_id()): - await self.pubsub.handle_talk(sender_peer_id, message) + await self.pubsub.handle_talk(message) - - print ("OHOHOHOH") - print (self.pubsub.peer_topics) - print ("UUUJUJUJ") - print (self.pubsub.peers) - print ("********") # Deliver to self and peers for topic in message.topicIDs: if topic in self.pubsub.peer_topics: for peer_id_in_topic in self.pubsub.peer_topics[topic]: # Forward to all known peers in the topic that are not the # message sender and are not the message origin - print ("PEERID") - print (peer_id_in_topic) if peer_id_in_topic not in (msg_sender, decoded_from_id): stream = self.pubsub.peers[peer_id_in_topic] # create new packet with just publish message diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index c7bfe06..bfad873 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,13 +1,12 @@ import asyncio import uuid -from .pb import rpc_pb2_grpc from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee class Pubsub(): - # pylint: disable=too-many-instance-attributes + # pylint: disable=too-many-instance-attributes, no-member def __init__(self, host, router, my_id): """ @@ -65,12 +64,6 @@ class Pubsub(): for topic_id in self.my_topics: packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe=True, topicid=topic_id)]) - - # message = rpc_pb2.Message( - # from_id=str(self.host.get_id()).encode('utf-8'), - # seqno=str(generate_message_id()).encode('utf-8') - # ) - # packet.publish.extend([message]) return packet.SerializeToString() @@ -82,19 +75,13 @@ class Pubsub(): """ # TODO check on types here - print ("++++++++++ASPYN+++++++++++++++++") peer_id = str(stream.mplex_conn.peer_id) while True: - print ("HIT ME") incoming = (await stream.read()) rpc_incoming = rpc_pb2.RPC() rpc_incoming.ParseFromString(incoming) - print ("IN PUBSUB CONTINUOUSLY READ") - print (rpc_incoming) - print ("###########################") - should_publish = False if rpc_incoming.publish: @@ -103,8 +90,7 @@ class Pubsub(): if message.seqno not in self.seen_messages: should_publish = True self.seen_messages.append(message.seqno) - await self.handle_talk(peer_id, message) - + await self.handle_talk(message) if rpc_incoming.subscriptions: # deal with RPC.subscriptions @@ -150,10 +136,8 @@ class Pubsub(): pubsub protocols we support """ while True: - print ("PUBSUB HANDLE PEER QUEUE") + peer_id = await self.peer_queue.get() - print (peer_id) - print ("++++++++++++++++++++++++") # Open a stream to peer on existing connection # (we know connection exists since that's the only way @@ -175,7 +159,7 @@ class Pubsub(): # Force context switch await asyncio.sleep(0) - def handle_subscription(self, peer_id, sub_message): + def handle_subscription(self, origin_id, sub_message): """ Handle an incoming subscription message from a peer. Update internal mapping to mark the peer as subscribed or unsubscribed to topics as @@ -183,23 +167,19 @@ class Pubsub(): :param origin_id: id of the peer who subscribe to the message :param sub_message: RPC.SubOpts """ - # TODO verify logic here - - if sub_message.subscribe: if sub_message.topicid not in self.peer_topics: - self.peer_topics[sub_message.topicid] = [peer_id] - elif peer_id not in self.peer_topics[sub_message.topicid]: + self.peer_topics[sub_message.topicid] = [origin_id] + elif origin_id not in self.peer_topics[sub_message.topicid]: # Add peer to topic - self.peer_topics[sub_message.topicid].append(peer_id) + self.peer_topics[sub_message.topicid].append(origin_id) else: # TODO: Remove peer from topic pass - async def handle_talk(self, peer_id, publish_message): + async def handle_talk(self, publish_message): """ Put incoming message from a peer onto my blocking queue - :param peer_id: peer id whom forwarded this message :param talk: RPC.Message format """ @@ -216,9 +196,8 @@ class Pubsub(): Subscribe ourself to a topic :param topic_id: topic_id to subscribe to """ - # Map topic_id to blocking queue - print ("**PUBSUB** in SUBSCRIBE") + # Map topic_id to blocking queue self.my_topics[topic_id] = asyncio.Queue() # Create subscribe message @@ -228,12 +207,10 @@ class Pubsub(): # seqno=str(generate_message_id()).encode('utf-8') # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe = True, - topicid = topic_id.encode('utf-8') + subscribe=True, + topicid=topic_id.encode('utf-8') )]) - print (packet) - print ("**PUBSUB** PEEERS") - print (self.peers) + # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -255,13 +232,9 @@ class Pubsub(): # Create unsubscribe message packet = rpc_pb2.RPC() - # packet.publish.extend([rpc_pb2.Message( - # from_id=str(self.host.get_id()).encode('utf-8'), - # seqno=str(generate_message_id()).encode('utf-8') - # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe = False, - topicid = topic_id.encode('utf-8') + subscribe=False, + topicid=topic_id.encode('utf-8') )]) # Send out unsubscribe message to all peers @@ -275,8 +248,6 @@ class Pubsub(): Broadcast a message to peers :param raw_msg: raw contents of the message to broadcast """ - print ("**PUBSU** IN MESSAGE ALL PEERS") - print (rpc_msg) # Broadcast message for peer in self.peers: diff --git a/libp2p/pubsub/pubsub_router_interface.py b/libp2p/pubsub/pubsub_router_interface.py index 727b39e..ec5132e 100644 --- a/libp2p/pubsub/pubsub_router_interface.py +++ b/libp2p/pubsub/pubsub_router_interface.py @@ -39,11 +39,11 @@ class IPubsubRouter(ABC): """ @abstractmethod - def publish(self, sender_peer_id, message): + def publish(self, sender_peer_id, rpc_message): """ Invoked to forward a new message that has been validated :param sender_peer_id: peer_id of message sender - :param message: message to forward + :param rpc_message: message to forward """ @abstractmethod diff --git a/tests/pubsub/dummy_account_node.py b/tests/pubsub/dummy_account_node.py index 1951fc6..05a02bf 100644 --- a/tests/pubsub/dummy_account_node.py +++ b/tests/pubsub/dummy_account_node.py @@ -1,10 +1,8 @@ import asyncio -import uuid import multiaddr +from utils import generate_message_id, generate_RPC_packet from libp2p import new_node -from libp2p.pubsub.pb import rpc_pb2_grpc -from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub @@ -37,7 +35,6 @@ class DummyAccountNode(): We use create as this serves as a factory function and allows us to use async await, unlike the init function """ - print ("**DUMMY** CREATE ACCOUNT NODE") self = DummyAccountNode() libp2p_node = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) @@ -54,14 +51,9 @@ class DummyAccountNode(): Handle all incoming messages on the CRYPTO_TOPIC from peers """ while True: - incoming = await self.q.get() - print ("**DUMMY** HANDLE INCOMING") - print (incoming) - print ("========================") - + incoming = await self.q.get() msg_comps = incoming.data.decode('utf-8').split(",") - print (msg_comps) - print ("--------") + if msg_comps[0] == "send": self.handle_send_crypto(msg_comps[1], msg_comps[2], int(msg_comps[3])) elif msg_comps[0] == "set": @@ -107,7 +99,6 @@ class DummyAccountNode(): :param dest_user: user to send crypto to :param amount: amount of crypto to send """ - print ("**DUMMY** IN HANDLE SEND CRYPTO") if source_user in self.balances: self.balances[source_user] -= amount else: @@ -124,12 +115,7 @@ class DummyAccountNode(): :param dest_user: user to set crypto for :param amount: amount of crypto """ - print ("**DUMMY** IN HANDLE SET CRYPTO") - print (dest_user) - print (amount) self.balances[dest_user] = amount - print (self.balances) - print ("^^ balance") def get_balance(self, user): """ @@ -137,35 +123,8 @@ class DummyAccountNode(): :param user: user to get balance for :return: balance of user """ - print ("GET BALACNCE") - print (user) - print (self.balances) if user in self.balances: return self.balances[user] else: return -1 -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) - -def generate_RPC_packet(origin_id, topics, msg_content, msg_id): - packet = rpc_pb2.RPC() - message = rpc_pb2.Message( - from_id=origin_id.encode('utf-8'), - seqno=msg_id.encode('utf-8'), - data=msg_content.encode('utf-8') - ) - - for topic in topics: - message.topicIDs.extend([topic.encode('utf-8')]) - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - subscribe=True, - topicid = topic.encode('utf-8') - )]) - - packet.publish.extend([message]) - return packet diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index c4c2681..4fc059b 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -1,5 +1,4 @@ import asyncio -import uuid import multiaddr import pytest @@ -9,6 +8,7 @@ from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.pubsub.pb import rpc_pb2 from libp2p.pubsub.pubsub import Pubsub from libp2p.pubsub.floodsub import FloodSub +from utils import generate_message_id, generate_RPC_packet # pylint: disable=too-many-locals @@ -46,16 +46,10 @@ async def test_simple_two_nodes_RPC(): msg = generate_RPC_packet(node_a_id, ["my_topic"], "some data", generate_message_id()) await floodsub_a.publish(node_a_id, msg.SerializeToString()) - print ("MESSAGE B") - print (msg.SerializeToString()) - print ("=========") await asyncio.sleep(0.25) res_b = await qb.get() - print ("RES B") - print (res_b) - print ("-----") # Check that the msg received by node_b is the same # as the message sent by node_a assert res_b.SerializeToString() == msg.publish[0].SerializeToString() @@ -65,57 +59,6 @@ async def test_simple_two_nodes_RPC(): # Success, terminate pending tasks. await cleanup() -# @pytest.mark.asyncio -# async def test_simple_three_nodes(): -# # Want to pass message from A -> B -> C -# node_a = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) -# node_b = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) -# node_c = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"]) - -# await node_a.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) -# await node_b.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) -# await node_c.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - -# supported_protocols = ["/floodsub/1.0.0"] - -# floodsub_a = FloodSub(supported_protocols) -# pubsub_a = Pubsub(node_a, floodsub_a, "a") -# floodsub_b = FloodSub(supported_protocols) -# pubsub_b = Pubsub(node_b, floodsub_b, "b") -# floodsub_c = FloodSub(supported_protocols) -# pubsub_c = Pubsub(node_c, floodsub_c, "c") - -# await connect(node_a, node_b) -# await connect(node_b, node_c) - -# await asyncio.sleep(0.25) -# qb = await pubsub_b.subscribe("my_topic") -# qc = await pubsub_c.subscribe("my_topic") -# await asyncio.sleep(0.25) - -# node_a_id = str(node_a.get_id()) - -# msg = MessageTalk(node_a_id, node_a_id, ["my_topic"], "some data", generate_message_id()) - -# await floodsub_a.publish(node_a.get_id(), msg.to_str()) - -# await asyncio.sleep(0.25) -# res_b = await qb.get() -# res_c = await qc.get() - -# # Check that the msg received by node_b is the same -# # as the message sent by node_a -# assert res_b == msg.to_str() - -# # res_c should match original msg but with b as sender -# node_b_id = str(node_b.get_id()) -# msg.from_id = node_b_id - -# assert res_c == msg.to_str() - -# # Success, terminate pending tasks. -# await cleanup() - async def perform_test_from_obj(obj): """ Perform a floodsub test from a test obj. @@ -243,9 +186,7 @@ async def perform_test_from_obj(obj): # Create correctly formatted message msg_talk = generate_RPC_packet(actual_node_id, topics, data, generate_message_id()) - - print ("**TEST FLOODSUB** MESSAGE TALK") - print (msg_talk) + # Publish message # await floodsub_map[node_id].publish(actual_node_id, msg_talk.to_str()) tasks_publish.append(asyncio.ensure_future(floodsub_map[node_id].publish(\ @@ -269,20 +210,7 @@ async def perform_test_from_obj(obj): for node_id in topic_map[topic]: # Get message from subscription queue msg_on_node_str = await queues_map[node_id][topic].get() - - print ("MESSAGE ON NODE STR") - print (msg_on_node_str) - - print ("ACTUAL MESSSSAGE") - print (actual_msg) - assert actual_msg.publish[0].SerializeToString() == msg_on_node_str.SerializeToString() - # msg_on_node = create_message_talk(msg_on_node_str) - - # Perform checks - # assert actual_msg.origin_id == msg_on_node.origin_id - # assert actual_msg.topics == msg_on_node.topics - # assert actual_msg.data == msg_on_node.data # Success, terminate pending tasks. await cleanup() @@ -500,30 +428,3 @@ async def test_three_nodes_clique_two_topic_diff_origin_test_obj(): ] } await perform_test_from_obj(test_obj) - -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) - -def generate_RPC_packet(origin_id, topics, msg_content, msg_id): - packet = rpc_pb2.RPC() - message = rpc_pb2.Message( - from_id=origin_id.encode('utf-8'), - seqno=msg_id.encode('utf-8'), - data=msg_content.encode('utf-8'), - ) - for topic in topics: - message.topicIDs.extend([topic.encode('utf-8')]) - - # for topic in topics: - # message.topicIDs.extend([topic.encode('utf-8')]) - # packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( - # subscribe=True, - # topicid = topic.encode('utf-8') - # )]) - - packet.publish.extend([message]) - return packet diff --git a/tests/pubsub/utils.py b/tests/pubsub/utils.py new file mode 100644 index 0000000..d4695d7 --- /dev/null +++ b/tests/pubsub/utils.py @@ -0,0 +1,30 @@ +import uuid +from libp2p.pubsub.pb import rpc_pb2 + +def generate_message_id(): + """ + Generate a unique message id + :return: messgae id + """ + return str(uuid.uuid1()) + +def generate_RPC_packet(origin_id, topics, msg_content, msg_id): + """ + Generate RPC packet to send over wire + :param origin_id: peer id of the message origin + :param topics: list of topics + :param msg_content: string of content in data + :param msg_id: seqno for the message + """ + packet = rpc_pb2.RPC() + message = rpc_pb2.Message( + from_id=origin_id.encode('utf-8'), + seqno=msg_id.encode('utf-8'), + data=msg_content.encode('utf-8'), + ) + + for topic in topics: + message.topicIDs.extend([topic.encode('utf-8')]) + + packet.publish.extend([message]) + return packet From 0238dff2173600a109cbbe73a4725e7104360e7e Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Tue, 2 Apr 2019 21:17:48 -0400 Subject: [PATCH 17/19] remove unused code --- libp2p/pubsub/pubsub.py | 12 ------------ tests/pubsub/test_floodsub.py | 2 -- 2 files changed, 14 deletions(-) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index bfad873..bee5fba 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -1,5 +1,4 @@ import asyncio -import uuid from .pb import rpc_pb2 from .pubsub_notifee import PubsubNotifee @@ -202,10 +201,6 @@ class Pubsub(): # Create subscribe message packet = rpc_pb2.RPC() - # packet.publish.extend([rpc_pb2.Message( - # from_id=str(self.host.get_id()).encode('utf-8'), - # seqno=str(generate_message_id()).encode('utf-8') - # )]) packet.subscriptions.extend([rpc_pb2.RPC.SubOpts( subscribe=True, topicid=topic_id.encode('utf-8') @@ -255,10 +250,3 @@ class Pubsub(): # Write message to stream await stream.write(rpc_msg) - -def generate_message_id(): - """ - Generate a unique message id - :return: messgae id - """ - return str(uuid.uuid1()) diff --git a/tests/pubsub/test_floodsub.py b/tests/pubsub/test_floodsub.py index 4fc059b..272af79 100644 --- a/tests/pubsub/test_floodsub.py +++ b/tests/pubsub/test_floodsub.py @@ -54,8 +54,6 @@ async def test_simple_two_nodes_RPC(): # as the message sent by node_a assert res_b.SerializeToString() == msg.publish[0].SerializeToString() - - # Success, terminate pending tasks. await cleanup() From 3a52d29cb784d332045349c26f6c5a8d214752d2 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 3 Apr 2019 14:05:37 -0400 Subject: [PATCH 18/19] remove redundant proto file --- rpc.proto | 78 ------------------------------------------------------- 1 file changed, 78 deletions(-) delete mode 100644 rpc.proto diff --git a/rpc.proto b/rpc.proto deleted file mode 100644 index 2ae2ef2..0000000 --- a/rpc.proto +++ /dev/null @@ -1,78 +0,0 @@ -// Borrowed from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto - -syntax = "proto2"; - -package pubsub.pb; - -message RPC { - repeated SubOpts subscriptions = 1; - repeated Message publish = 2; - - message SubOpts { - optional bool subscribe = 1; // subscribe or unsubcribe - optional string topicid = 2; - } - - optional ControlMessage control = 3; -} - -message Message { - optional bytes from = 1; - optional bytes data = 2; - optional bytes seqno = 3; - repeated string topicIDs = 4; - optional bytes signature = 5; - optional bytes key = 6; -} - -message ControlMessage { - repeated ControlIHave ihave = 1; - repeated ControlIWant iwant = 2; - repeated ControlGraft graft = 3; - repeated ControlPrune prune = 4; -} - -message ControlIHave { - optional string topicID = 1; - repeated string messageIDs = 2; -} - -message ControlIWant { - repeated string messageIDs = 1; -} - -message ControlGraft { - optional string topicID = 1; -} - -message ControlPrune { - optional string topicID = 1; -} - -message TopicDescriptor { - optional string name = 1; - optional AuthOpts auth = 2; - optional EncOpts enc = 3; - - message AuthOpts { - optional AuthMode mode = 1; - repeated bytes keys = 2; // root keys to trust - - enum AuthMode { - NONE = 0; // no authentication, anyone can publish - KEY = 1; // only messages signed by keys in the topic descriptor are accepted - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } - - message EncOpts { - optional EncMode mode = 1; - repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) - - enum EncMode { - NONE = 0; // no encryption, anyone can read - SHAREDKEY = 1; // messages are encrypted with shared key - WOT = 2; // web of trust, certificates can allow publisher set to grow - } - } -} \ No newline at end of file From 225bd390dfc154a11238460097152a09b0150673 Mon Sep 17 00:00:00 2001 From: zixuanzh Date: Wed, 3 Apr 2019 14:08:05 -0400 Subject: [PATCH 19/19] add source to rpc.proto --- libp2p/pubsub/pb/rpc.proto | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index f07da20..df38bad 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -1,3 +1,5 @@ +// Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto + syntax = "proto2"; package pubsub.pb;