diff --git a/Makefile b/Makefile index d44fd6b..e5b6509 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,10 @@ FILES_TO_LINT = libp2p tests examples setup.py +PB = libp2p/crypto/pb/crypto.proto libp2p/pubsub/pb/rpc.proto libp2p/security/insecure/pb/plaintext.proto +PY = $(PB:.proto=_pb2.py) +PYI = $(PB:.proto=_pb2.pyi) + +# Set default to `protobufs`, otherwise `format` is called when typing only `make` +all: protobufs format: black $(FILES_TO_LINT) @@ -10,6 +16,12 @@ lintroll: isort --recursive --check-only $(FILES_TO_LINT) flake8 $(FILES_TO_LINT) -protobufs: - cd libp2p/crypto/pb && protoc --python_out=. --mypy_out=. crypto.proto - cd libp2p/pubsub/pb && protoc --python_out=. --mypy_out=. rpc.proto +protobufs: $(PY) + +%_pb2.py: %.proto + protoc --python_out=. --mypy_out=. $< + +.PHONY: clean + +clean: + rm -f $(PY) $(PYI) diff --git a/libp2p/__init__.py b/libp2p/__init__.py index 21f1d9c..10e71db 100644 --- a/libp2p/__init__.py +++ b/libp2p/__init__.py @@ -98,7 +98,7 @@ def initialize_default_swarm( muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex} security_transports_by_protocol = sec_opt or { - PLAINTEXT_PROTOCOL_ID: InsecureTransport(key_pair) + TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair) } upgrader = TransportUpgrader( security_transports_by_protocol, muxer_transports_by_protocol diff --git a/libp2p/crypto/pb/crypto_pb2.py b/libp2p/crypto/pb/crypto_pb2.py index 0e4aa1f..1b51897 100644 --- a/libp2p/crypto/pb/crypto_pb2.py +++ b/libp2p/crypto/pb/crypto_pb2.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: crypto.proto +# source: libp2p/crypto/pb/crypto.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -9,6 +8,7 @@ from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -17,11 +17,10 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( - name='crypto.proto', + name='libp2p/crypto/pb/crypto.proto', package='crypto.pb', syntax='proto2', - serialized_options=None, - serialized_pb=_b('\n\x0c\x63rypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03') + serialized_pb=_b('\n\x1dlibp2p/crypto/pb/crypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03') ) _KEYTYPE = _descriptor.EnumDescriptor( @@ -32,25 +31,25 @@ _KEYTYPE = _descriptor.EnumDescriptor( values=[ _descriptor.EnumValueDescriptor( name='RSA', index=0, number=0, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='Ed25519', index=1, number=1, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='Secp256k1', index=2, number=2, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='ECDSA', index=3, number=3, - serialized_options=None, + options=None, type=None), ], containing_type=None, - serialized_options=None, - serialized_start=158, - serialized_end=215, + options=None, + serialized_start=175, + serialized_end=232, ) _sym_db.RegisterEnumDescriptor(_KEYTYPE) @@ -75,28 +74,28 @@ _PUBLICKEY = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='crypto.pb.PublicKey.data', index=1, number=2, type=12, cpp_type=9, label=2, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=27, - serialized_end=90, + serialized_start=44, + serialized_end=107, ) @@ -113,28 +112,28 @@ _PRIVATEKEY = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='crypto.pb.PrivateKey.data', index=1, number=2, type=12, cpp_type=9, label=2, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=92, - serialized_end=156, + serialized_start=109, + serialized_end=173, ) _PUBLICKEY.fields_by_name['key_type'].enum_type = _KEYTYPE @@ -146,14 +145,14 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR) PublicKey = _reflection.GeneratedProtocolMessageType('PublicKey', (_message.Message,), dict( DESCRIPTOR = _PUBLICKEY, - __module__ = 'crypto_pb2' + __module__ = 'libp2p.crypto.pb.crypto_pb2' # @@protoc_insertion_point(class_scope:crypto.pb.PublicKey) )) _sym_db.RegisterMessage(PublicKey) PrivateKey = _reflection.GeneratedProtocolMessageType('PrivateKey', (_message.Message,), dict( DESCRIPTOR = _PRIVATEKEY, - __module__ = 'crypto_pb2' + __module__ = 'libp2p.crypto.pb.crypto_pb2' # @@protoc_insertion_point(class_scope:crypto.pb.PrivateKey) )) _sym_db.RegisterMessage(PrivateKey) diff --git a/libp2p/crypto/rsa.py b/libp2p/crypto/rsa.py index ceea008..ed8c215 100644 --- a/libp2p/crypto/rsa.py +++ b/libp2p/crypto/rsa.py @@ -11,6 +11,11 @@ class RSAPublicKey(PublicKey): def to_bytes(self) -> bytes: return self.impl.export_key("DER") + @classmethod + def from_bytes(cls, key_bytes: bytes) -> "RSAPublicKey": + rsakey = RSA.import_key(key_bytes) + return cls(rsakey) + def get_type(self) -> KeyType: return KeyType.RSA diff --git a/libp2p/crypto/secp256k1.py b/libp2p/crypto/secp256k1.py index 79ffc9d..e2d5fb2 100644 --- a/libp2p/crypto/secp256k1.py +++ b/libp2p/crypto/secp256k1.py @@ -10,6 +10,11 @@ class Secp256k1PublicKey(PublicKey): def to_bytes(self) -> bytes: return self.impl.format() + @classmethod + def from_bytes(cls, key_bytes: bytes) -> "Secp256k1PublicKey": + secp256k1_pubkey = coincurve.PublicKey(key_bytes) + return cls(secp256k1_pubkey) + def get_type(self) -> KeyType: return KeyType.Secp256k1 diff --git a/libp2p/crypto/utils.py b/libp2p/crypto/utils.py new file mode 100644 index 0000000..8519598 --- /dev/null +++ b/libp2p/crypto/utils.py @@ -0,0 +1,17 @@ +from .keys import PublicKey +from .pb import crypto_pb2 as protobuf +from .rsa import RSAPublicKey +from .secp256k1 import Secp256k1PublicKey + + +def pubkey_from_protobuf(pubkey_pb: protobuf.PublicKey) -> PublicKey: + if pubkey_pb.key_type == protobuf.RSA: + return RSAPublicKey.from_bytes(pubkey_pb.data) + # TODO: Test against secp256k1 keys + elif pubkey_pb.key_type == protobuf.Secp256k1: + return Secp256k1PublicKey.from_bytes(pubkey_pb.data) + # TODO: Support `Ed25519` and `ECDSA` in the future? + else: + raise ValueError( + f"unsupported key_type={pubkey_pb.key_type}, data={pubkey_pb.data!r}" + ) diff --git a/libp2p/exceptions.py b/libp2p/exceptions.py index 8d8af44..0ea0078 100644 --- a/libp2p/exceptions.py +++ b/libp2p/exceptions.py @@ -1,6 +1,8 @@ -class ValidationError(Exception): +class BaseLibp2pError(Exception): + pass + + +class ValidationError(BaseLibp2pError): """ Raised when something does not pass a validation check. """ - - pass diff --git a/libp2p/network/connection/raw_connection.py b/libp2p/network/connection/raw_connection.py index 3d12f0d..0d20de5 100644 --- a/libp2p/network/connection/raw_connection.py +++ b/libp2p/network/connection/raw_connection.py @@ -9,9 +9,11 @@ class RawConnection(IRawConnection): conn_port: str reader: asyncio.StreamReader writer: asyncio.StreamWriter - _next_id: int initiator: bool + _drain_lock: asyncio.Lock + _next_id: int + def __init__( self, ip: str, @@ -24,17 +26,25 @@ class RawConnection(IRawConnection): self.conn_port = port self.reader = reader self.writer = writer - self._next_id = 0 if initiator else 1 self.initiator = initiator + self._drain_lock = asyncio.Lock() + self._next_id = 0 if initiator else 1 + async def write(self, data: bytes) -> None: self.writer.write(data) - self.writer.write("\n".encode()) - await self.writer.drain() + # Reference: https://github.com/ethereum/lahja/blob/93610b2eb46969ff1797e0748c7ac2595e130aef/lahja/asyncio/endpoint.py#L99-L102 # noqa: E501 + # Use a lock to serialize drain() calls. Circumvents this bug: + # https://bugs.python.org/issue29930 + async with self._drain_lock: + await self.writer.drain() - async def read(self) -> bytes: - line = await self.reader.readline() - return line.rstrip(b"\n") + async def read(self, n: int = -1) -> bytes: + """ + Read up to ``n`` bytes from the underlying stream. + This call is delegated directly to the underlying ``self.reader``. + """ + return await self.reader.read(n) def close(self) -> None: self.writer.close() diff --git a/libp2p/network/connection/raw_connection_interface.py b/libp2p/network/connection/raw_connection_interface.py index 1810f58..fd1b469 100644 --- a/libp2p/network/connection/raw_connection_interface.py +++ b/libp2p/network/connection/raw_connection_interface.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -import asyncio class IRawConnection(ABC): @@ -9,17 +8,12 @@ class IRawConnection(ABC): initiator: bool - # TODO: reader and writer shouldn't be exposed. - # Need better API for the consumers - reader: asyncio.StreamReader - writer: asyncio.StreamWriter - @abstractmethod async def write(self, data: bytes) -> None: pass @abstractmethod - async def read(self) -> bytes: + async def read(self, n: int = -1) -> bytes: pass @abstractmethod diff --git a/libp2p/network/exceptions.py b/libp2p/network/exceptions.py new file mode 100644 index 0000000..92be9b8 --- /dev/null +++ b/libp2p/network/exceptions.py @@ -0,0 +1,5 @@ +from libp2p.exceptions import BaseLibp2pError + + +class SwarmException(BaseLibp2pError): + pass diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index 83c3b20..d9cdf48 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -33,7 +33,7 @@ class INetwork(ABC): dial_peer try to create a connection to peer_id :param peer_id: peer if we want to dial - :raises SwarmException: raised when no address if found for peer_id + :raises SwarmException: raised when an error occurs :return: muxed connection """ diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 9dd1c15..3ddc4ab 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -10,12 +10,14 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClient from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator from libp2p.routing.interfaces import IPeerRouting from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream +from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure from libp2p.transport.listener_interface import IListener from libp2p.transport.transport_interface import ITransport from libp2p.transport.upgrader import TransportUpgrader from libp2p.typing import StreamHandlerFn, TProtocol from .connection.raw_connection import RawConnection +from .exceptions import SwarmException from .network_interface import INetwork from .notifee_interface import INotifee from .stream.net_stream import NetStream @@ -84,7 +86,7 @@ class Swarm(INetwork): """ dial_peer try to create a connection to peer_id :param peer_id: peer if we want to dial - :raises SwarmException: raised when no address if found for peer_id + :raises SwarmException: raised when an error occurs :return: muxed connection """ @@ -110,10 +112,26 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn - secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) - muxed_conn = await self.upgrader.upgrade_connection( - secured_conn, self.generic_protocol_handler, peer_id - ) + try: + secured_conn = await self.upgrader.upgrade_security( + raw_conn, peer_id, True + ) + except SecurityUpgradeFailure as error: + # TODO: Add logging to indicate the failure + raw_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a secured connection from {peer_id}" + ) from error + try: + muxed_conn = await self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) + except MuxerUpgradeFailure as error: + # TODO: Add logging to indicate the failure + secured_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a muxed connection from {peer_id}" + ) from error # Store muxed connection in connections self.connections[peer_id] = muxed_conn @@ -145,6 +163,7 @@ class Swarm(INetwork): # Use muxed conn to open stream, which returns # a muxed stream # TODO: Remove protocol id from being passed into muxed_conn + # FIXME: Remove multiaddr from being passed into muxed_conn muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr) # Perform protocol muxing to determine protocol to use @@ -176,24 +195,18 @@ class Swarm(INetwork): Call listener listen with the multiaddr Map multiaddr to listener """ - for multiaddr in multiaddrs: - if str(multiaddr) in self.listeners: + for maddr in multiaddrs: + if str(maddr) in self.listeners: return True async def conn_handler( reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: - # Read in first message (should be peer_id of initiator) and ack - peer_id = ID.from_base58((await reader.read(1024)).decode()) - - writer.write("received peer id".encode()) - await writer.drain() - # Upgrade reader/write to a net_stream and pass \ # to appropriate stream handler (using multiaddr) raw_conn = RawConnection( - multiaddr.value_for_protocol("ip4"), - multiaddr.value_for_protocol("tcp"), + maddr.value_for_protocol("ip4"), + maddr.value_for_protocol("tcp"), reader, writer, False, @@ -201,12 +214,28 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn - secured_conn = await self.upgrader.upgrade_security( - raw_conn, peer_id, False - ) - muxed_conn = await self.upgrader.upgrade_connection( - secured_conn, self.generic_protocol_handler, peer_id - ) + try: + # FIXME: This dummy `ID(b"")` for the remote peer is useless. + secured_conn = await self.upgrader.upgrade_security( + raw_conn, ID(b""), False + ) + except SecurityUpgradeFailure as error: + # TODO: Add logging to indicate the failure + raw_conn.close() + raise SwarmException( + "fail to upgrade the connection to a secured connection" + ) from error + peer_id = secured_conn.get_remote_peer() + try: + muxed_conn = await self.upgrader.upgrade_connection( + secured_conn, self.generic_protocol_handler, peer_id + ) + except MuxerUpgradeFailure as error: + # TODO: Add logging to indicate the failure + secured_conn.close() + raise SwarmException( + f"fail to upgrade the connection to a muxed connection from {peer_id}" + ) from error # Store muxed_conn with peer id self.connections[peer_id] = muxed_conn @@ -218,19 +247,19 @@ class Swarm(INetwork): try: # Success listener = self.transport.create_listener(conn_handler) - self.listeners[str(multiaddr)] = listener - await listener.listen(multiaddr) + self.listeners[str(maddr)] = listener + await listener.listen(maddr) # Call notifiers since event occurred for notifee in self.notifees: - await notifee.listen(self, multiaddr) + await notifee.listen(self, maddr) return True except IOError: # Failed. Continue looping. - print("Failed to connect to: " + str(multiaddr)) + print("Failed to connect to: " + str(maddr)) - # No multiaddr succeeded + # No maddr succeeded return False def notify(self, notifee: INotifee) -> bool: @@ -280,7 +309,3 @@ def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn: asyncio.ensure_future(handler(net_stream)) return generic_protocol_handler - - -class SwarmException(Exception): - pass diff --git a/libp2p/protocol_muxer/exceptions.py b/libp2p/protocol_muxer/exceptions.py new file mode 100644 index 0000000..cf47aca --- /dev/null +++ b/libp2p/protocol_muxer/exceptions.py @@ -0,0 +1,9 @@ +from libp2p.exceptions import BaseLibp2pError + + +class MultiselectError(BaseLibp2pError): + """Raised when an error occurs in multiselect process""" + + +class MultiselectClientError(BaseLibp2pError): + """Raised when an error occurs in protocol selection process""" diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index c854415..0c3dc72 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -2,6 +2,7 @@ from typing import Dict, Tuple from libp2p.typing import StreamHandlerFn, TProtocol +from .exceptions import MultiselectError from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_muxer_interface import IMultiselectMuxer @@ -79,7 +80,10 @@ class Multiselect(IMultiselectMuxer): # Confirm that the protocols are the same if not validate_handshake(handshake_contents): - raise MultiselectError("multiselect protocol ID mismatch") + raise MultiselectError( + "multiselect protocol ID mismatch: " + f"received handshake_contents={handshake_contents}" + ) # Handshake succeeded if this point is reached @@ -94,7 +98,3 @@ def validate_handshake(handshake_contents: str) -> bool: # TODO: Modify this when format used by go repo for messages # is added return handshake_contents == MULTISELECT_PROTOCOL_ID - - -class MultiselectError(ValueError): - """Raised when an error occurs in multiselect process""" diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index 062aedc..5fcfc45 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -2,6 +2,7 @@ from typing import Sequence from libp2p.typing import TProtocol +from .exceptions import MultiselectClientError from .multiselect_client_interface import IMultiselectClient from .multiselect_communicator_interface import IMultiselectCommunicator @@ -116,7 +117,3 @@ def validate_handshake(handshake_contents: str) -> bool: # TODO: Modify this when format used by go repo for messages # is added return handshake_contents == MULTISELECT_PROTOCOL_ID - - -class MultiselectClientError(ValueError): - """Raised when an error occurs in protocol selection process""" diff --git a/libp2p/protocol_muxer/multiselect_communicator.py b/libp2p/protocol_muxer/multiselect_communicator.py index 783c380..e252304 100644 --- a/libp2p/protocol_muxer/multiselect_communicator.py +++ b/libp2p/protocol_muxer/multiselect_communicator.py @@ -1,23 +1,10 @@ from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.stream_muxer.abc import IMuxedStream -from libp2p.stream_muxer.mplex.utils import decode_uvarint_from_stream, encode_uvarint -from libp2p.typing import StreamReader +from libp2p.utils import encode_delim, read_delim from .multiselect_communicator_interface import IMultiselectCommunicator -def delim_encode(msg_str: str) -> bytes: - msg_bytes = msg_str.encode() - varint_len_msg = encode_uvarint(len(msg_bytes) + 1) - return varint_len_msg + msg_bytes + b"\n" - - -async def delim_read(reader: StreamReader, timeout: int = 10) -> str: - len_msg = await decode_uvarint_from_stream(reader, timeout) - msg_bytes = await reader.read(len_msg) - return msg_bytes.decode().rstrip() - - class RawConnectionCommunicator(IMultiselectCommunicator): conn: IRawConnection @@ -25,12 +12,12 @@ class RawConnectionCommunicator(IMultiselectCommunicator): self.conn = conn async def write(self, msg_str: str) -> None: - msg_bytes = delim_encode(msg_str) - self.conn.writer.write(msg_bytes) - await self.conn.writer.drain() + msg_bytes = encode_delim(msg_str.encode()) + await self.conn.write(msg_bytes) async def read(self) -> str: - return await delim_read(self.conn.reader) + data = await read_delim(self.conn) + return data.decode() class StreamCommunicator(IMultiselectCommunicator): @@ -40,8 +27,9 @@ class StreamCommunicator(IMultiselectCommunicator): self.stream = stream async def write(self, msg_str: str) -> None: - msg_bytes = delim_encode(msg_str) + msg_bytes = encode_delim(msg_str.encode()) await self.stream.write(msg_bytes) async def read(self) -> str: - return await delim_read(self.stream) + data = await read_delim(self.stream) + return data.decode() diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index ab794f2..7c53fc4 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: rpc.proto +# source: libp2p/pubsub/pb/rpc.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -8,6 +7,7 @@ from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -16,11 +16,10 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( - name='rpc.proto', + name='libp2p/pubsub/pb/rpc.proto', package='pubsub.pb', syntax='proto2', - serialized_options=None, - serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') + serialized_pb=_b('\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') ) @@ -33,21 +32,21 @@ _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE = _descriptor.EnumDescriptor( values=[ _descriptor.EnumValueDescriptor( name='NONE', index=0, number=0, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='KEY', index=1, number=1, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='WOT', index=2, number=2, - serialized_options=None, + options=None, type=None), ], containing_type=None, - serialized_options=None, - serialized_start=868, - serialized_end=906, + options=None, + serialized_start=885, + serialized_end=923, ) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE) @@ -59,21 +58,21 @@ _TOPICDESCRIPTOR_ENCOPTS_ENCMODE = _descriptor.EnumDescriptor( values=[ _descriptor.EnumValueDescriptor( name='NONE', index=0, number=0, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='SHAREDKEY', index=1, number=1, - serialized_options=None, + options=None, type=None), _descriptor.EnumValueDescriptor( name='WOT', index=2, number=2, - serialized_options=None, + options=None, type=None), ], containing_type=None, - serialized_options=None, - serialized_start=997, - serialized_end=1040, + options=None, + serialized_start=1014, + serialized_end=1057, ) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE) @@ -91,28 +90,28 @@ _RPC_SUBOPTS = _descriptor.Descriptor( has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='topicid', full_name='pubsub.pb.RPC.SubOpts.topicid', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=160, - serialized_end=205, + serialized_start=177, + serialized_end=222, ) _RPC = _descriptor.Descriptor( @@ -128,35 +127,35 @@ _RPC = _descriptor.Descriptor( has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='publish', full_name='pubsub.pb.RPC.publish', index=1, number=2, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='control', full_name='pubsub.pb.RPC.control', index=2, number=3, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[_RPC_SUBOPTS, ], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=25, - serialized_end=205, + serialized_start=42, + serialized_end=222, ) @@ -173,56 +172,56 @@ _MESSAGE = _descriptor.Descriptor( has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='pubsub.pb.Message.data', index=1, number=2, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='seqno', full_name='pubsub.pb.Message.seqno', index=2, number=3, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='topicIDs', full_name='pubsub.pb.Message.topicIDs', index=3, number=4, type=9, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='signature', full_name='pubsub.pb.Message.signature', index=4, number=5, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='key', full_name='pubsub.pb.Message.key', index=5, number=6, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=207, - serialized_end=312, + serialized_start=224, + serialized_end=329, ) @@ -239,42 +238,42 @@ _CONTROLMESSAGE = _descriptor.Descriptor( has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='iwant', full_name='pubsub.pb.ControlMessage.iwant', index=1, number=2, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='graft', full_name='pubsub.pb.ControlMessage.graft', index=2, number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='prune', full_name='pubsub.pb.ControlMessage.prune', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=315, - serialized_end=491, + serialized_start=332, + serialized_end=508, ) @@ -291,28 +290,28 @@ _CONTROLIHAVE = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='messageIDs', full_name='pubsub.pb.ControlIHave.messageIDs', index=1, number=2, type=9, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=493, - serialized_end=544, + serialized_start=510, + serialized_end=561, ) @@ -329,21 +328,21 @@ _CONTROLIWANT = _descriptor.Descriptor( has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=546, - serialized_end=580, + serialized_start=563, + serialized_end=597, ) @@ -360,21 +359,21 @@ _CONTROLGRAFT = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=582, - serialized_end=613, + serialized_start=599, + serialized_end=630, ) @@ -391,21 +390,21 @@ _CONTROLPRUNE = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=615, - serialized_end=646, + serialized_start=632, + serialized_end=663, ) @@ -422,14 +421,14 @@ _TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='keys', full_name='pubsub.pb.TopicDescriptor.AuthOpts.keys', index=1, number=2, type=12, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -437,14 +436,14 @@ _TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor( enum_types=[ _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE, ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=782, - serialized_end=906, + serialized_start=799, + serialized_end=923, ) _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( @@ -460,14 +459,14 @@ _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='keyHashes', full_name='pubsub.pb.TopicDescriptor.EncOpts.keyHashes', index=1, number=2, type=12, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -475,14 +474,14 @@ _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( enum_types=[ _TOPICDESCRIPTOR_ENCOPTS_ENCMODE, ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=909, - serialized_end=1040, + serialized_start=926, + serialized_end=1057, ) _TOPICDESCRIPTOR = _descriptor.Descriptor( @@ -498,35 +497,35 @@ _TOPICDESCRIPTOR = _descriptor.Descriptor( has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='auth', full_name='pubsub.pb.TopicDescriptor.auth', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='enc', full_name='pubsub.pb.TopicDescriptor.enc', index=2, number=3, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[_TOPICDESCRIPTOR_AUTHOPTS, _TOPICDESCRIPTOR_ENCOPTS, ], enum_types=[ ], - serialized_options=None, + options=None, is_extendable=False, syntax='proto2', extension_ranges=[], oneofs=[ ], - serialized_start=649, - serialized_end=1040, + serialized_start=666, + serialized_end=1057, ) _RPC_SUBOPTS.containing_type = _RPC @@ -559,12 +558,12 @@ RPC = _reflection.GeneratedProtocolMessageType('RPC', (_message.Message,), dict( SubOpts = _reflection.GeneratedProtocolMessageType('SubOpts', (_message.Message,), dict( DESCRIPTOR = _RPC_SUBOPTS, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.RPC.SubOpts) )) , DESCRIPTOR = _RPC, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.RPC) )) _sym_db.RegisterMessage(RPC) @@ -572,42 +571,42 @@ _sym_db.RegisterMessage(RPC.SubOpts) Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict( DESCRIPTOR = _MESSAGE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.Message) )) _sym_db.RegisterMessage(Message) ControlMessage = _reflection.GeneratedProtocolMessageType('ControlMessage', (_message.Message,), dict( DESCRIPTOR = _CONTROLMESSAGE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlMessage) )) _sym_db.RegisterMessage(ControlMessage) ControlIHave = _reflection.GeneratedProtocolMessageType('ControlIHave', (_message.Message,), dict( DESCRIPTOR = _CONTROLIHAVE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIHave) )) _sym_db.RegisterMessage(ControlIHave) ControlIWant = _reflection.GeneratedProtocolMessageType('ControlIWant', (_message.Message,), dict( DESCRIPTOR = _CONTROLIWANT, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIWant) )) _sym_db.RegisterMessage(ControlIWant) ControlGraft = _reflection.GeneratedProtocolMessageType('ControlGraft', (_message.Message,), dict( DESCRIPTOR = _CONTROLGRAFT, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlGraft) )) _sym_db.RegisterMessage(ControlGraft) ControlPrune = _reflection.GeneratedProtocolMessageType('ControlPrune', (_message.Message,), dict( DESCRIPTOR = _CONTROLPRUNE, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.ControlPrune) )) _sym_db.RegisterMessage(ControlPrune) @@ -616,19 +615,19 @@ TopicDescriptor = _reflection.GeneratedProtocolMessageType('TopicDescriptor', (_ AuthOpts = _reflection.GeneratedProtocolMessageType('AuthOpts', (_message.Message,), dict( DESCRIPTOR = _TOPICDESCRIPTOR_AUTHOPTS, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.AuthOpts) )) , EncOpts = _reflection.GeneratedProtocolMessageType('EncOpts', (_message.Message,), dict( DESCRIPTOR = _TOPICDESCRIPTOR_ENCOPTS, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.EncOpts) )) , DESCRIPTOR = _TOPICDESCRIPTOR, - __module__ = 'rpc_pb2' + __module__ = 'libp2p.pubsub.pb.rpc_pb2' # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor) )) _sym_db.RegisterMessage(TopicDescriptor) diff --git a/libp2p/security/base_session.py b/libp2p/security/base_session.py index 8fc4dab..c4a7275 100644 --- a/libp2p/security/base_session.py +++ b/libp2p/security/base_session.py @@ -7,12 +7,18 @@ from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn -class BaseSession(ISecureConn, IRawConnection): +class BaseSession(ISecureConn): """ ``BaseSession`` is not fully instantiated from its abstract classes as it is only meant to be used in clases that derive from it. """ + local_peer: ID + local_private_key: PrivateKey + conn: IRawConnection + remote_peer_id: ID + remote_permanent_pubkey: PublicKey + def __init__( self, transport: BaseSecureTransport, conn: IRawConnection, peer_id: ID ) -> None: @@ -23,8 +29,6 @@ class BaseSession(ISecureConn, IRawConnection): self.remote_permanent_pubkey = None self.initiator = self.conn.initiator - self.writer = self.conn.writer - self.reader = self.conn.reader # TODO clean up how this is passed around? def next_stream_id(self) -> int: @@ -33,8 +37,8 @@ class BaseSession(ISecureConn, IRawConnection): async def write(self, data: bytes) -> None: await self.conn.write(data) - async def read(self) -> bytes: - return await self.conn.read() + async def read(self, n: int = -1) -> bytes: + return await self.conn.read(n) def close(self) -> None: self.conn.close() diff --git a/libp2p/security/insecure/pb/__init__.py b/libp2p/security/insecure/pb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libp2p/security/insecure/pb/plaintext.proto b/libp2p/security/insecure/pb/plaintext.proto new file mode 100644 index 0000000..0c8ffb0 --- /dev/null +++ b/libp2p/security/insecure/pb/plaintext.proto @@ -0,0 +1,10 @@ +syntax = "proto2"; + +package plaintext.pb; + +import "libp2p/crypto/pb/crypto.proto"; + +message Exchange { + optional bytes id = 1; + optional crypto.pb.PublicKey pubkey = 2; +} diff --git a/libp2p/security/insecure/pb/plaintext_pb2.py b/libp2p/security/insecure/pb/plaintext_pb2.py new file mode 100644 index 0000000..72b2740 --- /dev/null +++ b/libp2p/security/insecure/pb/plaintext_pb2.py @@ -0,0 +1,79 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: libp2p/security/insecure/pb/plaintext.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from libp2p.crypto.pb import crypto_pb2 as libp2p_dot_crypto_dot_pb_dot_crypto__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='libp2p/security/insecure/pb/plaintext.proto', + package='plaintext.pb', + syntax='proto2', + serialized_pb=_b('\n+libp2p/security/insecure/pb/plaintext.proto\x12\x0cplaintext.pb\x1a\x1dlibp2p/crypto/pb/crypto.proto\"<\n\x08\x45xchange\x12\n\n\x02id\x18\x01 \x01(\x0c\x12$\n\x06pubkey\x18\x02 \x01(\x0b\x32\x14.crypto.pb.PublicKey') + , + dependencies=[libp2p_dot_crypto_dot_pb_dot_crypto__pb2.DESCRIPTOR,]) + + + + +_EXCHANGE = _descriptor.Descriptor( + name='Exchange', + full_name='plaintext.pb.Exchange', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='plaintext.pb.Exchange.id', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='pubkey', full_name='plaintext.pb.Exchange.pubkey', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto2', + extension_ranges=[], + oneofs=[ + ], + serialized_start=92, + serialized_end=152, +) + +_EXCHANGE.fields_by_name['pubkey'].message_type = libp2p_dot_crypto_dot_pb_dot_crypto__pb2._PUBLICKEY +DESCRIPTOR.message_types_by_name['Exchange'] = _EXCHANGE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Exchange = _reflection.GeneratedProtocolMessageType('Exchange', (_message.Message,), dict( + DESCRIPTOR = _EXCHANGE, + __module__ = 'libp2p.security.insecure.pb.plaintext_pb2' + # @@protoc_insertion_point(class_scope:plaintext.pb.Exchange) + )) +_sym_db.RegisterMessage(Exchange) + + +# @@protoc_insertion_point(module_scope) diff --git a/libp2p/security/insecure/pb/plaintext_pb2.pyi b/libp2p/security/insecure/pb/plaintext_pb2.pyi new file mode 100644 index 0000000..641bd9a --- /dev/null +++ b/libp2p/security/insecure/pb/plaintext_pb2.pyi @@ -0,0 +1,45 @@ +# @generated by generate_proto_mypy_stubs.py. Do not edit! +import sys +from google.protobuf.descriptor import ( + Descriptor as google___protobuf___descriptor___Descriptor, +) + +from google.protobuf.message import ( + Message as google___protobuf___message___Message, +) + +from libp2p.crypto.pb.crypto_pb2 import ( + PublicKey as libp2p___crypto___pb___crypto_pb2___PublicKey, +) + +from typing import ( + Optional as typing___Optional, +) + +from typing_extensions import ( + Literal as typing_extensions___Literal, +) + + +class Exchange(google___protobuf___message___Message): + DESCRIPTOR: google___protobuf___descriptor___Descriptor = ... + id = ... # type: bytes + + @property + def pubkey(self) -> libp2p___crypto___pb___crypto_pb2___PublicKey: ... + + def __init__(self, + *, + id : typing___Optional[bytes] = None, + pubkey : typing___Optional[libp2p___crypto___pb___crypto_pb2___PublicKey] = None, + ) -> None: ... + @classmethod + def FromString(cls, s: bytes) -> Exchange: ... + def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ... + def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ... + if sys.version_info >= (3,): + def HasField(self, field_name: typing_extensions___Literal[u"id",u"pubkey"]) -> bool: ... + def ClearField(self, field_name: typing_extensions___Literal[u"id",u"pubkey"]) -> None: ... + else: + def HasField(self, field_name: typing_extensions___Literal[u"id",b"id",u"pubkey",b"pubkey"]) -> bool: ... + def ClearField(self, field_name: typing_extensions___Literal[u"id",b"id",u"pubkey",b"pubkey"]) -> None: ... diff --git a/libp2p/security/insecure/transport.py b/libp2p/security/insecure/transport.py index 97c9676..2aad45c 100644 --- a/libp2p/security/insecure/transport.py +++ b/libp2p/security/insecure/transport.py @@ -1,15 +1,67 @@ +from libp2p.crypto.keys import PublicKey +from libp2p.crypto.pb import crypto_pb2 +from libp2p.crypto.utils import pubkey_from_protobuf from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.peer.id import ID from libp2p.security.base_session import BaseSession from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.transport.exceptions import HandshakeFailure from libp2p.typing import TProtocol +from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed -PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/1.0.0") +from .pb import plaintext_pb2 + +# Reference: https://github.com/libp2p/go-libp2p-core/blob/master/sec/insecure/insecure.go + + +PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/2.0.0") class InsecureSession(BaseSession): - pass + async def run_handshake(self) -> None: + msg = make_exchange_message(self.local_private_key.get_public_key()) + msg_bytes = msg.SerializeToString() + encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes) + await self.write(encoded_msg_bytes) + + remote_msg_bytes = await read_fixedint_prefixed(self.conn) + remote_msg = plaintext_pb2.Exchange() + remote_msg.ParseFromString(remote_msg_bytes) + received_peer_id = ID(remote_msg.id) + + # Verify if the receive `ID` matches the one we originally initialize the session. + # We only need to check it when we are the initiator, because only in that condition + # we possibly knows the `ID` of the remote. + if self.initiator and self.remote_peer_id != received_peer_id: + raise HandshakeFailure( + "remote peer sent unexpected peer ID. " + f"expected={self.remote_peer_id} received={received_peer_id}" + ) + + # Verify if the given `pubkey` matches the given `peer_id` + try: + received_pubkey = pubkey_from_protobuf(remote_msg.pubkey) + except ValueError: + raise HandshakeFailure( + f"unknown `key_type` of remote_msg.pubkey={remote_msg.pubkey}" + ) + peer_id_from_received_pubkey = ID.from_pubkey(received_pubkey) + if peer_id_from_received_pubkey != received_peer_id: + raise HandshakeFailure( + "peer id and pubkey from the remote mismatch: " + f"received_peer_id={received_peer_id}, remote_pubkey={received_pubkey}, " + f"peer_id_from_received_pubkey={peer_id_from_received_pubkey}" + ) + + # Nothing is wrong. Store the `pubkey` and `peer_id` in the session. + self.remote_permanent_pubkey = received_pubkey + # Only need to set peer's id when we don't know it before, + # i.e. we are not the connection initiator. + if not self.initiator: + self.remote_peer_id = received_peer_id + + # TODO: Store `pubkey` and `peer_id` to `PeerStore` class InsecureTransport(BaseSecureTransport): @@ -24,7 +76,9 @@ class InsecureTransport(BaseSecureTransport): for an inbound connection (i.e. we are not the initiator) :return: secure connection object (that implements secure_conn_interface) """ - return InsecureSession(self, conn, ID(b"")) + session = InsecureSession(self, conn, ID(b"")) + await session.run_handshake() + return session async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: """ @@ -32,4 +86,14 @@ class InsecureTransport(BaseSecureTransport): for an inbound connection (i.e. we are the initiator) :return: secure connection object (that implements secure_conn_interface) """ - return InsecureSession(self, conn, peer_id) + session = InsecureSession(self, conn, peer_id) + await session.run_handshake() + return session + + +def make_exchange_message(pubkey: PublicKey) -> plaintext_pb2.Exchange: + pubkey_pb = crypto_pb2.PublicKey( + key_type=pubkey.get_type().value, data=pubkey.to_bytes() + ) + id_bytes = ID.from_pubkey(pubkey).to_bytes() + return plaintext_pb2.Exchange(id=id_bytes, pubkey=pubkey_pb) diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index ec24b46..6e69d7a 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -87,10 +87,6 @@ class SecurityMultistream(ABC): :param initiator: true if we are the initiator, false otherwise :return: selected secure transport """ - # TODO: Is conn acceptable to multiselect/multiselect_client - # instead of stream? In go repo, they pass in a raw conn - # (https://raw.githubusercontent.com/libp2p/go-conn-security-multistream/master/ssms.go) - protocol: TProtocol communicator = RawConnectionCommunicator(conn) if initiator: diff --git a/libp2p/security/simple/transport.py b/libp2p/security/simple/transport.py index 7a19a9f..e70edcc 100644 --- a/libp2p/security/simple/transport.py +++ b/libp2p/security/simple/transport.py @@ -6,6 +6,8 @@ from libp2p.peer.id import ID from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.insecure.transport import InsecureSession from libp2p.security.secure_conn_interface import ISecureConn +from libp2p.transport.exceptions import SecurityUpgradeFailure +from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed class SimpleSecurityTransport(BaseSecureTransport): @@ -21,15 +23,19 @@ class SimpleSecurityTransport(BaseSecureTransport): for an inbound connection (i.e. we are not the initiator) :return: secure connection object (that implements secure_conn_interface) """ - await conn.write(self.key_phrase.encode()) - incoming = (await conn.read()).decode() + await conn.write(encode_fixedint_prefixed(self.key_phrase.encode())) + incoming = (await read_fixedint_prefixed(conn)).decode() if incoming != self.key_phrase: - raise Exception( + raise SecurityUpgradeFailure( "Key phrase differed between nodes. Expected " + self.key_phrase ) session = InsecureSession(self, conn, ID(b"")) + # NOTE: Here we calls `run_handshake` for both sides to exchange their public keys and + # peer ids, otherwise tests fail. However, it seems pretty weird that + # `SimpleSecurityTransport` sends peer id through `Insecure`. + await session.run_handshake() # NOTE: this is abusing the abstraction we have here # but this code may be deprecated soon and this exists # mainly to satisfy a test that will go along w/ it @@ -43,19 +49,23 @@ class SimpleSecurityTransport(BaseSecureTransport): for an inbound connection (i.e. we are the initiator) :return: secure connection object (that implements secure_conn_interface) """ - await conn.write(self.key_phrase.encode()) - incoming = (await conn.read()).decode() + await conn.write(encode_fixedint_prefixed(self.key_phrase.encode())) + incoming = (await read_fixedint_prefixed(conn)).decode() # Force context switch, as this security transport is built for testing locally # in a single event loop await asyncio.sleep(0) if incoming != self.key_phrase: - raise Exception( + raise SecurityUpgradeFailure( "Key phrase differed between nodes. Expected " + self.key_phrase ) session = InsecureSession(self, conn, peer_id) + # NOTE: Here we calls `run_handshake` for both sides to exchange their public keys and + # peer ids, otherwise tests fail. However, it seems pretty weird that + # `SimpleSecurityTransport` sends peer id through `Insecure`. + await session.run_handshake() # NOTE: this is abusing the abstraction we have here # but this code may be deprecated soon and this exists # mainly to satisfy a test that will go along w/ it diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 7084d7a..7270c15 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -17,7 +17,6 @@ class IMuxedConn(ABC): reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go """ - initiator: bool peer_id: ID @abstractmethod @@ -35,6 +34,11 @@ class IMuxedConn(ABC): :param peer_id: peer_id of peer the connection is to """ + @property + @abstractmethod + def initiator(self) -> bool: + pass + @abstractmethod def close(self) -> None: """ @@ -62,6 +66,7 @@ class IMuxedConn(ABC): Read a message from `stream_id`'s buffer, non-blockingly. """ + # FIXME: Remove multiaddr from being passed into muxed_conn @abstractmethod async def open_stream( self, protocol_id: str, multi_addr: Multiaddr diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index aa64b69..8f95124 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -9,11 +9,11 @@ from libp2p.peer.id import ID from libp2p.security.secure_conn_interface import ISecureConn from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.typing import TProtocol +from libp2p.utils import decode_uvarint_from_stream, encode_uvarint from .constants import HeaderTags from .exceptions import StreamNotFound from .mplex_stream import MplexStream -from .utils import decode_uvarint_from_stream, encode_uvarint MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0") @@ -25,7 +25,6 @@ class Mplex(IMuxedConn): secured_conn: ISecureConn raw_conn: IRawConnection - initiator: bool peer_id: ID # TODO: `dataIn` in go implementation. Should be size of 8. # TODO: Also, `dataIn` is closed indicating EOF in Go. We don't have similar strategies @@ -41,13 +40,12 @@ class Mplex(IMuxedConn): ) -> None: """ create a new muxed connection - :param conn: an instance of raw connection + :param secured_conn: an instance of ``ISecureConn`` :param generic_protocol_handler: generic protocol handler for new muxed streams :param peer_id: peer_id of peer the connection is to """ self.conn = secured_conn - self.initiator = secured_conn.initiator # Store generic protocol handler self.generic_protocol_handler = generic_protocol_handler @@ -63,6 +61,10 @@ class Mplex(IMuxedConn): # Kick off reading asyncio.ensure_future(self.handle_incoming()) + @property + def initiator(self) -> bool: + return self.conn.initiator + def close(self) -> None: """ close the stream muxer and underlying raw connection @@ -98,6 +100,7 @@ class Mplex(IMuxedConn): return None return await self.buffers[stream_id].get() + # FIXME: Remove multiaddr from being passed into muxed_conn async def open_stream( self, protocol_id: str, multi_addr: Multiaddr ) -> IMuxedStream: @@ -108,7 +111,7 @@ class Mplex(IMuxedConn): :return: a new stream """ stream_id = self.conn.next_stream_id() - stream = MplexStream(stream_id, multi_addr, self) + stream = MplexStream(stream_id, True, self) self.buffers[stream_id] = asyncio.Queue() await self.send_message(HeaderTags.NewStream, None, stream_id) return stream @@ -147,8 +150,7 @@ class Mplex(IMuxedConn): :param _bytes: byte array to write :return: length written """ - self.conn.writer.write(_bytes) - await self.conn.writer.drain() + await self.conn.write(_bytes) return len(_bytes) async def handle_incoming(self) -> None: @@ -186,11 +188,9 @@ class Mplex(IMuxedConn): # loop in handle_incoming timeout = 0.1 try: - header = await decode_uvarint_from_stream(self.conn.reader, timeout) - length = await decode_uvarint_from_stream(self.conn.reader, timeout) - message = await asyncio.wait_for( - self.conn.reader.read(length), timeout=timeout - ) + header = await decode_uvarint_from_stream(self.conn, timeout) + length = await decode_uvarint_from_stream(self.conn, timeout) + message = await asyncio.wait_for(self.conn.read(length), timeout=timeout) except asyncio.TimeoutError: return None, None, None diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py deleted file mode 100644 index 44326ad..0000000 --- a/libp2p/stream_muxer/mplex/utils.py +++ /dev/null @@ -1,47 +0,0 @@ -import asyncio -import struct -from typing import Tuple - -from libp2p.typing import StreamReader - - -def encode_uvarint(number: int) -> bytes: - """Pack `number` into varint bytes""" - buf = b"" - while True: - towrite = number & 0x7F - number >>= 7 - if number: - buf += bytes((towrite | 0x80,)) - else: - buf += bytes((towrite,)) - break - return buf - - -def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: - shift = 0 - result = 0 - while True: - i = buff[index] - result |= (i & 0x7F) << shift - shift += 7 - if not i & 0x80: - break - index += 1 - - return result, index + 1 - - -async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> int: - shift = 0 - result = 0 - while True: - byte = await asyncio.wait_for(reader.read(1), timeout=timeout) - i = struct.unpack(">H", b"\x00" + byte)[0] - result |= (i & 0x7F) << shift - shift += 7 - if not i & 0x80: - break - - return result diff --git a/libp2p/transport/exceptions.py b/libp2p/transport/exceptions.py new file mode 100644 index 0000000..b10cfc9 --- /dev/null +++ b/libp2p/transport/exceptions.py @@ -0,0 +1,18 @@ +from libp2p.exceptions import BaseLibp2pError + + +# TODO: Add `BaseLibp2pError` and `UpgradeFailure` can inherit from it? +class UpgradeFailure(BaseLibp2pError): + pass + + +class SecurityUpgradeFailure(UpgradeFailure): + pass + + +class MuxerUpgradeFailure(UpgradeFailure): + pass + + +class HandshakeFailure(BaseLibp2pError): + pass diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index ae1e776..6f6d4d9 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -62,6 +62,7 @@ class TCPListener(IListener): class TCP(ITransport): + # TODO: Remove `self_id` async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: """ dial a transport to peer listening on multiaddr @@ -74,18 +75,6 @@ class TCP(ITransport): reader, writer = await asyncio.open_connection(host, int(port)) - # TODO: Change this `sending peer id` process to `/plaintext/2.0.0` - # First: send our peer ID so receiver knows it - writer.write(self_id.to_base58().encode()) - await writer.drain() - - # Await ack for peer id - expected_ack_str = "received peer id" - ack = (await reader.read(len(expected_ack_str))).decode() - - if ack != expected_ack_str: - raise Exception("Receiver did not receive peer id") - return RawConnection(host, port, reader, writer, True) def create_listener(self, handler_function: THandler) -> TCPListener: diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index b0373ec..762a811 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -3,11 +3,17 @@ from typing import Mapping from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.peer.id import ID +from libp2p.protocol_muxer.exceptions import MultiselectClientError, MultiselectError from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.security_multistream import SecurityMultistream from libp2p.stream_muxer.abc import IMuxedConn from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream +from libp2p.transport.exceptions import ( + HandshakeFailure, + MuxerUpgradeFailure, + SecurityUpgradeFailure, +) from libp2p.typing import TProtocol from .listener_interface import IListener @@ -39,10 +45,20 @@ class TransportUpgrader: """ Upgrade conn to a secured connection """ - if initiator: - return await self.security_multistream.secure_outbound(raw_conn, peer_id) - - return await self.security_multistream.secure_inbound(raw_conn) + try: + if initiator: + return await self.security_multistream.secure_outbound( + raw_conn, peer_id + ) + return await self.security_multistream.secure_inbound(raw_conn) + except (MultiselectError, MultiselectClientError) as error: + raise SecurityUpgradeFailure( + "failed to negotiate the secure protocol" + ) from error + except HandshakeFailure as error: + raise SecurityUpgradeFailure( + "handshake failed when upgrading to secure connection" + ) from error async def upgrade_connection( self, @@ -53,6 +69,11 @@ class TransportUpgrader: """ Upgrade secured connection to a muxed connection """ - return await self.muxer_multistream.new_conn( - conn, generic_protocol_handler, peer_id - ) + try: + return await self.muxer_multistream.new_conn( + conn, generic_protocol_handler, peer_id + ) + except (MultiselectError, MultiselectClientError) as error: + raise MuxerUpgradeFailure( + "failed to negotiate the multiplexer protocol" + ) from error diff --git a/libp2p/typing.py b/libp2p/typing.py index 9810746..f36d8ab 100644 --- a/libp2p/typing.py +++ b/libp2p/typing.py @@ -1,6 +1,7 @@ -import asyncio from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union +from libp2p.network.connection.raw_connection_interface import IRawConnection + if TYPE_CHECKING: from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401 from libp2p.stream_muxer.abc import IMuxedStream # noqa: F401 @@ -9,4 +10,4 @@ TProtocol = NewType("TProtocol", str) StreamHandlerFn = Callable[["INetStream"], Awaitable[None]] -StreamReader = Union["IMuxedStream", asyncio.StreamReader] +StreamReader = Union["IMuxedStream", IRawConnection] diff --git a/libp2p/utils.py b/libp2p/utils.py new file mode 100644 index 0000000..9a1f0cb --- /dev/null +++ b/libp2p/utils.py @@ -0,0 +1,99 @@ +import asyncio +import struct +from typing import Tuple + +from libp2p.typing import StreamReader + + +def encode_uvarint(number: int) -> bytes: + """Pack `number` into varint bytes""" + buf = b"" + while True: + towrite = number & 0x7F + number >>= 7 + if number: + buf += bytes((towrite | 0x80,)) + else: + buf += bytes((towrite,)) + break + return buf + + +def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: + shift = 0 + result = 0 + while True: + i = buff[index] + result |= (i & 0x7F) << shift + shift += 7 + if not i & 0x80: + break + index += 1 + + return result, index + 1 + + +async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> int: + shift = 0 + result = 0 + while True: + byte = await asyncio.wait_for(reader.read(1), timeout=timeout) + i = struct.unpack(">H", b"\x00" + byte)[0] + result |= (i & 0x7F) << shift + shift += 7 + if not i & 0x80: + break + + return result + + +# Varint-prefixed read/write + + +def encode_varint_prefixed(msg_bytes: bytes) -> bytes: + varint_len = encode_uvarint(len(msg_bytes)) + return varint_len + msg_bytes + + +async def read_varint_prefixed_bytes(reader: StreamReader) -> bytes: + len_msg = await decode_uvarint_from_stream(reader, None) + data = await reader.read(len_msg) + if len(data) != len_msg: + raise ValueError( + f"failed to read enough bytes: len_msg={len_msg}, data={data!r}" + ) + return data + + +# Delimited read/write, used by multistream-select. +# Reference: https://github.com/gogo/protobuf/blob/07eab6a8298cf32fac45cceaac59424f98421bbc/io/varint.go#L109-L126 # noqa: E501 + + +def encode_delim(msg: bytes) -> bytes: + delimited_msg = msg + b"\n" + return encode_varint_prefixed(delimited_msg) + + +async def read_delim(reader: StreamReader) -> bytes: + msg_bytes = await read_varint_prefixed_bytes(reader) + # TODO: Investigate if it is possible to have empty `msg_bytes` + if len(msg_bytes) != 0 and msg_bytes[-1:] != b"\n": + raise ValueError(f'msg_bytes is not delimited by b"\\n": msg_bytes={msg_bytes}') + return msg_bytes[:-1] + + +SIZE_LEN_BYTES = 4 + +# Fixed-prefixed read/write, used by "/plaintext/2.0.0". +# Reference: https://github.com/libp2p/go-msgio/blob/d5bbf59d3c4240266b1d2e5df9dc993454c42011/num.go#L11-L33 # noqa: E501 # noqa: E501 + + +def encode_fixedint_prefixed(msg_bytes: bytes) -> bytes: + len_prefix = len(msg_bytes).to_bytes(SIZE_LEN_BYTES, "big") + return len_prefix + msg_bytes + + +async def read_fixedint_prefixed(reader: StreamReader) -> bytes: + len_bytes = await reader.read(SIZE_LEN_BYTES) + len_int = int.from_bytes(len_bytes, "big") + return await reader.read(len_int) diff --git a/tests/examples/test_chat.py b/tests/examples/test_chat.py index 0422c95..f461d9d 100644 --- a/tests/examples/test_chat.py +++ b/tests/examples/test_chat.py @@ -3,7 +3,7 @@ import asyncio import pytest from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.protocol_muxer.multiselect_client import MultiselectClientError +from libp2p.protocol_muxer.exceptions import MultiselectClientError from tests.utils import cleanup, set_up_nodes_by_transport_opt PROTOCOL_ID = "/chat/1.0.0" diff --git a/tests/protocol_muxer/test_protocol_muxer.py b/tests/protocol_muxer/test_protocol_muxer.py index 775c460..02f08bd 100644 --- a/tests/protocol_muxer/test_protocol_muxer.py +++ b/tests/protocol_muxer/test_protocol_muxer.py @@ -1,6 +1,6 @@ import pytest -from libp2p.protocol_muxer.multiselect_client import MultiselectClientError +from libp2p.protocol_muxer.exceptions import MultiselectClientError from tests.utils import cleanup, set_up_nodes_by_transport_opt # TODO: Add tests for multiple streams being opened on different diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index ddbae8e..ea78d1f 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -1,14 +1,13 @@ import asyncio -import multiaddr import pytest from libp2p import new_node from libp2p.crypto.rsa import create_new_key_pair -from libp2p.peer.peerinfo import info_from_p2p_addr -from libp2p.protocol_muxer.multiselect_client import MultiselectClientError +from libp2p.network.exceptions import SwarmException from libp2p.security.insecure.transport import InsecureSession, InsecureTransport from libp2p.security.simple.transport import SimpleSecurityTransport +from tests.configs import LISTEN_MADDR from tests.utils import cleanup, connect # TODO: Add tests for multiple streams being opened on different @@ -16,9 +15,7 @@ from tests.utils import cleanup, connect def peer_id_for_node(node): - addr = node.get_addrs()[0] - info = info_from_p2p_addr(addr) - return info.peer_id + return node.get_id() initiator_key_pair = create_new_key_pair() @@ -35,14 +32,16 @@ async def perform_simple_test( # TODO: implement -- note we need to introduce the notion of communicating over a raw connection # for testing, we do NOT want to communicate over a stream so we can't just create two nodes # and use their conn because our mplex will internally relay messages to a stream - sec_opt1 = transports_for_initiator - sec_opt2 = transports_for_noninitiator - node1 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt1) - node2 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt2) + node1 = await new_node( + key_pair=initiator_key_pair, sec_opt=transports_for_initiator + ) + node2 = await new_node( + key_pair=noninitiator_key_pair, sec_opt=transports_for_noninitiator + ) - await node1.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) - await node2.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) + await node1.get_network().listen(LISTEN_MADDR) + await node2.get_network().listen(LISTEN_MADDR) await connect(node1, node2) @@ -162,7 +161,7 @@ async def test_multiple_security_none_the_same_fails(): def assertion_func(_): assert False - with pytest.raises(MultiselectClientError): + with pytest.raises(SwarmException): await perform_simple_test( assertion_func, transports_for_initiator, transports_for_noninitiator )