From 7bc363f2fa98bf07dc089edec386e017513acc67 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 16 Aug 2019 17:01:27 +0800 Subject: [PATCH 01/19] Remove initiator in `Mplex` Besides, fix the wrong passed `multi_addr` to `mplex_stream`. --- libp2p/network/swarm.py | 1 + libp2p/stream_muxer/abc.py | 1 + libp2p/stream_muxer/mplex/mplex.py | 11 +++++++---- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 9dd1c15..0bf91a3 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -145,6 +145,7 @@ class Swarm(INetwork): # Use muxed conn to open stream, which returns # a muxed stream # TODO: Remove protocol id from being passed into muxed_conn + # FIXME: Remove multiaddr from being passed into muxed_conn muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr) # Perform protocol muxing to determine protocol to use diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 7084d7a..26ad509 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -62,6 +62,7 @@ class IMuxedConn(ABC): Read a message from `stream_id`'s buffer, non-blockingly. """ + # FIXME: Remove multiaddr from being passed into muxed_conn @abstractmethod async def open_stream( self, protocol_id: str, multi_addr: Multiaddr diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index aa64b69..2aa59c9 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -25,7 +25,6 @@ class Mplex(IMuxedConn): secured_conn: ISecureConn raw_conn: IRawConnection - initiator: bool peer_id: ID # TODO: `dataIn` in go implementation. Should be size of 8. # TODO: Also, `dataIn` is closed indicating EOF in Go. We don't have similar strategies @@ -41,13 +40,12 @@ class Mplex(IMuxedConn): ) -> None: """ create a new muxed connection - :param conn: an instance of raw connection + :param secured_conn: an instance of ``ISecureConn`` :param generic_protocol_handler: generic protocol handler for new muxed streams :param peer_id: peer_id of peer the connection is to """ self.conn = secured_conn - self.initiator = secured_conn.initiator # Store generic protocol handler self.generic_protocol_handler = generic_protocol_handler @@ -63,6 +61,10 @@ class Mplex(IMuxedConn): # Kick off reading asyncio.ensure_future(self.handle_incoming()) + @property + def initiator(self) -> bool: + return self.conn.initiator + def close(self) -> None: """ close the stream muxer and underlying raw connection @@ -98,6 +100,7 @@ class Mplex(IMuxedConn): return None return await self.buffers[stream_id].get() + # FIXME: Remove multiaddr from being passed into muxed_conn async def open_stream( self, protocol_id: str, multi_addr: Multiaddr ) -> IMuxedStream: @@ -108,7 +111,7 @@ class Mplex(IMuxedConn): :return: a new stream """ stream_id = self.conn.next_stream_id() - stream = MplexStream(stream_id, multi_addr, self) + stream = MplexStream(stream_id, True, self) self.buffers[stream_id] = asyncio.Queue() await self.send_message(HeaderTags.NewStream, None, stream_id) return stream From 59b373b48aa16a12bb1a95bf809a5c22670bc219 Mon Sep 17 00:00:00 2001 From: mhchia Date: Fri, 16 Aug 2019 17:08:22 +0800 Subject: [PATCH 02/19] Add `plaintext.proto` Update Makefile to handle the import in `plaintext.proto`. Import path is modified to be relative to the project root. And we run `protoc` from where `Makefile` locates, i.e. the project root. Reference: - plaintext.proto: https://github.com/libp2p/go-libp2p-core/blob/62b2c6c482ef265f6b975d59244fd7b2169aaabf/sec/insecure/pb/plaintext.proto --- Makefile | 18 ++++- libp2p/security/insecure/pb/__init__.py | 0 libp2p/security/insecure/pb/plaintext.proto | 10 +++ libp2p/security/insecure/pb/plaintext_pb2.py | 79 +++++++++++++++++++ libp2p/security/insecure/pb/plaintext_pb2.pyi | 45 +++++++++++ 5 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 libp2p/security/insecure/pb/__init__.py create mode 100644 libp2p/security/insecure/pb/plaintext.proto create mode 100644 libp2p/security/insecure/pb/plaintext_pb2.py create mode 100644 libp2p/security/insecure/pb/plaintext_pb2.pyi diff --git a/Makefile b/Makefile index d44fd6b..e5b6509 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,10 @@ FILES_TO_LINT = libp2p tests examples setup.py +PB = libp2p/crypto/pb/crypto.proto libp2p/pubsub/pb/rpc.proto libp2p/security/insecure/pb/plaintext.proto +PY = $(PB:.proto=_pb2.py) +PYI = $(PB:.proto=_pb2.pyi) + +# Set default to `protobufs`, otherwise `format` is called when typing only `make` +all: protobufs format: black $(FILES_TO_LINT) @@ -10,6 +16,12 @@ lintroll: isort --recursive --check-only $(FILES_TO_LINT) flake8 $(FILES_TO_LINT) -protobufs: - cd libp2p/crypto/pb && protoc --python_out=. --mypy_out=. crypto.proto - cd libp2p/pubsub/pb && protoc --python_out=. --mypy_out=. rpc.proto +protobufs: $(PY) + +%_pb2.py: %.proto + protoc --python_out=. --mypy_out=. $< + +.PHONY: clean + +clean: + rm -f $(PY) $(PYI) diff --git a/libp2p/security/insecure/pb/__init__.py b/libp2p/security/insecure/pb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libp2p/security/insecure/pb/plaintext.proto b/libp2p/security/insecure/pb/plaintext.proto new file mode 100644 index 0000000..0c8ffb0 --- /dev/null +++ b/libp2p/security/insecure/pb/plaintext.proto @@ -0,0 +1,10 @@ +syntax = "proto2"; + +package plaintext.pb; + +import "libp2p/crypto/pb/crypto.proto"; + +message Exchange { + optional bytes id = 1; + optional crypto.pb.PublicKey pubkey = 2; +} diff --git a/libp2p/security/insecure/pb/plaintext_pb2.py b/libp2p/security/insecure/pb/plaintext_pb2.py new file mode 100644 index 0000000..72b2740 --- /dev/null +++ b/libp2p/security/insecure/pb/plaintext_pb2.py @@ -0,0 +1,79 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: libp2p/security/insecure/pb/plaintext.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from libp2p.crypto.pb import crypto_pb2 as libp2p_dot_crypto_dot_pb_dot_crypto__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='libp2p/security/insecure/pb/plaintext.proto', + package='plaintext.pb', + syntax='proto2', + serialized_pb=_b('\n+libp2p/security/insecure/pb/plaintext.proto\x12\x0cplaintext.pb\x1a\x1dlibp2p/crypto/pb/crypto.proto\"<\n\x08\x45xchange\x12\n\n\x02id\x18\x01 \x01(\x0c\x12$\n\x06pubkey\x18\x02 \x01(\x0b\x32\x14.crypto.pb.PublicKey') + , + dependencies=[libp2p_dot_crypto_dot_pb_dot_crypto__pb2.DESCRIPTOR,]) + + + + +_EXCHANGE = _descriptor.Descriptor( + name='Exchange', + full_name='plaintext.pb.Exchange', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='plaintext.pb.Exchange.id', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='pubkey', full_name='plaintext.pb.Exchange.pubkey', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=92, + serialized_end=152, +) + +_EXCHANGE.fields_by_name['pubkey'].message_type = libp2p_dot_crypto_dot_pb_dot_crypto__pb2._PUBLICKEY +DESCRIPTOR.message_types_by_name['Exchange'] = _EXCHANGE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Exchange = _reflection.GeneratedProtocolMessageType('Exchange', (_message.Message,), dict( + DESCRIPTOR = _EXCHANGE, + __module__ = 'libp2p.security.insecure.pb.plaintext_pb2' + # @@protoc_insertion_point(class_scope:plaintext.pb.Exchange) + )) +_sym_db.RegisterMessage(Exchange) + + +# @@protoc_insertion_point(module_scope) diff --git a/libp2p/security/insecure/pb/plaintext_pb2.pyi b/libp2p/security/insecure/pb/plaintext_pb2.pyi new file mode 100644 index 0000000..641bd9a --- /dev/null +++ b/libp2p/security/insecure/pb/plaintext_pb2.pyi @@ -0,0 +1,45 @@ +# @generated by generate_proto_mypy_stubs.py. Do not edit! +import sys +from google.protobuf.descriptor import ( + Descriptor as google___protobuf___descriptor___Descriptor, +) + +from google.protobuf.message import ( + Message as google___protobuf___message___Message, +) + +from libp2p.crypto.pb.crypto_pb2 import ( + PublicKey as libp2p___crypto___pb___crypto_pb2___PublicKey, +) + +from typing import ( + Optional as typing___Optional, +) + +from typing_extensions import ( + Literal as typing_extensions___Literal, +) + + +class Exchange(google___protobuf___message___Message): + DESCRIPTOR: google___protobuf___descriptor___Descriptor = ... + id = ... # type: bytes + + @property + def pubkey(self) -> libp2p___crypto___pb___crypto_pb2___PublicKey: ... + + def __init__(self, + *, + id : typing___Optional[bytes] = None, + pubkey : typing___Optional[libp2p___crypto___pb___crypto_pb2___PublicKey] = None, + ) -> None: ... + @classmethod + def FromString(cls, s: bytes) -> Exchange: ... + def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ... + def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ... + if sys.version_info >= (3,): + def HasField(self, field_name: typing_extensions___Literal[u"id",u"pubkey"]) -> bool: ... + def ClearField(self, field_name: typing_extensions___Literal[u"id",u"pubkey"]) -> None: ... + else: + def HasField(self, field_name: typing_extensions___Literal[u"id",b"id",u"pubkey",b"pubkey"]) -> bool: ... + def ClearField(self, field_name: typing_extensions___Literal[u"id",b"id",u"pubkey",b"pubkey"]) -> None: ... From 519294472461a642453f95af0cc04a9c7a7a719f Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 17 Aug 2019 00:12:15 +0800 Subject: [PATCH 03/19] Update pb --- libp2p/crypto/pb/crypto_pb2.py | 47 +++++---- libp2p/pubsub/pb/rpc_pb2.py | 175 ++++++++++++++++----------------- 2 files changed, 110 insertions(+), 112 deletions(-) diff --git a/libp2p/crypto/pb/crypto_pb2.py b/libp2p/crypto/pb/crypto_pb2.py index 0e4aa1f..1b51897 100644 --- a/libp2p/crypto/pb/crypto_pb2.py +++ b/libp2p/crypto/pb/crypto_pb2.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: crypto.proto +# source: libp2p/crypto/pb/crypto.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -9,6 +8,7 @@ from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -17,11 +17,10 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( - name='crypto.proto', + name='libp2p/crypto/pb/crypto.proto', package='crypto.pb', syntax='proto2', - serialized_options=None, - serialized_pb=_b('\n\x0c\x63rypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03') + serialized_pb=_b('\n\x1dlibp2p/crypto/pb/crypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03') ) _KEYTYPE = _descriptor.EnumDescriptor( @@ -32,25 +31,25 @@ _KEYTYPE = _descriptor.EnumDescriptor( values=[ _descriptor.EnumValueDescriptor( name='RSA', index=0, number=0, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='Ed25519', index=1, number=1, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='Secp256k1', index=2, number=2, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='ECDSA', index=3, number=3, - serialized_options=None, + options=None, type=None), ], containing_type=None, - serialized_options=None, - serialized_start=158, - serialized_end=215, + options=None, + serialized_start=175, + serialized_end=232, ) _sym_db.RegisterEnumDescriptor(_KEYTYPE) @@ -75,28 +74,28 @@ _PUBLICKEY = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='crypto.pb.PublicKey.data', index=1, number=2, type=12, cpp_type=9, label=2, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=27, - serialized_end=90, + serialized_start=44, + serialized_end=107, ) @@ -113,28 +112,28 @@ _PRIVATEKEY = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='crypto.pb.PrivateKey.data', index=1, number=2, type=12, cpp_type=9, label=2, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=92, - serialized_end=156, + serialized_start=109, + serialized_end=173, ) _PUBLICKEY.fields_by_name['key_type'].enum_type = _KEYTYPE @@ -146,14 +145,14 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR) PublicKey = _reflection.GeneratedProtocolMessageType('PublicKey', (_message.Message,), dict( DESCRIPTOR = _PUBLICKEY, - __module__ = 'crypto_pb2' + __module__ = 'libp2p.crypto.pb.crypto_pb2' # @@protoc_insertion_point(class_scope:crypto.pb.PublicKey) )) _sym_db.RegisterMessage(PublicKey) PrivateKey = _reflection.GeneratedProtocolMessageType('PrivateKey', (_message.Message,), dict( DESCRIPTOR = _PRIVATEKEY, - __module__ = 'crypto_pb2' + __module__ = 'libp2p.crypto.pb.crypto_pb2' # @@protoc_insertion_point(class_scope:crypto.pb.PrivateKey) )) _sym_db.RegisterMessage(PrivateKey) diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index ab794f2..7c53fc4 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: rpc.proto +# source: libp2p/pubsub/pb/rpc.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -8,6 +7,7 @@ from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -16,11 +16,10 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( - name='rpc.proto', + name='libp2p/pubsub/pb/rpc.proto', package='pubsub.pb', syntax='proto2', - serialized_options=None, - serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') + serialized_pb=_b('\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') ) @@ -33,21 +32,21 @@ _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE = _descriptor.EnumDescriptor( values=[ _descriptor.EnumValueDescriptor( name='NONE', index=0, number=0, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='KEY', index=1, number=1, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='WOT', index=2, number=2, - serialized_options=None, + options=None, type=None), ], containing_type=None, - serialized_options=None, - serialized_start=868, - serialized_end=906, + options=None, + serialized_start=885, + serialized_end=923, ) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE) @@ -59,21 +58,21 @@ _TOPICDESCRIPTOR_ENCOPTS_ENCMODE = _descriptor.EnumDescriptor( values=[ _descriptor.EnumValueDescriptor( name='NONE', index=0, number=0, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='SHAREDKEY', index=1, number=1, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='WOT', index=2, number=2, - serialized_options=None, + options=None, type=None), ], containing_type=None, - serialized_options=None, - serialized_start=997, - serialized_end=1040, + options=None, + serialized_start=1014, + serialized_end=1057, ) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE) @@ -91,28 +90,28 @@ _RPC_SUBOPTS = _descriptor.Descriptor( has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='topicid', full_name='pubsub.pb.RPC.SubOpts.topicid', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=160, - serialized_end=205, + serialized_start=177, + serialized_end=222, ) _RPC = _descriptor.Descriptor( @@ -128,35 +127,35 @@ _RPC = _descriptor.Descriptor( has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='publish', full_name='pubsub.pb.RPC.publish', index=1, number=2, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='control', full_name='pubsub.pb.RPC.control', index=2, number=3, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[_RPC_SUBOPTS, ], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=25, - serialized_end=205, + serialized_start=42, + serialized_end=222, ) @@ -173,56 +172,56 @@ _MESSAGE = _descriptor.Descriptor( has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='pubsub.pb.Message.data', index=1, number=2, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='seqno', full_name='pubsub.pb.Message.seqno', index=2, number=3, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='topicIDs', full_name='pubsub.pb.Message.topicIDs', index=3, number=4, type=9, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='signature', full_name='pubsub.pb.Message.signature', index=4, number=5, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='key', full_name='pubsub.pb.Message.key', index=5, number=6, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=207, - serialized_end=312, + serialized_start=224, + serialized_end=329, ) @@ -239,42 +238,42 @@ _CONTROLMESSAGE = _descriptor.Descriptor( has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='iwant', full_name='pubsub.pb.ControlMessage.iwant', index=1, number=2, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='graft', full_name='pubsub.pb.ControlMessage.graft', index=2, number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='prune', full_name='pubsub.pb.ControlMessage.prune', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=315, - serialized_end=491, + serialized_start=332, + serialized_end=508, ) @@ -291,28 +290,28 @@ _CONTROLIHAVE = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='messageIDs', full_name='pubsub.pb.ControlIHave.messageIDs', index=1, number=2, type=9, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=493, - serialized_end=544, + serialized_start=510, + serialized_end=561, ) @@ -329,21 +328,21 @@ _CONTROLIWANT = _descriptor.Descriptor( has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=546, - serialized_end=580, + serialized_start=563, + serialized_end=597, ) @@ -360,21 +359,21 @@ _CONTROLGRAFT = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=582, - serialized_end=613, + serialized_start=599, + serialized_end=630, ) @@ -391,21 +390,21 @@ _CONTROLPRUNE = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=615, - serialized_end=646, + serialized_start=632, + serialized_end=663, ) @@ -422,14 +421,14 @@ _TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='keys', full_name='pubsub.pb.TopicDescriptor.AuthOpts.keys', index=1, number=2, type=12, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -437,14 +436,14 @@ _TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor( enum_types=[ _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE, ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=782, - serialized_end=906, + serialized_start=799, + serialized_end=923, ) _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( @@ -460,14 +459,14 @@ _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='keyHashes', full_name='pubsub.pb.TopicDescriptor.EncOpts.keyHashes', index=1, number=2, type=12, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -475,14 +474,14 @@ _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( enum_types=[ _TOPICDESCRIPTOR_ENCOPTS_ENCMODE, ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=909, - serialized_end=1040, + serialized_start=926, + serialized_end=1057, ) _TOPICDESCRIPTOR = _descriptor.Descriptor( @@ -498,35 +497,35 @@ _TOPICDESCRIPTOR = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='auth', full_name='pubsub.pb.TopicDescriptor.auth', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='enc', full_name='pubsub.pb.TopicDescriptor.enc', index=2, number=3, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[_TOPICDESCRIPTOR_AUTHOPTS, _TOPICDESCRIPTOR_ENCOPTS, ], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=649, - serialized_end=1040, + serialized_start=666, + serialized_end=1057, ) _RPC_SUBOPTS.containing_type = _RPC @@ -559,12 +558,12 @@ RPC = _reflection.GeneratedProtocolMessageType('RPC', (_message.Message,), dict( SubOpts = _reflection.GeneratedProtocolMessageType('SubOpts', (_message.Message,), dict( DESCRIPTOR = _RPC_SUBOPTS, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.RPC.SubOpts) )) , DESCRIPTOR = _RPC, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.RPC) )) _sym_db.RegisterMessage(RPC) @@ -572,42 +571,42 @@ _sym_db.RegisterMessage(RPC.SubOpts) Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict( DESCRIPTOR = _MESSAGE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.Message) )) _sym_db.RegisterMessage(Message) ControlMessage = _reflection.GeneratedProtocolMessageType('ControlMessage', (_message.Message,), dict( DESCRIPTOR = _CONTROLMESSAGE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlMessage) )) _sym_db.RegisterMessage(ControlMessage) ControlIHave = _reflection.GeneratedProtocolMessageType('ControlIHave', (_message.Message,), dict( DESCRIPTOR = _CONTROLIHAVE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIHave) )) _sym_db.RegisterMessage(ControlIHave) ControlIWant = _reflection.GeneratedProtocolMessageType('ControlIWant', (_message.Message,), dict( DESCRIPTOR = _CONTROLIWANT, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIWant) )) _sym_db.RegisterMessage(ControlIWant) ControlGraft = _reflection.GeneratedProtocolMessageType('ControlGraft', (_message.Message,), dict( DESCRIPTOR = _CONTROLGRAFT, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlGraft) )) _sym_db.RegisterMessage(ControlGraft) ControlPrune = _reflection.GeneratedProtocolMessageType('ControlPrune', (_message.Message,), dict( DESCRIPTOR = _CONTROLPRUNE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlPrune) )) _sym_db.RegisterMessage(ControlPrune) @@ -616,19 +615,19 @@ TopicDescriptor = _reflection.GeneratedProtocolMessageType('TopicDescriptor', (_ AuthOpts = _reflection.GeneratedProtocolMessageType('AuthOpts', (_message.Message,), dict( DESCRIPTOR = _TOPICDESCRIPTOR_AUTHOPTS, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.AuthOpts) )) , EncOpts = _reflection.GeneratedProtocolMessageType('EncOpts', (_message.Message,), dict( DESCRIPTOR = _TOPICDESCRIPTOR_ENCOPTS, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.EncOpts) )) , DESCRIPTOR = _TOPICDESCRIPTOR, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor) )) _sym_db.RegisterMessage(TopicDescriptor) From a0923d202a86fef26690e3ce5ffab8a483452ae1 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 17 Aug 2019 00:19:37 +0800 Subject: [PATCH 04/19] Move varint and delim read/write to toplevel To `libp2p.utils`. --- .../multiselect_communicator.py | 23 +++----------- libp2p/security/base_session.py | 2 +- libp2p/stream_muxer/abc.py | 6 +++- libp2p/stream_muxer/mplex/mplex.py | 2 +- libp2p/transport/tcp/tcp.py | 1 + libp2p/{stream_muxer/mplex => }/utils.py | 31 +++++++++++++++++++ 6 files changed, 44 insertions(+), 21 deletions(-) rename libp2p/{stream_muxer/mplex => }/utils.py (59%) diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 783c380..ebfcc23 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,23 +1,10 @@ from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.stream_muxer.abc import IMuxedStream -from libp2p.stream_muxer.mplex.utils import decode_uvarint_from_stream, encode_uvarint -from libp2p.typing import StreamReader +from libp2p.utils import encode_delim, read_delim from .multiselect_communicator_interface import IMultiselectCommunicator -def delim_encode(msg_str: str) -> bytes: - msg_bytes = msg_str.encode() - varint_len_msg = encode_uvarint(len(msg_bytes) + 1) - return varint_len_msg + msg_bytes + b"\n" - - -async def delim_read(reader: StreamReader, timeout: int = 10) -> str: - len_msg = await decode_uvarint_from_stream(reader, timeout) - msg_bytes = await reader.read(len_msg) - return msg_bytes.decode().rstrip() - - class RawConnectionCommunicator(IMultiselectCommunicator): conn: IRawConnection @@ -25,12 +12,12 @@ class RawConnectionCommunicator(IMultiselectCommunicator): self.conn = conn async def write(self, msg_str: str) -> None: - msg_bytes = delim_encode(msg_str) + msg_bytes = encode_delim(msg_str) self.conn.writer.write(msg_bytes) await self.conn.writer.drain() async def read(self) -> str: - return await delim_read(self.conn.reader) + return await read_delim(self.conn.reader) class StreamCommunicator(IMultiselectCommunicator): @@ -40,8 +27,8 @@ class StreamCommunicator(IMultiselectCommunicator): self.stream = stream async def write(self, msg_str: str) -> None: - msg_bytes = delim_encode(msg_str) + msg_bytes = encode_delim(msg_str) await self.stream.write(msg_bytes) async def read(self) -> str: - return await delim_read(self.stream) + return await read_delim(self.stream) diff --git a/libp2p/security/base_session.py b/libp2p/security/base_session.py index 8fc4dab..ba14037 100644 --- a/libp2p/security/base_session.py +++ b/libp2p/security/base_session.py @@ -7,7 +7,7 @@ from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn -class BaseSession(ISecureConn, IRawConnection): +class BaseSession(ISecureConn): """ ``BaseSession`` is not fully instantiated from its abstract classes as it is only meant to be used in clases that derive from it. diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 26ad509..7270c15 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -17,7 +17,6 @@ class IMuxedConn(ABC): reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ - initiator: bool peer_id: ID @abstractmethod @@ -35,6 +34,11 @@ class IMuxedConn(ABC): :param peer_id: peer_id of peer the connection is to """ + @property + @abstractmethod + def initiator(self) -> bool: + pass + @abstractmethod def close(self) -> None: """ diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 2aa59c9..aeae0ba 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -13,7 +13,7 @@ from libp2p.typing import TProtocol from .constants import HeaderTags from .exceptions import StreamNotFound from .mplex_stream import MplexStream -from .utils import decode_uvarint_from_stream, encode_uvarint +from libp2p.utils import decode_uvarint_from_stream, encode_uvarint MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0") diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index ae1e776..aee0313 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -62,6 +62,7 @@ class TCPListener(IListener): class TCP(ITransport): + # TODO: Remove `self_id` async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/utils.py similarity index 59% rename from libp2p/stream_muxer/mplex/utils.py rename to libp2p/utils.py index 44326ad..9ecaa83 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/utils.py @@ -5,6 +5,9 @@ from typing import Tuple from libp2p.typing import StreamReader +TIMEOUT = 10 + + def encode_uvarint(number: int) -> bytes: """Pack `number` into varint bytes""" buf = b"" @@ -45,3 +48,31 @@ async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> in break return result + + +# Varint-prefixed read/write + + +def encode_varint_prefixed(msg_bytes: bytes) -> bytes: + varint_len = encode_uvarint(len(msg_bytes)) + return varint_len + msg_bytes + + +async def read_varint_prefixed_bytes( + reader: StreamReader, timeout: int = TIMEOUT +) -> bytes: + len_msg = await decode_uvarint_from_stream(reader, timeout) + return await reader.read(len_msg) + + +# Delimited read/write + + +def encode_delim(msg_str: str) -> bytes: + delimited_msg = msg_str + "\n" + return encode_varint_prefixed(delimited_msg.encode()) + + +async def read_delim(reader: StreamReader, timeout: int = TIMEOUT) -> str: + msg_bytes = await read_varint_prefixed_bytes(reader, timeout) + return msg_bytes.decode().rstrip() From 22b1a5395d9bc24a88db613e8059b8025350a309 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 17 Aug 2019 01:38:50 +0800 Subject: [PATCH 05/19] A working plaintext 2.0 without validation --- libp2p/__init__.py | 2 +- libp2p/network/swarm.py | 10 ++---- libp2p/security/insecure/exceptions.py | 2 ++ libp2p/security/insecure/transport.py | 44 +++++++++++++++++++++++--- libp2p/stream_muxer/mplex/mplex.py | 2 +- libp2p/transport/tcp/tcp.py | 12 ------- libp2p/utils.py | 3 +- 7 files changed, 48 insertions(+), 27 deletions(-) create mode 100644 libp2p/security/insecure/exceptions.py diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 21f1d9c..10e71db 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -98,7 +98,7 @@ def initialize_default_swarm( muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex} security_transports_by_protocol = sec_opt or { - PLAINTEXT_PROTOCOL_ID: InsecureTransport(key_pair) + TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair) } upgrader = TransportUpgrader( security_transports_by_protocol, muxer_transports_by_protocol diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 0bf91a3..badd16c 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -184,12 +184,6 @@ class Swarm(INetwork): async def conn_handler( reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: - # Read in first message (should be peer_id of initiator) and ack - peer_id = ID.from_base58((await reader.read(1024)).decode()) - - writer.write("received peer id".encode()) - await writer.drain() - # Upgrade reader/write to a net_stream and pass \ # to appropriate stream handler (using multiaddr) raw_conn = RawConnection( @@ -202,9 +196,11 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn + # FIXME: This dummy `ID(b"")` for the remote peer is useless. secured_conn = await self.upgrader.upgrade_security( - raw_conn, peer_id, False + raw_conn, ID(b""), False ) + peer_id = secured_conn.get_remote_peer() muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id ) diff --git a/libp2p/security/insecure/exceptions.py b/libp2p/security/insecure/exceptions.py new file mode 100644 index 0000000..d2570e7 --- /dev/null +++ b/libp2p/security/insecure/exceptions.py @@ -0,0 +1,2 @@ +class UpgradeFailure(Exception): + pass diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 97c9676..5f00fc1 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -1,15 +1,34 @@ +from libp2p.crypto.keys import PublicKey from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID from libp2p.security.base_session import BaseSession from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn from libp2p.typing import TProtocol +from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes -PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/1.0.0") +from .exceptions import UpgradeFailure +from .pb import plaintext_pb2 + +# Reference: https://github.com/libp2p/go-libp2p-core/blob/master/sec/insecure/insecure.go + + +PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/2.0.0") class InsecureSession(BaseSession): - pass + # FIXME: Update the read/write to `BaseSession` + async def run_handshake(self): + msg = make_exchange_message(self.local_private_key.get_public_key()) + self.writer.write(encode_varint_prefixed(msg.SerializeToString())) + await self.writer.drain() + + msg_bytes_other_side = await read_varint_prefixed_bytes(self.reader) + msg_other_side = plaintext_pb2.Exchange() + msg_other_side.ParseFromString(msg_bytes_other_side) + # TODO: Verify public key with peer id + # TODO: Store public key + self.remote_peer_id = ID(msg_other_side.id) class InsecureTransport(BaseSecureTransport): @@ -24,7 +43,9 @@ class InsecureTransport(BaseSecureTransport): for an inbound connection (i.e. we are not the initiator) :return: secure connection object (that implements secure_conn_interface) """ - return InsecureSession(self, conn, ID(b"")) + session = InsecureSession(self, conn, ID(b"")) + await session.run_handshake() + return session async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: """ @@ -32,4 +53,19 @@ class InsecureTransport(BaseSecureTransport): for an inbound connection (i.e. we are the initiator) :return: secure connection object (that implements secure_conn_interface) """ - return InsecureSession(self, conn, peer_id) + session = InsecureSession(self, conn, peer_id) + await session.run_handshake() + # TODO: Check if `remote_public_key is not None`. If so, check if `session.remote_peer` + received_peer_id = session.get_remote_peer() + if session.get_remote_peer() != peer_id: + raise UpgradeFailure( + "remote peer sent unexpected peer ID. " + f"expected={peer_id} received={received_peer_id}" + ) + return session + + +def make_exchange_message(pubkey: PublicKey) -> plaintext_pb2.Exchange: + pubkey_pb = pubkey.serialize_to_protobuf() + id_bytes = ID.from_pubkey(pubkey).to_bytes() + return plaintext_pb2.Exchange(id=id_bytes, pubkey=pubkey_pb) diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index aeae0ba..16d1019 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -9,11 +9,11 @@ from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.typing import TProtocol +from libp2p.utils import decode_uvarint_from_stream, encode_uvarint from .constants import HeaderTags from .exceptions import StreamNotFound from .mplex_stream import MplexStream -from libp2p.utils import decode_uvarint_from_stream, encode_uvarint MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0") diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index aee0313..6f6d4d9 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -75,18 +75,6 @@ class TCP(ITransport): reader, writer = await asyncio.open_connection(host, int(port)) - # TODO: Change this `sending peer id` process to `/plaintext/2.0.0` - # First: send our peer ID so receiver knows it - writer.write(self_id.to_base58().encode()) - await writer.drain() - - # Await ack for peer id - expected_ack_str = "received peer id" - ack = (await reader.read(len(expected_ack_str))).decode() - - if ack != expected_ack_str: - raise Exception("Receiver did not receive peer id") - return RawConnection(host, port, reader, writer, True) def create_listener(self, handler_function: THandler) -> TCPListener: diff --git a/libp2p/utils.py b/libp2p/utils.py index 9ecaa83..cb7c485 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -4,8 +4,7 @@ from typing import Tuple from libp2p.typing import StreamReader - -TIMEOUT = 10 +TIMEOUT = 1 def encode_uvarint(number: int) -> bytes: From bb7d37fd4f686cacad0082a79d35200a9085f780 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 17 Aug 2019 21:41:17 +0800 Subject: [PATCH 06/19] Fix msg encoding - Change varint-prefix encode to fixedint-prefix(4 bytes) encode. --- libp2p/network/swarm.py | 18 +++++++++--------- libp2p/protocol_muxer/multiselect.py | 4 +++- libp2p/security/insecure/transport.py | 9 ++++++--- libp2p/utils.py | 22 ++++++++++++++++++++-- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index badd16c..e4d71a6 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -177,8 +177,8 @@ class Swarm(INetwork): Call listener listen with the multiaddr Map multiaddr to listener """ - for multiaddr in multiaddrs: - if str(multiaddr) in self.listeners: + for maddr in multiaddrs: + if str(maddr) in self.listeners: return True async def conn_handler( @@ -187,8 +187,8 @@ class Swarm(INetwork): # Upgrade reader/write to a net_stream and pass \ # to appropriate stream handler (using multiaddr) raw_conn = RawConnection( - multiaddr.value_for_protocol("ip4"), - multiaddr.value_for_protocol("tcp"), + maddr.value_for_protocol("ip4"), + maddr.value_for_protocol("tcp"), reader, writer, False, @@ -215,19 +215,19 @@ class Swarm(INetwork): try: # Success listener = self.transport.create_listener(conn_handler) - self.listeners[str(multiaddr)] = listener - await listener.listen(multiaddr) + self.listeners[str(maddr)] = listener + await listener.listen(maddr) # Call notifiers since event occurred for notifee in self.notifees: - await notifee.listen(self, multiaddr) + await notifee.listen(self, maddr) return True except IOError: # Failed. Continue looping. - print("Failed to connect to: " + str(multiaddr)) + print("Failed to connect to: " + str(maddr)) - # No multiaddr succeeded + # No maddr succeeded return False def notify(self, notifee: INotifee) -> bool: diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index c854415..8ce66d9 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -79,7 +79,9 @@ class Multiselect(IMultiselectMuxer): # Confirm that the protocols are the same if not validate_handshake(handshake_contents): - raise MultiselectError("multiselect protocol ID mismatch") + raise MultiselectError( + f"multiselect protocol ID mismatch: handshake_contents={handshake_contents}" + ) # Handshake succeeded if this point is reached diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 5f00fc1..7f26820 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -5,7 +5,7 @@ from libp2p.security.base_session import BaseSession from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn from libp2p.typing import TProtocol -from libp2p.utils import encode_varint_prefixed, read_varint_prefixed_bytes +from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed from .exceptions import UpgradeFailure from .pb import plaintext_pb2 @@ -20,12 +20,15 @@ class InsecureSession(BaseSession): # FIXME: Update the read/write to `BaseSession` async def run_handshake(self): msg = make_exchange_message(self.local_private_key.get_public_key()) - self.writer.write(encode_varint_prefixed(msg.SerializeToString())) + msg_bytes = msg.SerializeToString() + encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) + self.writer.write(encoded_msg_bytes) await self.writer.drain() - msg_bytes_other_side = await read_varint_prefixed_bytes(self.reader) + msg_bytes_other_side = await read_fixedint_prefixed(self.reader) msg_other_side = plaintext_pb2.Exchange() msg_other_side.ParseFromString(msg_bytes_other_side) + # TODO: Verify public key with peer id # TODO: Store public key self.remote_peer_id = ID(msg_other_side.id) diff --git a/libp2p/utils.py b/libp2p/utils.py index cb7c485..5fbc8ac 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -4,7 +4,7 @@ from typing import Tuple from libp2p.typing import StreamReader -TIMEOUT = 1 +TIMEOUT = 10 def encode_uvarint(number: int) -> bytes: @@ -64,7 +64,8 @@ async def read_varint_prefixed_bytes( return await reader.read(len_msg) -# Delimited read/write +# Delimited read/write, used by multistream-select. +# Reference: https://github.com/gogo/protobuf/blob/07eab6a8298cf32fac45cceaac59424f98421bbc/io/varint.go#L109-L126 # noqa: E501 def encode_delim(msg_str: str) -> bytes: @@ -75,3 +76,20 @@ def encode_delim(msg_str: str) -> bytes: async def read_delim(reader: StreamReader, timeout: int = TIMEOUT) -> str: msg_bytes = await read_varint_prefixed_bytes(reader, timeout) return msg_bytes.decode().rstrip() + + +SIZE_LEN_BYTES = 4 + +# Fixed-prefixed read/write, used by "/plaintext/2.0.0". +# Reference: https://github.com/libp2p/go-msgio/blob/d5bbf59d3c4240266b1d2e5df9dc993454c42011/num.go#L11-L33 # noqa: E501 # noqa: E501 + + +def encode_fixedint_prefixed(msg_bytes: bytes) -> bytes: + len_prefix = len(msg_bytes).to_bytes(SIZE_LEN_BYTES, "big") + return len_prefix + msg_bytes + + +async def read_fixedint_prefixed(reader: StreamReader) -> bytes: + len_bytes = await reader.read(SIZE_LEN_BYTES) + len_int = int.from_bytes(len_bytes, "big") + return await reader.read(len_int) From de8d356955f097415c41a82382bab0723f977bc0 Mon Sep 17 00:00:00 2001 From: mhchia Date: Sat, 17 Aug 2019 22:11:08 +0800 Subject: [PATCH 07/19] Fix tests failure due to lack of peer id Fix it through doing plaintext handshake. --- libp2p/security/simple/transport.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/libp2p/security/simple/transport.py b/libp2p/security/simple/transport.py index 7a19a9f..8eed2a6 100644 --- a/libp2p/security/simple/transport.py +++ b/libp2p/security/simple/transport.py @@ -30,6 +30,10 @@ class SimpleSecurityTransport(BaseSecureTransport): ) session = InsecureSession(self, conn, ID(b"")) + # TODO: Calls handshake to make them know the peer id each other, otherwise tests fail. + # However, it seems pretty weird that `SimpleSecurityTransport` sends peer id through + # `Insecure`. + await session.run_handshake() # NOTE: this is abusing the abstraction we have here # but this code may be deprecated soon and this exists # mainly to satisfy a test that will go along w/ it @@ -56,6 +60,10 @@ class SimpleSecurityTransport(BaseSecureTransport): ) session = InsecureSession(self, conn, peer_id) + # TODO: Calls handshake to make them know the peer id each other, otherwise tests fail. + # However, it seems pretty weird that `SimpleSecurityTransport` sends peer id through + # `Insecure`. + await session.run_handshake() # NOTE: this is abusing the abstraction we have here # but this code may be deprecated soon and this exists # mainly to satisfy a test that will go along w/ it From 0b62321265332175ab41bbd05e83c75cb62ba82c Mon Sep 17 00:00:00 2001 From: mhchia Date: Mon, 19 Aug 2019 13:04:11 +0800 Subject: [PATCH 08/19] Fix `test_security_multistream` By passing initiator keypairs to node. --- tests/security/test_security_multistream.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index ddbae8e..9a380b1 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -1,14 +1,13 @@ import asyncio -import multiaddr import pytest from libp2p import new_node from libp2p.crypto.rsa import create_new_key_pair -from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.protocol_muxer.multiselect_client import MultiselectClientError from libp2p.security.insecure.transport import InsecureSession, InsecureTransport from libp2p.security.simple.transport import SimpleSecurityTransport +from tests.configs import LISTEN_MADDR from tests.utils import cleanup, connect # TODO: Add tests for multiple streams being opened on different @@ -16,9 +15,7 @@ from tests.utils import cleanup, connect def peer_id_for_node(node): - addr = node.get_addrs()[0] - info = info_from_p2p_addr(addr) - return info.peer_id + return node.get_id() initiator_key_pair = create_new_key_pair() @@ -35,14 +32,16 @@ async def perform_simple_test( # TODO: implement -- note we need to introduce the notion of communicating over a raw connection # for testing, we do NOT want to communicate over a stream so we can't just create two nodes # and use their conn because our mplex will internally relay messages to a stream - sec_opt1 = transports_for_initiator - sec_opt2 = transports_for_noninitiator - node1 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt1) - node2 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt2) + node1 = await new_node( + key_pair=initiator_key_pair, sec_opt=transports_for_initiator + ) + node2 = await new_node( + key_pair=noninitiator_key_pair, sec_opt=transports_for_noninitiator + ) - await node1.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node2.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node1.get_network().listen(LISTEN_MADDR) + await node2.get_network().listen(LISTEN_MADDR) await connect(node1, node2) From 2a1367b011c6b051a19cc13bdd15148ee27e3033 Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Tue, 20 Aug 2019 16:00:27 +0800 Subject: [PATCH 09/19] Apply suggestions from code review Co-Authored-By: NIC Lin --- libp2p/security/insecure/transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 7f26820..64f08f0 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -60,7 +60,7 @@ class InsecureTransport(BaseSecureTransport): await session.run_handshake() # TODO: Check if `remote_public_key is not None`. If so, check if `session.remote_peer` received_peer_id = session.get_remote_peer() - if session.get_remote_peer() != peer_id: + if received_peer_id != peer_id: raise UpgradeFailure( "remote peer sent unexpected peer ID. " f"expected={peer_id} received={received_peer_id}" From 5768daa9bfad0f9f85e5d5338122a932aca56341 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 20 Aug 2019 16:42:34 +0800 Subject: [PATCH 10/19] PR feedbacks - Nits - Add `SecurityUpgradeFailure` and handle `UpgradeFailure` in Swarm. --- libp2p/network/swarm.py | 20 +++++++++++++------- libp2p/protocol_muxer/multiselect.py | 3 ++- libp2p/security/insecure/exceptions.py | 2 -- libp2p/security/insecure/transport.py | 5 ++--- libp2p/security/simple/transport.py | 17 +++++++++-------- libp2p/transport/exceptions.py | 7 +++++++ 6 files changed, 33 insertions(+), 21 deletions(-) delete mode 100644 libp2p/security/insecure/exceptions.py create mode 100644 libp2p/transport/exceptions.py diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index e4d71a6..19d1b76 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -10,6 +10,7 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator from libp2p.routing.interfaces import IPeerRouting from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream +from libp2p.transport.exceptions import UpgradeFailure from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader @@ -197,13 +198,18 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn # FIXME: This dummy `ID(b"")` for the remote peer is useless. - secured_conn = await self.upgrader.upgrade_security( - raw_conn, ID(b""), False - ) - peer_id = secured_conn.get_remote_peer() - muxed_conn = await self.upgrader.upgrade_connection( - secured_conn, self.generic_protocol_handler, peer_id - ) + try: + secured_conn = await self.upgrader.upgrade_security( + raw_conn, ID(b""), False + ) + peer_id = secured_conn.get_remote_peer() + muxed_conn = await self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) + except UpgradeFailure: + # TODO: Add logging to indicate the failure + raw_conn.close() + return # Store muxed_conn with peer id self.connections[peer_id] = muxed_conn diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 8ce66d9..9fd3de8 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -80,7 +80,8 @@ class Multiselect(IMultiselectMuxer): # Confirm that the protocols are the same if not validate_handshake(handshake_contents): raise MultiselectError( - f"multiselect protocol ID mismatch: handshake_contents={handshake_contents}" + "multiselect protocol ID mismatch: " + f"received handshake_contents={handshake_contents}" ) # Handshake succeeded if this point is reached diff --git a/libp2p/security/insecure/exceptions.py b/libp2p/security/insecure/exceptions.py deleted file mode 100644 index d2570e7..0000000 --- a/libp2p/security/insecure/exceptions.py +++ /dev/null @@ -1,2 +0,0 @@ -class UpgradeFailure(Exception): - pass diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 64f08f0..337d20f 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -4,10 +4,10 @@ from libp2p.peer.id import ID from libp2p.security.base_session import BaseSession from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.transport.exceptions import SecurityUpgradeFailure from libp2p.typing import TProtocol from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed -from .exceptions import UpgradeFailure from .pb import plaintext_pb2 # Reference: https://github.com/libp2p/go-libp2p-core/blob/master/sec/insecure/insecure.go @@ -17,7 +17,6 @@ PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/2.0.0") class InsecureSession(BaseSession): - # FIXME: Update the read/write to `BaseSession` async def run_handshake(self): msg = make_exchange_message(self.local_private_key.get_public_key()) msg_bytes = msg.SerializeToString() @@ -61,7 +60,7 @@ class InsecureTransport(BaseSecureTransport): # TODO: Check if `remote_public_key is not None`. If so, check if `session.remote_peer` received_peer_id = session.get_remote_peer() if received_peer_id != peer_id: - raise UpgradeFailure( + raise SecurityUpgradeFailure( "remote peer sent unexpected peer ID. " f"expected={peer_id} received={received_peer_id}" ) diff --git a/libp2p/security/simple/transport.py b/libp2p/security/simple/transport.py index 8eed2a6..e63e651 100644 --- a/libp2p/security/simple/transport.py +++ b/libp2p/security/simple/transport.py @@ -6,6 +6,7 @@ from libp2p.peer.id import ID from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.insecure.transport import InsecureSession from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.transport.exceptions import SecurityUpgradeFailure class SimpleSecurityTransport(BaseSecureTransport): @@ -25,14 +26,14 @@ class SimpleSecurityTransport(BaseSecureTransport): incoming = (await conn.read()).decode() if incoming != self.key_phrase: - raise Exception( + raise SecurityUpgradeFailure( "Key phrase differed between nodes. Expected " + self.key_phrase ) session = InsecureSession(self, conn, ID(b"")) - # TODO: Calls handshake to make them know the peer id each other, otherwise tests fail. - # However, it seems pretty weird that `SimpleSecurityTransport` sends peer id through - # `Insecure`. + # NOTE: Here we calls `run_handshake` for both sides to exchange their public keys and + # peer ids, otherwise tests fail. However, it seems pretty weird that + # `SimpleSecurityTransport` sends peer id through `Insecure`. await session.run_handshake() # NOTE: this is abusing the abstraction we have here # but this code may be deprecated soon and this exists @@ -55,14 +56,14 @@ class SimpleSecurityTransport(BaseSecureTransport): await asyncio.sleep(0) if incoming != self.key_phrase: - raise Exception( + raise SecurityUpgradeFailure( "Key phrase differed between nodes. Expected " + self.key_phrase ) session = InsecureSession(self, conn, peer_id) - # TODO: Calls handshake to make them know the peer id each other, otherwise tests fail. - # However, it seems pretty weird that `SimpleSecurityTransport` sends peer id through - # `Insecure`. + # NOTE: Here we calls `run_handshake` for both sides to exchange their public keys and + # peer ids, otherwise tests fail. However, it seems pretty weird that + # `SimpleSecurityTransport` sends peer id through `Insecure`. await session.run_handshake() # NOTE: this is abusing the abstraction we have here # but this code may be deprecated soon and this exists diff --git a/libp2p/transport/exceptions.py b/libp2p/transport/exceptions.py new file mode 100644 index 0000000..5826f83 --- /dev/null +++ b/libp2p/transport/exceptions.py @@ -0,0 +1,7 @@ +# TODO: Add `BaseLibp2pError` and `UpgradeFailure` can inherit from it? +class UpgradeFailure(Exception): + pass + + +class SecurityUpgradeFailure(UpgradeFailure): + pass From 0b466ddc86762b08448c8ab7329e10613e8f4772 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 20 Aug 2019 17:09:38 +0800 Subject: [PATCH 11/19] Add lock to `RawConnection` To avoid `self.writer.drain()` is called in parallel. Reference: https://bugs.python.org/issue29930 --- libp2p/network/connection/raw_connection.py | 15 +++++++--- .../multiselect_communicator.py | 13 ++++---- libp2p/security/insecure/transport.py | 3 +- libp2p/stream_muxer/mplex/mplex.py | 3 +- libp2p/utils.py | 30 +++++++++++-------- 5 files changed, 37 insertions(+), 27 deletions(-) diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 3d12f0d..3277901 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -9,9 +9,11 @@ class RawConnection(IRawConnection): conn_port: str reader: asyncio.StreamReader writer: asyncio.StreamWriter - _next_id: int initiator: bool + _drain_lock: asyncio.Lock + _next_id: int + def __init__( self, ip: str, @@ -24,13 +26,18 @@ class RawConnection(IRawConnection): self.conn_port = port self.reader = reader self.writer = writer - self._next_id = 0 if initiator else 1 self.initiator = initiator + self._drain_lock = asyncio.Lock() + self._next_id = 0 if initiator else 1 + async def write(self, data: bytes) -> None: self.writer.write(data) - self.writer.write("\n".encode()) - await self.writer.drain() + # Reference: https://github.com/ethereum/lahja/blob/93610b2eb46969ff1797e0748c7ac2595e130aef/lahja/asyncio/endpoint.py#L99-L102 # noqa: E501 + # Use a lock to serialize drain() calls. Circumvents this bug: + # https://bugs.python.org/issue29930 + async with self._drain_lock: + await self.writer.drain() async def read(self) -> bytes: line = await self.reader.readline() diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index ebfcc23..e01e9cc 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -12,12 +12,12 @@ class RawConnectionCommunicator(IMultiselectCommunicator): self.conn = conn async def write(self, msg_str: str) -> None: - msg_bytes = encode_delim(msg_str) - self.conn.writer.write(msg_bytes) - await self.conn.writer.drain() + msg_bytes = encode_delim(msg_str.encode()) + await self.conn.write(msg_bytes) async def read(self) -> str: - return await read_delim(self.conn.reader) + data = await read_delim(self.conn.reader) + return data.decode() class StreamCommunicator(IMultiselectCommunicator): @@ -27,8 +27,9 @@ class StreamCommunicator(IMultiselectCommunicator): self.stream = stream async def write(self, msg_str: str) -> None: - msg_bytes = encode_delim(msg_str) + msg_bytes = encode_delim(msg_str.encode()) await self.stream.write(msg_bytes) async def read(self) -> str: - return await read_delim(self.stream) + data = await read_delim(self.stream) + return data.decode() diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 337d20f..fa4a1a8 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -21,8 +21,7 @@ class InsecureSession(BaseSession): msg = make_exchange_message(self.local_private_key.get_public_key()) msg_bytes = msg.SerializeToString() encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) - self.writer.write(encoded_msg_bytes) - await self.writer.drain() + await self.write(encoded_msg_bytes) msg_bytes_other_side = await read_fixedint_prefixed(self.reader) msg_other_side = plaintext_pb2.Exchange() diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 16d1019..765dd56 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -150,8 +150,7 @@ class Mplex(IMuxedConn): :param _bytes: byte array to write :return: length written """ - self.conn.writer.write(_bytes) - await self.conn.writer.drain() + await self.conn.write(_bytes) return len(_bytes) async def handle_incoming(self) -> None: diff --git a/libp2p/utils.py b/libp2p/utils.py index 5fbc8ac..9a1f0cb 100644 --- a/libp2p/utils.py +++ b/libp2p/utils.py @@ -4,8 +4,6 @@ from typing import Tuple from libp2p.typing import StreamReader -TIMEOUT = 10 - def encode_uvarint(number: int) -> bytes: """Pack `number` into varint bytes""" @@ -57,25 +55,31 @@ def encode_varint_prefixed(msg_bytes: bytes) -> bytes: return varint_len + msg_bytes -async def read_varint_prefixed_bytes( - reader: StreamReader, timeout: int = TIMEOUT -) -> bytes: - len_msg = await decode_uvarint_from_stream(reader, timeout) - return await reader.read(len_msg) +async def read_varint_prefixed_bytes(reader: StreamReader) -> bytes: + len_msg = await decode_uvarint_from_stream(reader, None) + data = await reader.read(len_msg) + if len(data) != len_msg: + raise ValueError( + f"failed to read enough bytes: len_msg={len_msg}, data={data!r}" + ) + return data # Delimited read/write, used by multistream-select. # Reference: https://github.com/gogo/protobuf/blob/07eab6a8298cf32fac45cceaac59424f98421bbc/io/varint.go#L109-L126 # noqa: E501 -def encode_delim(msg_str: str) -> bytes: - delimited_msg = msg_str + "\n" - return encode_varint_prefixed(delimited_msg.encode()) +def encode_delim(msg: bytes) -> bytes: + delimited_msg = msg + b"\n" + return encode_varint_prefixed(delimited_msg) -async def read_delim(reader: StreamReader, timeout: int = TIMEOUT) -> str: - msg_bytes = await read_varint_prefixed_bytes(reader, timeout) - return msg_bytes.decode().rstrip() +async def read_delim(reader: StreamReader) -> bytes: + msg_bytes = await read_varint_prefixed_bytes(reader) + # TODO: Investigate if it is possible to have empty `msg_bytes` + if len(msg_bytes) != 0 and msg_bytes[-1:] != b"\n": + raise ValueError(f'msg_bytes is not delimited by b"\\n": msg_bytes={msg_bytes}') + return msg_bytes[:-1] SIZE_LEN_BYTES = 4 From ef476e555bb41d255d26301374eb3e27ee9cb28b Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 20 Aug 2019 18:09:36 +0800 Subject: [PATCH 12/19] Use RawConnection.read Instead of accessing its reader and writer directly. TODO: considering add `ReaderWriterCloser` interface and let connection and stream inherit from it. --- libp2p/network/connection/raw_connection.py | 9 ++++++--- libp2p/network/connection/raw_connection_interface.py | 8 +------- libp2p/protocol_muxer/multiselect_communicator.py | 2 +- libp2p/security/base_session.py | 6 ++---- libp2p/security/insecure/transport.py | 2 +- libp2p/security/security_multistream.py | 4 ---- libp2p/security/simple/transport.py | 9 +++++---- libp2p/stream_muxer/mplex/mplex.py | 8 +++----- libp2p/typing.py | 5 +++-- 9 files changed, 22 insertions(+), 31 deletions(-) diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 3277901..0d20de5 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -39,9 +39,12 @@ class RawConnection(IRawConnection): async with self._drain_lock: await self.writer.drain() - async def read(self) -> bytes: - line = await self.reader.readline() - return line.rstrip(b"\n") + async def read(self, n: int = -1) -> bytes: + """ + Read up to ``n`` bytes from the underlying stream. + This call is delegated directly to the underlying ``self.reader``. + """ + return await self.reader.read(n) def close(self) -> None: self.writer.close() diff --git a/libp2p/network/connection/raw_connection_interface.py b/libp2p/network/connection/raw_connection_interface.py index 1810f58..fd1b469 100644 --- a/libp2p/network/connection/raw_connection_interface.py +++ b/libp2p/network/connection/raw_connection_interface.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -import asyncio class IRawConnection(ABC): @@ -9,17 +8,12 @@ class IRawConnection(ABC): initiator: bool - # TODO: reader and writer shouldn't be exposed. - # Need better API for the consumers - reader: asyncio.StreamReader - writer: asyncio.StreamWriter - @abstractmethod async def write(self, data: bytes) -> None: pass @abstractmethod - async def read(self) -> bytes: + async def read(self, n: int = -1) -> bytes: pass @abstractmethod diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index e01e9cc..e252304 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -16,7 +16,7 @@ class RawConnectionCommunicator(IMultiselectCommunicator): await self.conn.write(msg_bytes) async def read(self) -> str: - data = await read_delim(self.conn.reader) + data = await read_delim(self.conn) return data.decode() diff --git a/libp2p/security/base_session.py b/libp2p/security/base_session.py index ba14037..a41c52b 100644 --- a/libp2p/security/base_session.py +++ b/libp2p/security/base_session.py @@ -23,8 +23,6 @@ class BaseSession(ISecureConn): self.remote_permanent_pubkey = None self.initiator = self.conn.initiator - self.writer = self.conn.writer - self.reader = self.conn.reader # TODO clean up how this is passed around? def next_stream_id(self) -> int: @@ -33,8 +31,8 @@ class BaseSession(ISecureConn): async def write(self, data: bytes) -> None: await self.conn.write(data) - async def read(self) -> bytes: - return await self.conn.read() + async def read(self, n: int = -1) -> bytes: + return await self.conn.read(n) def close(self) -> None: self.conn.close() diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index fa4a1a8..c16713b 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -23,7 +23,7 @@ class InsecureSession(BaseSession): encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) await self.write(encoded_msg_bytes) - msg_bytes_other_side = await read_fixedint_prefixed(self.reader) + msg_bytes_other_side = await read_fixedint_prefixed(self.conn) msg_other_side = plaintext_pb2.Exchange() msg_other_side.ParseFromString(msg_bytes_other_side) diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index ec24b46..6e69d7a 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -87,10 +87,6 @@ class SecurityMultistream(ABC): :param initiator: true if we are the initiator, false otherwise :return: selected secure transport """ - # TODO: Is conn acceptable to multiselect/multiselect_client - # instead of stream? In go repo, they pass in a raw conn - # (https://raw.githubusercontent.com/libp2p/go-conn-security-multistream/master/ssms.go) - protocol: TProtocol communicator = RawConnectionCommunicator(conn) if initiator: diff --git a/libp2p/security/simple/transport.py b/libp2p/security/simple/transport.py index e63e651..e70edcc 100644 --- a/libp2p/security/simple/transport.py +++ b/libp2p/security/simple/transport.py @@ -7,6 +7,7 @@ from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.insecure.transport import InsecureSession from libp2p.security.secure_conn_interface import ISecureConn from libp2p.transport.exceptions import SecurityUpgradeFailure +from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed class SimpleSecurityTransport(BaseSecureTransport): @@ -22,8 +23,8 @@ class SimpleSecurityTransport(BaseSecureTransport): for an inbound connection (i.e. we are not the initiator) :return: secure connection object (that implements secure_conn_interface) """ - await conn.write(self.key_phrase.encode()) - incoming = (await conn.read()).decode() + await conn.write(encode_fixedint_prefixed(self.key_phrase.encode())) + incoming = (await read_fixedint_prefixed(conn)).decode() if incoming != self.key_phrase: raise SecurityUpgradeFailure( @@ -48,8 +49,8 @@ class SimpleSecurityTransport(BaseSecureTransport): for an inbound connection (i.e. we are the initiator) :return: secure connection object (that implements secure_conn_interface) """ - await conn.write(self.key_phrase.encode()) - incoming = (await conn.read()).decode() + await conn.write(encode_fixedint_prefixed(self.key_phrase.encode())) + incoming = (await read_fixedint_prefixed(conn)).decode() # Force context switch, as this security transport is built for testing locally # in a single event loop diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 765dd56..8f95124 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -188,11 +188,9 @@ class Mplex(IMuxedConn): # loop in handle_incoming timeout = 0.1 try: - header = await decode_uvarint_from_stream(self.conn.reader, timeout) - length = await decode_uvarint_from_stream(self.conn.reader, timeout) - message = await asyncio.wait_for( - self.conn.reader.read(length), timeout=timeout - ) + header = await decode_uvarint_from_stream(self.conn, timeout) + length = await decode_uvarint_from_stream(self.conn, timeout) + message = await asyncio.wait_for(self.conn.read(length), timeout=timeout) except asyncio.TimeoutError: return None, None, None diff --git a/libp2p/typing.py b/libp2p/typing.py index 9810746..f36d8ab 100644 --- a/libp2p/typing.py +++ b/libp2p/typing.py @@ -1,6 +1,7 @@ -import asyncio from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union +from libp2p.network.connection.raw_connection_interface import IRawConnection + if TYPE_CHECKING: from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401 from libp2p.stream_muxer.abc import IMuxedStream # noqa: F401 @@ -9,4 +10,4 @@ TProtocol = NewType("TProtocol", str) StreamHandlerFn = Callable[["INetStream"], Awaitable[None]] -StreamReader = Union["IMuxedStream", asyncio.StreamReader] +StreamReader = Union["IMuxedStream", IRawConnection] From 921bfb65ccd60d770b3cc99c8353c2c1a0d21265 Mon Sep 17 00:00:00 2001 From: mhchia Date: Tue, 20 Aug 2019 23:54:33 +0800 Subject: [PATCH 13/19] Verify the remote pubkey and peer_id - Add `from_bytes` in RSAPublicKey and Secp256k1PublicKey - Add `pubkey_from_protobuf` to parse pubkey from protobuf - Verify key and peer_id in `InsecureSession.run_handshake` --- libp2p/crypto/rsa.py | 10 +++++++++ libp2p/crypto/secp256k1.py | 5 +++++ libp2p/crypto/utils.py | 16 ++++++++++++++ libp2p/security/base_session.py | 6 ++++++ libp2p/security/insecure/transport.py | 31 +++++++++++++++++++++------ 5 files changed, 62 insertions(+), 6 deletions(-) create mode 100644 libp2p/crypto/utils.py diff --git a/libp2p/crypto/rsa.py b/libp2p/crypto/rsa.py index ceea008..a0f8439 100644 --- a/libp2p/crypto/rsa.py +++ b/libp2p/crypto/rsa.py @@ -11,6 +11,11 @@ class RSAPublicKey(PublicKey): def to_bytes(self) -> bytes: return self.impl.export_key("DER") + @classmethod + def from_bytes(cls, key_bytes: bytes) -> "RSAPublicKey": + rsakey = RSA.import_key(key_bytes) + return cls(rsakey) + def get_type(self) -> KeyType: return KeyType.RSA @@ -30,6 +35,11 @@ class RSAPrivateKey(PrivateKey): def to_bytes(self) -> bytes: return self.impl.export_key("DER") + @classmethod + def from_bytes(cls, key_bytes: bytes) -> "RSAPrivateKey": + rsakey = RSA.import_key(key_bytes) + return cls(rsakey) + def get_type(self) -> KeyType: return KeyType.RSA diff --git a/libp2p/crypto/secp256k1.py b/libp2p/crypto/secp256k1.py index 79ffc9d..e2d5fb2 100644 --- a/libp2p/crypto/secp256k1.py +++ b/libp2p/crypto/secp256k1.py @@ -10,6 +10,11 @@ class Secp256k1PublicKey(PublicKey): def to_bytes(self) -> bytes: return self.impl.format() + @classmethod + def from_bytes(cls, key_bytes: bytes) -> "Secp256k1PublicKey": + secp256k1_pubkey = coincurve.PublicKey(key_bytes) + return cls(secp256k1_pubkey) + def get_type(self) -> KeyType: return KeyType.Secp256k1 diff --git a/libp2p/crypto/utils.py b/libp2p/crypto/utils.py new file mode 100644 index 0000000..78b8b7b --- /dev/null +++ b/libp2p/crypto/utils.py @@ -0,0 +1,16 @@ +from .keys import PublicKey +from .pb import crypto_pb2 as protobuf +from .rsa import RSAPublicKey +from .secp256k1 import Secp256k1PublicKey + + +def pubkey_from_protobuf(pubkey_pb: protobuf.PublicKey) -> PublicKey: + if pubkey_pb.key_type == protobuf.RSA: + return RSAPublicKey.from_bytes(pubkey_pb.data) + # TODO: Test against secp256k1 keys + elif pubkey_pb.key_type == protobuf.Secp256k1: + return Secp256k1PublicKey.from_bytes(pubkey_pb.data) + else: + raise ValueError( + f"unsupported key_type={pubkey_pb.key_type}, data={pubkey_pb.data!r}" + ) diff --git a/libp2p/security/base_session.py b/libp2p/security/base_session.py index a41c52b..c4a7275 100644 --- a/libp2p/security/base_session.py +++ b/libp2p/security/base_session.py @@ -13,6 +13,12 @@ class BaseSession(ISecureConn): is only meant to be used in clases that derive from it. """ + local_peer: ID + local_private_key: PrivateKey + conn: IRawConnection + remote_peer_id: ID + remote_permanent_pubkey: PublicKey + def __init__( self, transport: BaseSecureTransport, conn: IRawConnection, peer_id: ID ) -> None: diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index c16713b..1048791 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -1,4 +1,5 @@ from libp2p.crypto.keys import PublicKey +from libp2p.crypto.utils import pubkey_from_protobuf from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID from libp2p.security.base_session import BaseSession @@ -23,13 +24,31 @@ class InsecureSession(BaseSession): encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) await self.write(encoded_msg_bytes) - msg_bytes_other_side = await read_fixedint_prefixed(self.conn) - msg_other_side = plaintext_pb2.Exchange() - msg_other_side.ParseFromString(msg_bytes_other_side) + remote_msg_bytes = await read_fixedint_prefixed(self.conn) + remote_msg = plaintext_pb2.Exchange() + remote_msg.ParseFromString(remote_msg_bytes) - # TODO: Verify public key with peer id - # TODO: Store public key - self.remote_peer_id = ID(msg_other_side.id) + # Verify if the given `pubkey` matches the given `peer_id` + try: + remote_pubkey = pubkey_from_protobuf(remote_msg.pubkey) + except ValueError as error: + raise SecurityUpgradeFailure( + f"unknown protocol of remote_msg.pubkey={remote_msg.pubkey}" + ) from error + remote_peer_id = ID(remote_msg.id) + remote_peer_id_from_pubkey = ID.from_pubkey(remote_pubkey) + if remote_peer_id_from_pubkey != remote_peer_id: + raise SecurityUpgradeFailure( + "peer id and pubkey from the remote mismatch: " + f"remote_peer_id={remote_peer_id}, remote_pubkey={remote_pubkey}, " + f"remote_peer_id_from_pubkey={remote_peer_id_from_pubkey}" + ) + + # Nothing is wrong. Store the `pubkey` and `peer_id` in the session. + self.remote_permanent_pubkey = remote_pubkey + self.remote_peer_id = ID(remote_msg.id) + + # TODO: Store `pubkey` and `peer_id` to `PeerStore` class InsecureTransport(BaseSecureTransport): From 80452d958907cc3a26ffee1df186de119578cd28 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 21 Aug 2019 13:59:42 +0800 Subject: [PATCH 14/19] Fix `make_exchange_message` to use the new API --- libp2p/security/insecure/transport.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 1048791..850bdf3 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -1,4 +1,5 @@ from libp2p.crypto.keys import PublicKey +from libp2p.crypto.pb import crypto_pb2 from libp2p.crypto.utils import pubkey_from_protobuf from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID @@ -86,6 +87,8 @@ class InsecureTransport(BaseSecureTransport): def make_exchange_message(pubkey: PublicKey) -> plaintext_pb2.Exchange: - pubkey_pb = pubkey.serialize_to_protobuf() + pubkey_pb = crypto_pb2.PublicKey( + key_type=pubkey.get_type().value, data=pubkey.to_bytes() + ) id_bytes = ID.from_pubkey(pubkey).to_bytes() return plaintext_pb2.Exchange(id=id_bytes, pubkey=pubkey_pb) From 3e04480d622ab652f915d86ea31f04ba88445d6b Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 21 Aug 2019 15:12:35 +0800 Subject: [PATCH 15/19] Raise `HandshakeFailure` in transport Change the exception handling flow. Raise `SecurityUpgradeFailure` in security_multistream. --- libp2p/crypto/utils.py | 1 + libp2p/exceptions.py | 8 +++-- libp2p/security/insecure/transport.py | 17 +++++----- libp2p/security/security_multistream.py | 36 +++++++++++++++++---- libp2p/transport/exceptions.py | 9 +++++- tests/security/test_security_multistream.py | 4 +-- 6 files changed, 54 insertions(+), 21 deletions(-) diff --git a/libp2p/crypto/utils.py b/libp2p/crypto/utils.py index 78b8b7b..8519598 100644 --- a/libp2p/crypto/utils.py +++ b/libp2p/crypto/utils.py @@ -10,6 +10,7 @@ def pubkey_from_protobuf(pubkey_pb: protobuf.PublicKey) -> PublicKey: # TODO: Test against secp256k1 keys elif pubkey_pb.key_type == protobuf.Secp256k1: return Secp256k1PublicKey.from_bytes(pubkey_pb.data) + # TODO: Support `Ed25519` and `ECDSA` in the future? else: raise ValueError( f"unsupported key_type={pubkey_pb.key_type}, data={pubkey_pb.data!r}" diff --git a/libp2p/exceptions.py b/libp2p/exceptions.py index 8d8af44..0ea0078 100644 --- a/libp2p/exceptions.py +++ b/libp2p/exceptions.py @@ -1,6 +1,8 @@ -class ValidationError(Exception): +class BaseLibp2pError(Exception): + pass + + +class ValidationError(BaseLibp2pError): """ Raised when something does not pass a validation check. """ - - pass diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 850bdf3..8ce6f41 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -6,7 +6,7 @@ from libp2p.peer.id import ID from libp2p.security.base_session import BaseSession from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn -from libp2p.transport.exceptions import SecurityUpgradeFailure +from libp2p.transport.exceptions import HandshakeFailure from libp2p.typing import TProtocol from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed @@ -32,14 +32,14 @@ class InsecureSession(BaseSession): # Verify if the given `pubkey` matches the given `peer_id` try: remote_pubkey = pubkey_from_protobuf(remote_msg.pubkey) - except ValueError as error: - raise SecurityUpgradeFailure( - f"unknown protocol of remote_msg.pubkey={remote_msg.pubkey}" - ) from error + except ValueError: + raise HandshakeFailure( + f"unknown `key_type` of remote_msg.pubkey={remote_msg.pubkey}" + ) remote_peer_id = ID(remote_msg.id) remote_peer_id_from_pubkey = ID.from_pubkey(remote_pubkey) if remote_peer_id_from_pubkey != remote_peer_id: - raise SecurityUpgradeFailure( + raise HandshakeFailure( "peer id and pubkey from the remote mismatch: " f"remote_peer_id={remote_peer_id}, remote_pubkey={remote_pubkey}, " f"remote_peer_id_from_pubkey={remote_peer_id_from_pubkey}" @@ -76,10 +76,9 @@ class InsecureTransport(BaseSecureTransport): """ session = InsecureSession(self, conn, peer_id) await session.run_handshake() - # TODO: Check if `remote_public_key is not None`. If so, check if `session.remote_peer` received_peer_id = session.get_remote_peer() - if received_peer_id != peer_id: - raise SecurityUpgradeFailure( + if session.remote_permanent_pubkey is not None and received_peer_id != peer_id: + raise HandshakeFailure( "remote peer sent unexpected peer ID. " f"expected={peer_id} received={received_peer_id}" ) diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index 6e69d7a..f52e54a 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -4,11 +4,15 @@ from typing import Mapping from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID -from libp2p.protocol_muxer.multiselect import Multiselect -from libp2p.protocol_muxer.multiselect_client import MultiselectClient +from libp2p.protocol_muxer.multiselect import Multiselect, MultiselectError +from libp2p.protocol_muxer.multiselect_client import ( + MultiselectClient, + MultiselectClientError, +) from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport +from libp2p.transport.exceptions import HandshakeFailure, SecurityUpgradeFailure from libp2p.typing import TProtocol @@ -63,8 +67,18 @@ class SecurityMultistream(ABC): for an inbound connection (i.e. we are not the initiator) :return: secure connection object (that implements secure_conn_interface) """ - transport = await self.select_transport(conn, False) - secure_conn = await transport.secure_inbound(conn) + try: + transport = await self.select_transport(conn, False) + except MultiselectError as error: + raise SecurityUpgradeFailure( + "failed to negotiate the secure protocol" + ) from error + try: + secure_conn = await transport.secure_inbound(conn) + except HandshakeFailure as error: + raise SecurityUpgradeFailure( + "failed to secure the inbound transport" + ) from error return secure_conn async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: @@ -73,8 +87,18 @@ class SecurityMultistream(ABC): for an inbound connection (i.e. we are the initiator) :return: secure connection object (that implements secure_conn_interface) """ - transport = await self.select_transport(conn, True) - secure_conn = await transport.secure_outbound(conn, peer_id) + try: + transport = await self.select_transport(conn, True) + except MultiselectClientError as error: + raise SecurityUpgradeFailure( + "failed to negotiate the secure protocol" + ) from error + try: + secure_conn = await transport.secure_outbound(conn, peer_id) + except HandshakeFailure as error: + raise SecurityUpgradeFailure( + "failed to secure the outbound transport" + ) from error return secure_conn async def select_transport( diff --git a/libp2p/transport/exceptions.py b/libp2p/transport/exceptions.py index 5826f83..2a85bec 100644 --- a/libp2p/transport/exceptions.py +++ b/libp2p/transport/exceptions.py @@ -1,7 +1,14 @@ +from libp2p.exceptions import BaseLibp2pError + + # TODO: Add `BaseLibp2pError` and `UpgradeFailure` can inherit from it? -class UpgradeFailure(Exception): +class UpgradeFailure(BaseLibp2pError): pass class SecurityUpgradeFailure(UpgradeFailure): pass + + +class HandshakeFailure(BaseLibp2pError): + pass diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index 9a380b1..ef32b3b 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -4,9 +4,9 @@ import pytest from libp2p import new_node from libp2p.crypto.rsa import create_new_key_pair -from libp2p.protocol_muxer.multiselect_client import MultiselectClientError from libp2p.security.insecure.transport import InsecureSession, InsecureTransport from libp2p.security.simple.transport import SimpleSecurityTransport +from libp2p.transport.exceptions import SecurityUpgradeFailure from tests.configs import LISTEN_MADDR from tests.utils import cleanup, connect @@ -161,7 +161,7 @@ async def test_multiple_security_none_the_same_fails(): def assertion_func(_): assert False - with pytest.raises(MultiselectClientError): + with pytest.raises(SecurityUpgradeFailure): await perform_simple_test( assertion_func, transports_for_initiator, transports_for_noninitiator ) From 16a4fd33c13d9433989f74476d07511e66e5869b Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 21 Aug 2019 23:04:59 +0800 Subject: [PATCH 16/19] PR feedbacks - Move exceptions to exceptions.py - Raise `UpgradeFailure` in upgrader - Refine the try/catch for upgraders in swarm --- libp2p/network/exceptions.py | 5 ++ libp2p/network/network_interface.py | 2 +- libp2p/network/swarm.py | 52 +++++++++++++++------ libp2p/protocol_muxer/exceptions.py | 9 ++++ libp2p/protocol_muxer/multiselect.py | 5 +- libp2p/protocol_muxer/multiselect_client.py | 5 +- libp2p/security/insecure/transport.py | 2 +- libp2p/security/security_multistream.py | 36 +++----------- libp2p/transport/exceptions.py | 4 ++ libp2p/transport/upgrader.py | 35 +++++++++++--- tests/examples/test_chat.py | 2 +- tests/protocol_muxer/test_protocol_muxer.py | 2 +- tests/security/test_security_multistream.py | 4 +- 13 files changed, 97 insertions(+), 66 deletions(-) create mode 100644 libp2p/network/exceptions.py create mode 100644 libp2p/protocol_muxer/exceptions.py diff --git a/libp2p/network/exceptions.py b/libp2p/network/exceptions.py new file mode 100644 index 0000000..92be9b8 --- /dev/null +++ b/libp2p/network/exceptions.py @@ -0,0 +1,5 @@ +from libp2p.exceptions import BaseLibp2pError + + +class SwarmException(BaseLibp2pError): + pass diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 83c3b20..d9cdf48 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -33,7 +33,7 @@ class INetwork(ABC): dial_peer try to create a connection to peer_id :param peer_id: peer if we want to dial - :raises SwarmException: raised when no address if found for peer_id + :raises SwarmException: raised when an error occurs :return: muxed connection """ diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 19d1b76..3ddc4ab 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -10,13 +10,14 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator from libp2p.routing.interfaces import IPeerRouting from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream -from libp2p.transport.exceptions import UpgradeFailure +from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import StreamHandlerFn, TProtocol from .connection.raw_connection import RawConnection +from .exceptions import SwarmException from .network_interface import INetwork from .notifee_interface import INotifee from .stream.net_stream import NetStream @@ -85,7 +86,7 @@ class Swarm(INetwork): """ dial_peer try to create a connection to peer_id :param peer_id: peer if we want to dial - :raises SwarmException: raised when no address if found for peer_id + :raises SwarmException: raised when an error occurs :return: muxed connection """ @@ -111,10 +112,26 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn - secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) - muxed_conn = await self.upgrader.upgrade_connection( - secured_conn, self.generic_protocol_handler, peer_id - ) + try: + secured_conn = await self.upgrader.upgrade_security( + raw_conn, peer_id, True + ) + except SecurityUpgradeFailure as error: + # TODO: Add logging to indicate the failure + raw_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a secured connection from {peer_id}" + ) from error + try: + muxed_conn = await self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) + except MuxerUpgradeFailure as error: + # TODO: Add logging to indicate the failure + secured_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a muxed connection from {peer_id}" + ) from error # Store muxed connection in connections self.connections[peer_id] = muxed_conn @@ -197,19 +214,28 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn - # FIXME: This dummy `ID(b"")` for the remote peer is useless. try: + # FIXME: This dummy `ID(b"")` for the remote peer is useless. secured_conn = await self.upgrader.upgrade_security( raw_conn, ID(b""), False ) - peer_id = secured_conn.get_remote_peer() + except SecurityUpgradeFailure as error: + # TODO: Add logging to indicate the failure + raw_conn.close() + raise SwarmException( + "fail to upgrade the connection to a secured connection" + ) from error + peer_id = secured_conn.get_remote_peer() + try: muxed_conn = await self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id ) - except UpgradeFailure: + except MuxerUpgradeFailure as error: # TODO: Add logging to indicate the failure - raw_conn.close() - return + secured_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a muxed connection from {peer_id}" + ) from error # Store muxed_conn with peer id self.connections[peer_id] = muxed_conn @@ -283,7 +309,3 @@ def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: asyncio.ensure_future(handler(net_stream)) return generic_protocol_handler - - -class SwarmException(Exception): - pass diff --git a/libp2p/protocol_muxer/exceptions.py b/libp2p/protocol_muxer/exceptions.py new file mode 100644 index 0000000..cf47aca --- /dev/null +++ b/libp2p/protocol_muxer/exceptions.py @@ -0,0 +1,9 @@ +from libp2p.exceptions import BaseLibp2pError + + +class MultiselectError(BaseLibp2pError): + """Raised when an error occurs in multiselect process""" + + +class MultiselectClientError(BaseLibp2pError): + """Raised when an error occurs in protocol selection process""" diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 9fd3de8..0c3dc72 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -2,6 +2,7 @@ from typing import Dict, Tuple from libp2p.typing import StreamHandlerFn, TProtocol +from .exceptions import MultiselectError from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_muxer_interface import IMultiselectMuxer @@ -97,7 +98,3 @@ def validate_handshake(handshake_contents: str) -> bool: # TODO: Modify this when format used by go repo for messages # is added return handshake_contents == MULTISELECT_PROTOCOL_ID - - -class MultiselectError(ValueError): - """Raised when an error occurs in multiselect process""" diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 062aedc..5fcfc45 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -2,6 +2,7 @@ from typing import Sequence from libp2p.typing import TProtocol +from .exceptions import MultiselectClientError from .multiselect_client_interface import IMultiselectClient from .multiselect_communicator_interface import IMultiselectCommunicator @@ -116,7 +117,3 @@ def validate_handshake(handshake_contents: str) -> bool: # TODO: Modify this when format used by go repo for messages # is added return handshake_contents == MULTISELECT_PROTOCOL_ID - - -class MultiselectClientError(ValueError): - """Raised when an error occurs in protocol selection process""" diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 8ce6f41..6cb882a 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -19,7 +19,7 @@ PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/2.0.0") class InsecureSession(BaseSession): - async def run_handshake(self): + async def run_handshake(self) -> None: msg = make_exchange_message(self.local_private_key.get_public_key()) msg_bytes = msg.SerializeToString() encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index f52e54a..6e69d7a 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -4,15 +4,11 @@ from typing import Mapping from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID -from libp2p.protocol_muxer.multiselect import Multiselect, MultiselectError -from libp2p.protocol_muxer.multiselect_client import ( - MultiselectClient, - MultiselectClientError, -) +from libp2p.protocol_muxer.multiselect import Multiselect +from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport -from libp2p.transport.exceptions import HandshakeFailure, SecurityUpgradeFailure from libp2p.typing import TProtocol @@ -67,18 +63,8 @@ class SecurityMultistream(ABC): for an inbound connection (i.e. we are not the initiator) :return: secure connection object (that implements secure_conn_interface) """ - try: - transport = await self.select_transport(conn, False) - except MultiselectError as error: - raise SecurityUpgradeFailure( - "failed to negotiate the secure protocol" - ) from error - try: - secure_conn = await transport.secure_inbound(conn) - except HandshakeFailure as error: - raise SecurityUpgradeFailure( - "failed to secure the inbound transport" - ) from error + transport = await self.select_transport(conn, False) + secure_conn = await transport.secure_inbound(conn) return secure_conn async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: @@ -87,18 +73,8 @@ class SecurityMultistream(ABC): for an inbound connection (i.e. we are the initiator) :return: secure connection object (that implements secure_conn_interface) """ - try: - transport = await self.select_transport(conn, True) - except MultiselectClientError as error: - raise SecurityUpgradeFailure( - "failed to negotiate the secure protocol" - ) from error - try: - secure_conn = await transport.secure_outbound(conn, peer_id) - except HandshakeFailure as error: - raise SecurityUpgradeFailure( - "failed to secure the outbound transport" - ) from error + transport = await self.select_transport(conn, True) + secure_conn = await transport.secure_outbound(conn, peer_id) return secure_conn async def select_transport( diff --git a/libp2p/transport/exceptions.py b/libp2p/transport/exceptions.py index 2a85bec..b10cfc9 100644 --- a/libp2p/transport/exceptions.py +++ b/libp2p/transport/exceptions.py @@ -10,5 +10,9 @@ class SecurityUpgradeFailure(UpgradeFailure): pass +class MuxerUpgradeFailure(UpgradeFailure): + pass + + class HandshakeFailure(BaseLibp2pError): pass diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index b0373ec..762a811 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -3,11 +3,17 @@ from typing import Mapping from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID +from libp2p.protocol_muxer.exceptions import MultiselectClientError, MultiselectError from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.security_multistream import SecurityMultistream from libp2p.stream_muxer.abc import IMuxedConn from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream +from libp2p.transport.exceptions import ( + HandshakeFailure, + MuxerUpgradeFailure, + SecurityUpgradeFailure, +) from libp2p.typing import TProtocol from .listener_interface import IListener @@ -39,10 +45,20 @@ class TransportUpgrader: """ Upgrade conn to a secured connection """ - if initiator: - return await self.security_multistream.secure_outbound(raw_conn, peer_id) - - return await self.security_multistream.secure_inbound(raw_conn) + try: + if initiator: + return await self.security_multistream.secure_outbound( + raw_conn, peer_id + ) + return await self.security_multistream.secure_inbound(raw_conn) + except (MultiselectError, MultiselectClientError) as error: + raise SecurityUpgradeFailure( + "failed to negotiate the secure protocol" + ) from error + except HandshakeFailure as error: + raise SecurityUpgradeFailure( + "handshake failed when upgrading to secure connection" + ) from error async def upgrade_connection( self, @@ -53,6 +69,11 @@ class TransportUpgrader: """ Upgrade secured connection to a muxed connection """ - return await self.muxer_multistream.new_conn( - conn, generic_protocol_handler, peer_id - ) + try: + return await self.muxer_multistream.new_conn( + conn, generic_protocol_handler, peer_id + ) + except (MultiselectError, MultiselectClientError) as error: + raise MuxerUpgradeFailure( + "failed to negotiate the multiplexer protocol" + ) from error diff --git a/tests/examples/test_chat.py b/tests/examples/test_chat.py index 0422c95..f461d9d 100644 --- a/tests/examples/test_chat.py +++ b/tests/examples/test_chat.py @@ -3,7 +3,7 @@ import asyncio import pytest from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.protocol_muxer.multiselect_client import MultiselectClientError +from libp2p.protocol_muxer.exceptions import MultiselectClientError from tests.utils import cleanup, set_up_nodes_by_transport_opt PROTOCOL_ID = "/chat/1.0.0" diff --git a/tests/protocol_muxer/test_protocol_muxer.py b/tests/protocol_muxer/test_protocol_muxer.py index 775c460..02f08bd 100644 --- a/tests/protocol_muxer/test_protocol_muxer.py +++ b/tests/protocol_muxer/test_protocol_muxer.py @@ -1,6 +1,6 @@ import pytest -from libp2p.protocol_muxer.multiselect_client import MultiselectClientError +from libp2p.protocol_muxer.exceptions import MultiselectClientError from tests.utils import cleanup, set_up_nodes_by_transport_opt # TODO: Add tests for multiple streams being opened on different diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index ef32b3b..ea78d1f 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -4,9 +4,9 @@ import pytest from libp2p import new_node from libp2p.crypto.rsa import create_new_key_pair +from libp2p.network.exceptions import SwarmException from libp2p.security.insecure.transport import InsecureSession, InsecureTransport from libp2p.security.simple.transport import SimpleSecurityTransport -from libp2p.transport.exceptions import SecurityUpgradeFailure from tests.configs import LISTEN_MADDR from tests.utils import cleanup, connect @@ -161,7 +161,7 @@ async def test_multiple_security_none_the_same_fails(): def assertion_func(_): assert False - with pytest.raises(SecurityUpgradeFailure): + with pytest.raises(SwarmException): await perform_simple_test( assertion_func, transports_for_initiator, transports_for_noninitiator ) From 7c630df610a33576fdc0e708590185d3b9298b4c Mon Sep 17 00:00:00 2001 From: Kevin Mai-Husan Chia Date: Wed, 21 Aug 2019 23:17:49 +0800 Subject: [PATCH 17/19] Update libp2p/security/insecure/transport.py Co-Authored-By: NIC Lin --- libp2p/security/insecure/transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 6cb882a..6894cbd 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -47,7 +47,7 @@ class InsecureSession(BaseSession): # Nothing is wrong. Store the `pubkey` and `peer_id` in the session. self.remote_permanent_pubkey = remote_pubkey - self.remote_peer_id = ID(remote_msg.id) + self.remote_peer_id = remote_peer_id # TODO: Store `pubkey` and `peer_id` to `PeerStore` From c1eacf221fa5331375d6ef7acc769a3c5a054714 Mon Sep 17 00:00:00 2001 From: mhchia Date: Wed, 21 Aug 2019 23:56:04 +0800 Subject: [PATCH 18/19] PR feedback - Check if the received peer id matches the one we initialize the session with. - Move the check inside `run_handshake` --- libp2p/security/insecure/transport.py | 34 ++++++++++++++++----------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 6894cbd..2aad45c 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -28,26 +28,38 @@ class InsecureSession(BaseSession): remote_msg_bytes = await read_fixedint_prefixed(self.conn) remote_msg = plaintext_pb2.Exchange() remote_msg.ParseFromString(remote_msg_bytes) + received_peer_id = ID(remote_msg.id) + + # Verify if the receive `ID` matches the one we originally initialize the session. + # We only need to check it when we are the initiator, because only in that condition + # we possibly knows the `ID` of the remote. + if self.initiator and self.remote_peer_id != received_peer_id: + raise HandshakeFailure( + "remote peer sent unexpected peer ID. " + f"expected={self.remote_peer_id} received={received_peer_id}" + ) # Verify if the given `pubkey` matches the given `peer_id` try: - remote_pubkey = pubkey_from_protobuf(remote_msg.pubkey) + received_pubkey = pubkey_from_protobuf(remote_msg.pubkey) except ValueError: raise HandshakeFailure( f"unknown `key_type` of remote_msg.pubkey={remote_msg.pubkey}" ) - remote_peer_id = ID(remote_msg.id) - remote_peer_id_from_pubkey = ID.from_pubkey(remote_pubkey) - if remote_peer_id_from_pubkey != remote_peer_id: + peer_id_from_received_pubkey = ID.from_pubkey(received_pubkey) + if peer_id_from_received_pubkey != received_peer_id: raise HandshakeFailure( "peer id and pubkey from the remote mismatch: " - f"remote_peer_id={remote_peer_id}, remote_pubkey={remote_pubkey}, " - f"remote_peer_id_from_pubkey={remote_peer_id_from_pubkey}" + f"received_peer_id={received_peer_id}, remote_pubkey={received_pubkey}, " + f"peer_id_from_received_pubkey={peer_id_from_received_pubkey}" ) # Nothing is wrong. Store the `pubkey` and `peer_id` in the session. - self.remote_permanent_pubkey = remote_pubkey - self.remote_peer_id = remote_peer_id + self.remote_permanent_pubkey = received_pubkey + # Only need to set peer's id when we don't know it before, + # i.e. we are not the connection initiator. + if not self.initiator: + self.remote_peer_id = received_peer_id # TODO: Store `pubkey` and `peer_id` to `PeerStore` @@ -76,12 +88,6 @@ class InsecureTransport(BaseSecureTransport): """ session = InsecureSession(self, conn, peer_id) await session.run_handshake() - received_peer_id = session.get_remote_peer() - if session.remote_permanent_pubkey is not None and received_peer_id != peer_id: - raise HandshakeFailure( - "remote peer sent unexpected peer ID. " - f"expected={peer_id} received={received_peer_id}" - ) return session From b5165792564b2c16a49e180fb0291873cb4e1252 Mon Sep 17 00:00:00 2001 From: mhchia Date: Thu, 22 Aug 2019 22:54:14 +0800 Subject: [PATCH 19/19] Remove the unnecessary RSAPrivateKey.from_bytes --- libp2p/crypto/rsa.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libp2p/crypto/rsa.py b/libp2p/crypto/rsa.py index a0f8439..ed8c215 100644 --- a/libp2p/crypto/rsa.py +++ b/libp2p/crypto/rsa.py @@ -35,11 +35,6 @@ class RSAPrivateKey(PrivateKey): def to_bytes(self) -> bytes: return self.impl.export_key("DER") - @classmethod - def from_bytes(cls, key_bytes: bytes) -> "RSAPrivateKey": - rsakey = RSA.import_key(key_bytes) - return cls(rsakey) - def get_type(self) -> KeyType: return KeyType.RSA