Merge pull request #253 from mhchia/feature/plaintext-2.0.0

Add `/plaintext/2.0.0` secure channel
This commit is contained in:
Kevin Mai-Husan Chia 2019-08-22 23:40:02 +08:00 committed by GitHub
commit da3c8be464
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 668 additions and 308 deletions

View File

@ -1,4 +1,10 @@
FILES_TO_LINT = libp2p tests examples setup.py FILES_TO_LINT = libp2p tests examples setup.py
PB = libp2p/crypto/pb/crypto.proto libp2p/pubsub/pb/rpc.proto libp2p/security/insecure/pb/plaintext.proto
PY = $(PB:.proto=_pb2.py)
PYI = $(PB:.proto=_pb2.pyi)
# Set default to `protobufs`, otherwise `format` is called when typing only `make`
all: protobufs
format: format:
black $(FILES_TO_LINT) black $(FILES_TO_LINT)
@ -10,6 +16,12 @@ lintroll:
isort --recursive --check-only $(FILES_TO_LINT) isort --recursive --check-only $(FILES_TO_LINT)
flake8 $(FILES_TO_LINT) flake8 $(FILES_TO_LINT)
protobufs: protobufs: $(PY)
cd libp2p/crypto/pb && protoc --python_out=. --mypy_out=. crypto.proto
cd libp2p/pubsub/pb && protoc --python_out=. --mypy_out=. rpc.proto %_pb2.py: %.proto
protoc --python_out=. --mypy_out=. $<
.PHONY: clean
clean:
rm -f $(PY) $(PYI)

View File

@ -98,7 +98,7 @@ def initialize_default_swarm(
muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex} muxer_transports_by_protocol = muxer_opt or {MPLEX_PROTOCOL_ID: Mplex}
security_transports_by_protocol = sec_opt or { security_transports_by_protocol = sec_opt or {
PLAINTEXT_PROTOCOL_ID: InsecureTransport(key_pair) TProtocol(PLAINTEXT_PROTOCOL_ID): InsecureTransport(key_pair)
} }
upgrader = TransportUpgrader( upgrader = TransportUpgrader(
security_transports_by_protocol, muxer_transports_by_protocol security_transports_by_protocol, muxer_transports_by_protocol

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT! # Generated by the protocol buffer compiler. DO NOT EDIT!
# source: crypto.proto # source: libp2p/crypto/pb/crypto.proto
import sys import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
@ -9,6 +8,7 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message from google.protobuf import message as _message
from google.protobuf import reflection as _reflection from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports) # @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default() _sym_db = _symbol_database.Default()
@ -17,11 +17,10 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor( DESCRIPTOR = _descriptor.FileDescriptor(
name='crypto.proto', name='libp2p/crypto/pb/crypto.proto',
package='crypto.pb', package='crypto.pb',
syntax='proto2', syntax='proto2',
serialized_options=None, serialized_pb=_b('\n\x1dlibp2p/crypto/pb/crypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03')
serialized_pb=_b('\n\x0c\x63rypto.proto\x12\tcrypto.pb\"?\n\tPublicKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c\"@\n\nPrivateKey\x12$\n\x08key_type\x18\x01 \x02(\x0e\x32\x12.crypto.pb.KeyType\x12\x0c\n\x04\x64\x61ta\x18\x02 \x02(\x0c*9\n\x07KeyType\x12\x07\n\x03RSA\x10\x00\x12\x0b\n\x07\x45\x64\x32\x35\x35\x31\x39\x10\x01\x12\r\n\tSecp256k1\x10\x02\x12\t\n\x05\x45\x43\x44SA\x10\x03')
) )
_KEYTYPE = _descriptor.EnumDescriptor( _KEYTYPE = _descriptor.EnumDescriptor(
@ -32,25 +31,25 @@ _KEYTYPE = _descriptor.EnumDescriptor(
values=[ values=[
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='RSA', index=0, number=0, name='RSA', index=0, number=0,
serialized_options=None, options=None,
type=None), type=None),
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='Ed25519', index=1, number=1, name='Ed25519', index=1, number=1,
serialized_options=None, options=None,
type=None), type=None),
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='Secp256k1', index=2, number=2, name='Secp256k1', index=2, number=2,
serialized_options=None, options=None,
type=None), type=None),
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='ECDSA', index=3, number=3, name='ECDSA', index=3, number=3,
serialized_options=None, options=None,
type=None), type=None),
], ],
containing_type=None, containing_type=None,
serialized_options=None, options=None,
serialized_start=158, serialized_start=175,
serialized_end=215, serialized_end=232,
) )
_sym_db.RegisterEnumDescriptor(_KEYTYPE) _sym_db.RegisterEnumDescriptor(_KEYTYPE)
@ -75,28 +74,28 @@ _PUBLICKEY = _descriptor.Descriptor(
has_default_value=False, default_value=0, has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='data', full_name='crypto.pb.PublicKey.data', index=1, name='data', full_name='crypto.pb.PublicKey.data', index=1,
number=2, type=12, cpp_type=9, label=2, number=2, type=12, cpp_type=9, label=2,
has_default_value=False, default_value=_b(""), has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=27, serialized_start=44,
serialized_end=90, serialized_end=107,
) )
@ -113,28 +112,28 @@ _PRIVATEKEY = _descriptor.Descriptor(
has_default_value=False, default_value=0, has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='data', full_name='crypto.pb.PrivateKey.data', index=1, name='data', full_name='crypto.pb.PrivateKey.data', index=1,
number=2, type=12, cpp_type=9, label=2, number=2, type=12, cpp_type=9, label=2,
has_default_value=False, default_value=_b(""), has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=92, serialized_start=109,
serialized_end=156, serialized_end=173,
) )
_PUBLICKEY.fields_by_name['key_type'].enum_type = _KEYTYPE _PUBLICKEY.fields_by_name['key_type'].enum_type = _KEYTYPE
@ -146,14 +145,14 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR)
PublicKey = _reflection.GeneratedProtocolMessageType('PublicKey', (_message.Message,), dict( PublicKey = _reflection.GeneratedProtocolMessageType('PublicKey', (_message.Message,), dict(
DESCRIPTOR = _PUBLICKEY, DESCRIPTOR = _PUBLICKEY,
__module__ = 'crypto_pb2' __module__ = 'libp2p.crypto.pb.crypto_pb2'
# @@protoc_insertion_point(class_scope:crypto.pb.PublicKey) # @@protoc_insertion_point(class_scope:crypto.pb.PublicKey)
)) ))
_sym_db.RegisterMessage(PublicKey) _sym_db.RegisterMessage(PublicKey)
PrivateKey = _reflection.GeneratedProtocolMessageType('PrivateKey', (_message.Message,), dict( PrivateKey = _reflection.GeneratedProtocolMessageType('PrivateKey', (_message.Message,), dict(
DESCRIPTOR = _PRIVATEKEY, DESCRIPTOR = _PRIVATEKEY,
__module__ = 'crypto_pb2' __module__ = 'libp2p.crypto.pb.crypto_pb2'
# @@protoc_insertion_point(class_scope:crypto.pb.PrivateKey) # @@protoc_insertion_point(class_scope:crypto.pb.PrivateKey)
)) ))
_sym_db.RegisterMessage(PrivateKey) _sym_db.RegisterMessage(PrivateKey)

View File

@ -11,6 +11,11 @@ class RSAPublicKey(PublicKey):
def to_bytes(self) -> bytes: def to_bytes(self) -> bytes:
return self.impl.export_key("DER") return self.impl.export_key("DER")
@classmethod
def from_bytes(cls, key_bytes: bytes) -> "RSAPublicKey":
rsakey = RSA.import_key(key_bytes)
return cls(rsakey)
def get_type(self) -> KeyType: def get_type(self) -> KeyType:
return KeyType.RSA return KeyType.RSA

View File

@ -10,6 +10,11 @@ class Secp256k1PublicKey(PublicKey):
def to_bytes(self) -> bytes: def to_bytes(self) -> bytes:
return self.impl.format() return self.impl.format()
@classmethod
def from_bytes(cls, key_bytes: bytes) -> "Secp256k1PublicKey":
secp256k1_pubkey = coincurve.PublicKey(key_bytes)
return cls(secp256k1_pubkey)
def get_type(self) -> KeyType: def get_type(self) -> KeyType:
return KeyType.Secp256k1 return KeyType.Secp256k1

17
libp2p/crypto/utils.py Normal file
View File

@ -0,0 +1,17 @@
from .keys import PublicKey
from .pb import crypto_pb2 as protobuf
from .rsa import RSAPublicKey
from .secp256k1 import Secp256k1PublicKey
def pubkey_from_protobuf(pubkey_pb: protobuf.PublicKey) -> PublicKey:
if pubkey_pb.key_type == protobuf.RSA:
return RSAPublicKey.from_bytes(pubkey_pb.data)
# TODO: Test against secp256k1 keys
elif pubkey_pb.key_type == protobuf.Secp256k1:
return Secp256k1PublicKey.from_bytes(pubkey_pb.data)
# TODO: Support `Ed25519` and `ECDSA` in the future?
else:
raise ValueError(
f"unsupported key_type={pubkey_pb.key_type}, data={pubkey_pb.data!r}"
)

View File

@ -1,6 +1,8 @@
class ValidationError(Exception): class BaseLibp2pError(Exception):
pass
class ValidationError(BaseLibp2pError):
""" """
Raised when something does not pass a validation check. Raised when something does not pass a validation check.
""" """
pass

View File

@ -9,9 +9,11 @@ class RawConnection(IRawConnection):
conn_port: str conn_port: str
reader: asyncio.StreamReader reader: asyncio.StreamReader
writer: asyncio.StreamWriter writer: asyncio.StreamWriter
_next_id: int
initiator: bool initiator: bool
_drain_lock: asyncio.Lock
_next_id: int
def __init__( def __init__(
self, self,
ip: str, ip: str,
@ -24,17 +26,25 @@ class RawConnection(IRawConnection):
self.conn_port = port self.conn_port = port
self.reader = reader self.reader = reader
self.writer = writer self.writer = writer
self._next_id = 0 if initiator else 1
self.initiator = initiator self.initiator = initiator
self._drain_lock = asyncio.Lock()
self._next_id = 0 if initiator else 1
async def write(self, data: bytes) -> None: async def write(self, data: bytes) -> None:
self.writer.write(data) self.writer.write(data)
self.writer.write("\n".encode()) # Reference: https://github.com/ethereum/lahja/blob/93610b2eb46969ff1797e0748c7ac2595e130aef/lahja/asyncio/endpoint.py#L99-L102 # noqa: E501
await self.writer.drain() # Use a lock to serialize drain() calls. Circumvents this bug:
# https://bugs.python.org/issue29930
async with self._drain_lock:
await self.writer.drain()
async def read(self) -> bytes: async def read(self, n: int = -1) -> bytes:
line = await self.reader.readline() """
return line.rstrip(b"\n") Read up to ``n`` bytes from the underlying stream.
This call is delegated directly to the underlying ``self.reader``.
"""
return await self.reader.read(n)
def close(self) -> None: def close(self) -> None:
self.writer.close() self.writer.close()

View File

@ -1,5 +1,4 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import asyncio
class IRawConnection(ABC): class IRawConnection(ABC):
@ -9,17 +8,12 @@ class IRawConnection(ABC):
initiator: bool initiator: bool
# TODO: reader and writer shouldn't be exposed.
# Need better API for the consumers
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
@abstractmethod @abstractmethod
async def write(self, data: bytes) -> None: async def write(self, data: bytes) -> None:
pass pass
@abstractmethod @abstractmethod
async def read(self) -> bytes: async def read(self, n: int = -1) -> bytes:
pass pass
@abstractmethod @abstractmethod

View File

@ -0,0 +1,5 @@
from libp2p.exceptions import BaseLibp2pError
class SwarmException(BaseLibp2pError):
pass

View File

@ -33,7 +33,7 @@ class INetwork(ABC):
dial_peer try to create a connection to peer_id dial_peer try to create a connection to peer_id
:param peer_id: peer if we want to dial :param peer_id: peer if we want to dial
:raises SwarmException: raised when no address if found for peer_id :raises SwarmException: raised when an error occurs
:return: muxed connection :return: muxed connection
""" """

View File

@ -10,12 +10,14 @@ from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator
from libp2p.routing.interfaces import IPeerRouting from libp2p.routing.interfaces import IPeerRouting
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
from libp2p.transport.exceptions import MuxerUpgradeFailure, SecurityUpgradeFailure
from libp2p.transport.listener_interface import IListener from libp2p.transport.listener_interface import IListener
from libp2p.transport.transport_interface import ITransport from libp2p.transport.transport_interface import ITransport
from libp2p.transport.upgrader import TransportUpgrader from libp2p.transport.upgrader import TransportUpgrader
from libp2p.typing import StreamHandlerFn, TProtocol from libp2p.typing import StreamHandlerFn, TProtocol
from .connection.raw_connection import RawConnection from .connection.raw_connection import RawConnection
from .exceptions import SwarmException
from .network_interface import INetwork from .network_interface import INetwork
from .notifee_interface import INotifee from .notifee_interface import INotifee
from .stream.net_stream import NetStream from .stream.net_stream import NetStream
@ -84,7 +86,7 @@ class Swarm(INetwork):
""" """
dial_peer try to create a connection to peer_id dial_peer try to create a connection to peer_id
:param peer_id: peer if we want to dial :param peer_id: peer if we want to dial
:raises SwarmException: raised when no address if found for peer_id :raises SwarmException: raised when an error occurs
:return: muxed connection :return: muxed connection
""" """
@ -110,10 +112,26 @@ class Swarm(INetwork):
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
# the conn and then mux the conn # the conn and then mux the conn
secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, True) try:
muxed_conn = await self.upgrader.upgrade_connection( secured_conn = await self.upgrader.upgrade_security(
secured_conn, self.generic_protocol_handler, peer_id raw_conn, peer_id, True
) )
except SecurityUpgradeFailure as error:
# TODO: Add logging to indicate the failure
raw_conn.close()
raise SwarmException(
f"fail to upgrade the connection to a secured connection from {peer_id}"
) from error
try:
muxed_conn = await self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id
)
except MuxerUpgradeFailure as error:
# TODO: Add logging to indicate the failure
secured_conn.close()
raise SwarmException(
f"fail to upgrade the connection to a muxed connection from {peer_id}"
) from error
# Store muxed connection in connections # Store muxed connection in connections
self.connections[peer_id] = muxed_conn self.connections[peer_id] = muxed_conn
@ -145,6 +163,7 @@ class Swarm(INetwork):
# Use muxed conn to open stream, which returns # Use muxed conn to open stream, which returns
# a muxed stream # a muxed stream
# TODO: Remove protocol id from being passed into muxed_conn # TODO: Remove protocol id from being passed into muxed_conn
# FIXME: Remove multiaddr from being passed into muxed_conn
muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr) muxed_stream = await muxed_conn.open_stream(protocol_ids[0], multiaddr)
# Perform protocol muxing to determine protocol to use # Perform protocol muxing to determine protocol to use
@ -176,24 +195,18 @@ class Swarm(INetwork):
Call listener listen with the multiaddr Call listener listen with the multiaddr
Map multiaddr to listener Map multiaddr to listener
""" """
for multiaddr in multiaddrs: for maddr in multiaddrs:
if str(multiaddr) in self.listeners: if str(maddr) in self.listeners:
return True return True
async def conn_handler( async def conn_handler(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None: ) -> None:
# Read in first message (should be peer_id of initiator) and ack
peer_id = ID.from_base58((await reader.read(1024)).decode())
writer.write("received peer id".encode())
await writer.drain()
# Upgrade reader/write to a net_stream and pass \ # Upgrade reader/write to a net_stream and pass \
# to appropriate stream handler (using multiaddr) # to appropriate stream handler (using multiaddr)
raw_conn = RawConnection( raw_conn = RawConnection(
multiaddr.value_for_protocol("ip4"), maddr.value_for_protocol("ip4"),
multiaddr.value_for_protocol("tcp"), maddr.value_for_protocol("tcp"),
reader, reader,
writer, writer,
False, False,
@ -201,12 +214,28 @@ class Swarm(INetwork):
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
# the conn and then mux the conn # the conn and then mux the conn
secured_conn = await self.upgrader.upgrade_security( try:
raw_conn, peer_id, False # FIXME: This dummy `ID(b"")` for the remote peer is useless.
) secured_conn = await self.upgrader.upgrade_security(
muxed_conn = await self.upgrader.upgrade_connection( raw_conn, ID(b""), False
secured_conn, self.generic_protocol_handler, peer_id )
) except SecurityUpgradeFailure as error:
# TODO: Add logging to indicate the failure
raw_conn.close()
raise SwarmException(
"fail to upgrade the connection to a secured connection"
) from error
peer_id = secured_conn.get_remote_peer()
try:
muxed_conn = await self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id
)
except MuxerUpgradeFailure as error:
# TODO: Add logging to indicate the failure
secured_conn.close()
raise SwarmException(
f"fail to upgrade the connection to a muxed connection from {peer_id}"
) from error
# Store muxed_conn with peer id # Store muxed_conn with peer id
self.connections[peer_id] = muxed_conn self.connections[peer_id] = muxed_conn
@ -218,19 +247,19 @@ class Swarm(INetwork):
try: try:
# Success # Success
listener = self.transport.create_listener(conn_handler) listener = self.transport.create_listener(conn_handler)
self.listeners[str(multiaddr)] = listener self.listeners[str(maddr)] = listener
await listener.listen(multiaddr) await listener.listen(maddr)
# Call notifiers since event occurred # Call notifiers since event occurred
for notifee in self.notifees: for notifee in self.notifees:
await notifee.listen(self, multiaddr) await notifee.listen(self, maddr)
return True return True
except IOError: except IOError:
# Failed. Continue looping. # Failed. Continue looping.
print("Failed to connect to: " + str(multiaddr)) print("Failed to connect to: " + str(maddr))
# No multiaddr succeeded # No maddr succeeded
return False return False
def notify(self, notifee: INotifee) -> bool: def notify(self, notifee: INotifee) -> bool:
@ -280,7 +309,3 @@ def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn:
asyncio.ensure_future(handler(net_stream)) asyncio.ensure_future(handler(net_stream))
return generic_protocol_handler return generic_protocol_handler
class SwarmException(Exception):
pass

View File

@ -0,0 +1,9 @@
from libp2p.exceptions import BaseLibp2pError
class MultiselectError(BaseLibp2pError):
"""Raised when an error occurs in multiselect process"""
class MultiselectClientError(BaseLibp2pError):
"""Raised when an error occurs in protocol selection process"""

View File

@ -2,6 +2,7 @@ from typing import Dict, Tuple
from libp2p.typing import StreamHandlerFn, TProtocol from libp2p.typing import StreamHandlerFn, TProtocol
from .exceptions import MultiselectError
from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_communicator_interface import IMultiselectCommunicator
from .multiselect_muxer_interface import IMultiselectMuxer from .multiselect_muxer_interface import IMultiselectMuxer
@ -79,7 +80,10 @@ class Multiselect(IMultiselectMuxer):
# Confirm that the protocols are the same # Confirm that the protocols are the same
if not validate_handshake(handshake_contents): if not validate_handshake(handshake_contents):
raise MultiselectError("multiselect protocol ID mismatch") raise MultiselectError(
"multiselect protocol ID mismatch: "
f"received handshake_contents={handshake_contents}"
)
# Handshake succeeded if this point is reached # Handshake succeeded if this point is reached
@ -94,7 +98,3 @@ def validate_handshake(handshake_contents: str) -> bool:
# TODO: Modify this when format used by go repo for messages # TODO: Modify this when format used by go repo for messages
# is added # is added
return handshake_contents == MULTISELECT_PROTOCOL_ID return handshake_contents == MULTISELECT_PROTOCOL_ID
class MultiselectError(ValueError):
"""Raised when an error occurs in multiselect process"""

View File

@ -2,6 +2,7 @@ from typing import Sequence
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from .exceptions import MultiselectClientError
from .multiselect_client_interface import IMultiselectClient from .multiselect_client_interface import IMultiselectClient
from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_communicator_interface import IMultiselectCommunicator
@ -116,7 +117,3 @@ def validate_handshake(handshake_contents: str) -> bool:
# TODO: Modify this when format used by go repo for messages # TODO: Modify this when format used by go repo for messages
# is added # is added
return handshake_contents == MULTISELECT_PROTOCOL_ID return handshake_contents == MULTISELECT_PROTOCOL_ID
class MultiselectClientError(ValueError):
"""Raised when an error occurs in protocol selection process"""

View File

@ -1,23 +1,10 @@
from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.stream_muxer.abc import IMuxedStream from libp2p.stream_muxer.abc import IMuxedStream
from libp2p.stream_muxer.mplex.utils import decode_uvarint_from_stream, encode_uvarint from libp2p.utils import encode_delim, read_delim
from libp2p.typing import StreamReader
from .multiselect_communicator_interface import IMultiselectCommunicator from .multiselect_communicator_interface import IMultiselectCommunicator
def delim_encode(msg_str: str) -> bytes:
msg_bytes = msg_str.encode()
varint_len_msg = encode_uvarint(len(msg_bytes) + 1)
return varint_len_msg + msg_bytes + b"\n"
async def delim_read(reader: StreamReader, timeout: int = 10) -> str:
len_msg = await decode_uvarint_from_stream(reader, timeout)
msg_bytes = await reader.read(len_msg)
return msg_bytes.decode().rstrip()
class RawConnectionCommunicator(IMultiselectCommunicator): class RawConnectionCommunicator(IMultiselectCommunicator):
conn: IRawConnection conn: IRawConnection
@ -25,12 +12,12 @@ class RawConnectionCommunicator(IMultiselectCommunicator):
self.conn = conn self.conn = conn
async def write(self, msg_str: str) -> None: async def write(self, msg_str: str) -> None:
msg_bytes = delim_encode(msg_str) msg_bytes = encode_delim(msg_str.encode())
self.conn.writer.write(msg_bytes) await self.conn.write(msg_bytes)
await self.conn.writer.drain()
async def read(self) -> str: async def read(self) -> str:
return await delim_read(self.conn.reader) data = await read_delim(self.conn)
return data.decode()
class StreamCommunicator(IMultiselectCommunicator): class StreamCommunicator(IMultiselectCommunicator):
@ -40,8 +27,9 @@ class StreamCommunicator(IMultiselectCommunicator):
self.stream = stream self.stream = stream
async def write(self, msg_str: str) -> None: async def write(self, msg_str: str) -> None:
msg_bytes = delim_encode(msg_str) msg_bytes = encode_delim(msg_str.encode())
await self.stream.write(msg_bytes) await self.stream.write(msg_bytes)
async def read(self) -> str: async def read(self) -> str:
return await delim_read(self.stream) data = await read_delim(self.stream)
return data.decode()

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT! # Generated by the protocol buffer compiler. DO NOT EDIT!
# source: rpc.proto # source: libp2p/pubsub/pb/rpc.proto
import sys import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
@ -8,6 +7,7 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message from google.protobuf import message as _message
from google.protobuf import reflection as _reflection from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports) # @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default() _sym_db = _symbol_database.Default()
@ -16,11 +16,10 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor( DESCRIPTOR = _descriptor.FileDescriptor(
name='rpc.proto', name='libp2p/pubsub/pb/rpc.proto',
package='pubsub.pb', package='pubsub.pb',
syntax='proto2', syntax='proto2',
serialized_options=None, serialized_pb=_b('\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
serialized_pb=_b('\n\trpc.proto\x12\tpubsub.pb\"\xb4\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xb0\x01\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x1f\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02')
) )
@ -33,21 +32,21 @@ _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE = _descriptor.EnumDescriptor(
values=[ values=[
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='NONE', index=0, number=0, name='NONE', index=0, number=0,
serialized_options=None, options=None,
type=None), type=None),
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='KEY', index=1, number=1, name='KEY', index=1, number=1,
serialized_options=None, options=None,
type=None), type=None),
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='WOT', index=2, number=2, name='WOT', index=2, number=2,
serialized_options=None, options=None,
type=None), type=None),
], ],
containing_type=None, containing_type=None,
serialized_options=None, options=None,
serialized_start=868, serialized_start=885,
serialized_end=906, serialized_end=923,
) )
_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE)
@ -59,21 +58,21 @@ _TOPICDESCRIPTOR_ENCOPTS_ENCMODE = _descriptor.EnumDescriptor(
values=[ values=[
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='NONE', index=0, number=0, name='NONE', index=0, number=0,
serialized_options=None, options=None,
type=None), type=None),
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='SHAREDKEY', index=1, number=1, name='SHAREDKEY', index=1, number=1,
serialized_options=None, options=None,
type=None), type=None),
_descriptor.EnumValueDescriptor( _descriptor.EnumValueDescriptor(
name='WOT', index=2, number=2, name='WOT', index=2, number=2,
serialized_options=None, options=None,
type=None), type=None),
], ],
containing_type=None, containing_type=None,
serialized_options=None, options=None,
serialized_start=997, serialized_start=1014,
serialized_end=1040, serialized_end=1057,
) )
_sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE) _sym_db.RegisterEnumDescriptor(_TOPICDESCRIPTOR_ENCOPTS_ENCMODE)
@ -91,28 +90,28 @@ _RPC_SUBOPTS = _descriptor.Descriptor(
has_default_value=False, default_value=False, has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='topicid', full_name='pubsub.pb.RPC.SubOpts.topicid', index=1, name='topicid', full_name='pubsub.pb.RPC.SubOpts.topicid', index=1,
number=2, type=9, cpp_type=9, label=1, number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'), has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=160, serialized_start=177,
serialized_end=205, serialized_end=222,
) )
_RPC = _descriptor.Descriptor( _RPC = _descriptor.Descriptor(
@ -128,35 +127,35 @@ _RPC = _descriptor.Descriptor(
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='publish', full_name='pubsub.pb.RPC.publish', index=1, name='publish', full_name='pubsub.pb.RPC.publish', index=1,
number=2, type=11, cpp_type=10, label=3, number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='control', full_name='pubsub.pb.RPC.control', index=2, name='control', full_name='pubsub.pb.RPC.control', index=2,
number=3, type=11, cpp_type=10, label=1, number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None, has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[_RPC_SUBOPTS, ], nested_types=[_RPC_SUBOPTS, ],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=25, serialized_start=42,
serialized_end=205, serialized_end=222,
) )
@ -173,56 +172,56 @@ _MESSAGE = _descriptor.Descriptor(
has_default_value=False, default_value=_b(""), has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='data', full_name='pubsub.pb.Message.data', index=1, name='data', full_name='pubsub.pb.Message.data', index=1,
number=2, type=12, cpp_type=9, label=1, number=2, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""), has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='seqno', full_name='pubsub.pb.Message.seqno', index=2, name='seqno', full_name='pubsub.pb.Message.seqno', index=2,
number=3, type=12, cpp_type=9, label=1, number=3, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""), has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='topicIDs', full_name='pubsub.pb.Message.topicIDs', index=3, name='topicIDs', full_name='pubsub.pb.Message.topicIDs', index=3,
number=4, type=9, cpp_type=9, label=3, number=4, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='signature', full_name='pubsub.pb.Message.signature', index=4, name='signature', full_name='pubsub.pb.Message.signature', index=4,
number=5, type=12, cpp_type=9, label=1, number=5, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""), has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='key', full_name='pubsub.pb.Message.key', index=5, name='key', full_name='pubsub.pb.Message.key', index=5,
number=6, type=12, cpp_type=9, label=1, number=6, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""), has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=207, serialized_start=224,
serialized_end=312, serialized_end=329,
) )
@ -239,42 +238,42 @@ _CONTROLMESSAGE = _descriptor.Descriptor(
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='iwant', full_name='pubsub.pb.ControlMessage.iwant', index=1, name='iwant', full_name='pubsub.pb.ControlMessage.iwant', index=1,
number=2, type=11, cpp_type=10, label=3, number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='graft', full_name='pubsub.pb.ControlMessage.graft', index=2, name='graft', full_name='pubsub.pb.ControlMessage.graft', index=2,
number=3, type=11, cpp_type=10, label=3, number=3, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='prune', full_name='pubsub.pb.ControlMessage.prune', index=3, name='prune', full_name='pubsub.pb.ControlMessage.prune', index=3,
number=4, type=11, cpp_type=10, label=3, number=4, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=315, serialized_start=332,
serialized_end=491, serialized_end=508,
) )
@ -291,28 +290,28 @@ _CONTROLIHAVE = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'), has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='messageIDs', full_name='pubsub.pb.ControlIHave.messageIDs', index=1, name='messageIDs', full_name='pubsub.pb.ControlIHave.messageIDs', index=1,
number=2, type=9, cpp_type=9, label=3, number=2, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=493, serialized_start=510,
serialized_end=544, serialized_end=561,
) )
@ -329,21 +328,21 @@ _CONTROLIWANT = _descriptor.Descriptor(
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=546, serialized_start=563,
serialized_end=580, serialized_end=597,
) )
@ -360,21 +359,21 @@ _CONTROLGRAFT = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'), has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=582, serialized_start=599,
serialized_end=613, serialized_end=630,
) )
@ -391,21 +390,21 @@ _CONTROLPRUNE = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'), has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[], nested_types=[],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=615, serialized_start=632,
serialized_end=646, serialized_end=663,
) )
@ -422,14 +421,14 @@ _TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor(
has_default_value=False, default_value=0, has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='keys', full_name='pubsub.pb.TopicDescriptor.AuthOpts.keys', index=1, name='keys', full_name='pubsub.pb.TopicDescriptor.AuthOpts.keys', index=1,
number=2, type=12, cpp_type=9, label=3, number=2, type=12, cpp_type=9, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
@ -437,14 +436,14 @@ _TOPICDESCRIPTOR_AUTHOPTS = _descriptor.Descriptor(
enum_types=[ enum_types=[
_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE, _TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE,
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=782, serialized_start=799,
serialized_end=906, serialized_end=923,
) )
_TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor( _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor(
@ -460,14 +459,14 @@ _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor(
has_default_value=False, default_value=0, has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='keyHashes', full_name='pubsub.pb.TopicDescriptor.EncOpts.keyHashes', index=1, name='keyHashes', full_name='pubsub.pb.TopicDescriptor.EncOpts.keyHashes', index=1,
number=2, type=12, cpp_type=9, label=3, number=2, type=12, cpp_type=9, label=3,
has_default_value=False, default_value=[], has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
@ -475,14 +474,14 @@ _TOPICDESCRIPTOR_ENCOPTS = _descriptor.Descriptor(
enum_types=[ enum_types=[
_TOPICDESCRIPTOR_ENCOPTS_ENCMODE, _TOPICDESCRIPTOR_ENCOPTS_ENCMODE,
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=909, serialized_start=926,
serialized_end=1040, serialized_end=1057,
) )
_TOPICDESCRIPTOR = _descriptor.Descriptor( _TOPICDESCRIPTOR = _descriptor.Descriptor(
@ -498,35 +497,35 @@ _TOPICDESCRIPTOR = _descriptor.Descriptor(
has_default_value=False, default_value=_b("").decode('utf-8'), has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='auth', full_name='pubsub.pb.TopicDescriptor.auth', index=1, name='auth', full_name='pubsub.pb.TopicDescriptor.auth', index=1,
number=2, type=11, cpp_type=10, label=1, number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None, has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor( _descriptor.FieldDescriptor(
name='enc', full_name='pubsub.pb.TopicDescriptor.enc', index=2, name='enc', full_name='pubsub.pb.TopicDescriptor.enc', index=2,
number=3, type=11, cpp_type=10, label=1, number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None, has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None, message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None, is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR), options=None, file=DESCRIPTOR),
], ],
extensions=[ extensions=[
], ],
nested_types=[_TOPICDESCRIPTOR_AUTHOPTS, _TOPICDESCRIPTOR_ENCOPTS, ], nested_types=[_TOPICDESCRIPTOR_AUTHOPTS, _TOPICDESCRIPTOR_ENCOPTS, ],
enum_types=[ enum_types=[
], ],
serialized_options=None, options=None,
is_extendable=False, is_extendable=False,
syntax='proto2', syntax='proto2',
extension_ranges=[], extension_ranges=[],
oneofs=[ oneofs=[
], ],
serialized_start=649, serialized_start=666,
serialized_end=1040, serialized_end=1057,
) )
_RPC_SUBOPTS.containing_type = _RPC _RPC_SUBOPTS.containing_type = _RPC
@ -559,12 +558,12 @@ RPC = _reflection.GeneratedProtocolMessageType('RPC', (_message.Message,), dict(
SubOpts = _reflection.GeneratedProtocolMessageType('SubOpts', (_message.Message,), dict( SubOpts = _reflection.GeneratedProtocolMessageType('SubOpts', (_message.Message,), dict(
DESCRIPTOR = _RPC_SUBOPTS, DESCRIPTOR = _RPC_SUBOPTS,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.RPC.SubOpts) # @@protoc_insertion_point(class_scope:pubsub.pb.RPC.SubOpts)
)) ))
, ,
DESCRIPTOR = _RPC, DESCRIPTOR = _RPC,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.RPC) # @@protoc_insertion_point(class_scope:pubsub.pb.RPC)
)) ))
_sym_db.RegisterMessage(RPC) _sym_db.RegisterMessage(RPC)
@ -572,42 +571,42 @@ _sym_db.RegisterMessage(RPC.SubOpts)
Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict( Message = _reflection.GeneratedProtocolMessageType('Message', (_message.Message,), dict(
DESCRIPTOR = _MESSAGE, DESCRIPTOR = _MESSAGE,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.Message) # @@protoc_insertion_point(class_scope:pubsub.pb.Message)
)) ))
_sym_db.RegisterMessage(Message) _sym_db.RegisterMessage(Message)
ControlMessage = _reflection.GeneratedProtocolMessageType('ControlMessage', (_message.Message,), dict( ControlMessage = _reflection.GeneratedProtocolMessageType('ControlMessage', (_message.Message,), dict(
DESCRIPTOR = _CONTROLMESSAGE, DESCRIPTOR = _CONTROLMESSAGE,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlMessage) # @@protoc_insertion_point(class_scope:pubsub.pb.ControlMessage)
)) ))
_sym_db.RegisterMessage(ControlMessage) _sym_db.RegisterMessage(ControlMessage)
ControlIHave = _reflection.GeneratedProtocolMessageType('ControlIHave', (_message.Message,), dict( ControlIHave = _reflection.GeneratedProtocolMessageType('ControlIHave', (_message.Message,), dict(
DESCRIPTOR = _CONTROLIHAVE, DESCRIPTOR = _CONTROLIHAVE,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlIHave) # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIHave)
)) ))
_sym_db.RegisterMessage(ControlIHave) _sym_db.RegisterMessage(ControlIHave)
ControlIWant = _reflection.GeneratedProtocolMessageType('ControlIWant', (_message.Message,), dict( ControlIWant = _reflection.GeneratedProtocolMessageType('ControlIWant', (_message.Message,), dict(
DESCRIPTOR = _CONTROLIWANT, DESCRIPTOR = _CONTROLIWANT,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlIWant) # @@protoc_insertion_point(class_scope:pubsub.pb.ControlIWant)
)) ))
_sym_db.RegisterMessage(ControlIWant) _sym_db.RegisterMessage(ControlIWant)
ControlGraft = _reflection.GeneratedProtocolMessageType('ControlGraft', (_message.Message,), dict( ControlGraft = _reflection.GeneratedProtocolMessageType('ControlGraft', (_message.Message,), dict(
DESCRIPTOR = _CONTROLGRAFT, DESCRIPTOR = _CONTROLGRAFT,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlGraft) # @@protoc_insertion_point(class_scope:pubsub.pb.ControlGraft)
)) ))
_sym_db.RegisterMessage(ControlGraft) _sym_db.RegisterMessage(ControlGraft)
ControlPrune = _reflection.GeneratedProtocolMessageType('ControlPrune', (_message.Message,), dict( ControlPrune = _reflection.GeneratedProtocolMessageType('ControlPrune', (_message.Message,), dict(
DESCRIPTOR = _CONTROLPRUNE, DESCRIPTOR = _CONTROLPRUNE,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.ControlPrune) # @@protoc_insertion_point(class_scope:pubsub.pb.ControlPrune)
)) ))
_sym_db.RegisterMessage(ControlPrune) _sym_db.RegisterMessage(ControlPrune)
@ -616,19 +615,19 @@ TopicDescriptor = _reflection.GeneratedProtocolMessageType('TopicDescriptor', (_
AuthOpts = _reflection.GeneratedProtocolMessageType('AuthOpts', (_message.Message,), dict( AuthOpts = _reflection.GeneratedProtocolMessageType('AuthOpts', (_message.Message,), dict(
DESCRIPTOR = _TOPICDESCRIPTOR_AUTHOPTS, DESCRIPTOR = _TOPICDESCRIPTOR_AUTHOPTS,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.AuthOpts) # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.AuthOpts)
)) ))
, ,
EncOpts = _reflection.GeneratedProtocolMessageType('EncOpts', (_message.Message,), dict( EncOpts = _reflection.GeneratedProtocolMessageType('EncOpts', (_message.Message,), dict(
DESCRIPTOR = _TOPICDESCRIPTOR_ENCOPTS, DESCRIPTOR = _TOPICDESCRIPTOR_ENCOPTS,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.EncOpts) # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor.EncOpts)
)) ))
, ,
DESCRIPTOR = _TOPICDESCRIPTOR, DESCRIPTOR = _TOPICDESCRIPTOR,
__module__ = 'rpc_pb2' __module__ = 'libp2p.pubsub.pb.rpc_pb2'
# @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor) # @@protoc_insertion_point(class_scope:pubsub.pb.TopicDescriptor)
)) ))
_sym_db.RegisterMessage(TopicDescriptor) _sym_db.RegisterMessage(TopicDescriptor)

View File

@ -7,12 +7,18 @@ from libp2p.security.base_transport import BaseSecureTransport
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
class BaseSession(ISecureConn, IRawConnection): class BaseSession(ISecureConn):
""" """
``BaseSession`` is not fully instantiated from its abstract classes as it ``BaseSession`` is not fully instantiated from its abstract classes as it
is only meant to be used in clases that derive from it. is only meant to be used in clases that derive from it.
""" """
local_peer: ID
local_private_key: PrivateKey
conn: IRawConnection
remote_peer_id: ID
remote_permanent_pubkey: PublicKey
def __init__( def __init__(
self, transport: BaseSecureTransport, conn: IRawConnection, peer_id: ID self, transport: BaseSecureTransport, conn: IRawConnection, peer_id: ID
) -> None: ) -> None:
@ -23,8 +29,6 @@ class BaseSession(ISecureConn, IRawConnection):
self.remote_permanent_pubkey = None self.remote_permanent_pubkey = None
self.initiator = self.conn.initiator self.initiator = self.conn.initiator
self.writer = self.conn.writer
self.reader = self.conn.reader
# TODO clean up how this is passed around? # TODO clean up how this is passed around?
def next_stream_id(self) -> int: def next_stream_id(self) -> int:
@ -33,8 +37,8 @@ class BaseSession(ISecureConn, IRawConnection):
async def write(self, data: bytes) -> None: async def write(self, data: bytes) -> None:
await self.conn.write(data) await self.conn.write(data)
async def read(self) -> bytes: async def read(self, n: int = -1) -> bytes:
return await self.conn.read() return await self.conn.read(n)
def close(self) -> None: def close(self) -> None:
self.conn.close() self.conn.close()

View File

View File

@ -0,0 +1,10 @@
syntax = "proto2";
package plaintext.pb;
import "libp2p/crypto/pb/crypto.proto";
message Exchange {
optional bytes id = 1;
optional crypto.pb.PublicKey pubkey = 2;
}

View File

@ -0,0 +1,79 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: libp2p/security/insecure/pb/plaintext.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from libp2p.crypto.pb import crypto_pb2 as libp2p_dot_crypto_dot_pb_dot_crypto__pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='libp2p/security/insecure/pb/plaintext.proto',
package='plaintext.pb',
syntax='proto2',
serialized_pb=_b('\n+libp2p/security/insecure/pb/plaintext.proto\x12\x0cplaintext.pb\x1a\x1dlibp2p/crypto/pb/crypto.proto\"<\n\x08\x45xchange\x12\n\n\x02id\x18\x01 \x01(\x0c\x12$\n\x06pubkey\x18\x02 \x01(\x0b\x32\x14.crypto.pb.PublicKey')
,
dependencies=[libp2p_dot_crypto_dot_pb_dot_crypto__pb2.DESCRIPTOR,])
_EXCHANGE = _descriptor.Descriptor(
name='Exchange',
full_name='plaintext.pb.Exchange',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='id', full_name='plaintext.pb.Exchange.id', index=0,
number=1, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='pubkey', full_name='plaintext.pb.Exchange.pubkey', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=92,
serialized_end=152,
)
_EXCHANGE.fields_by_name['pubkey'].message_type = libp2p_dot_crypto_dot_pb_dot_crypto__pb2._PUBLICKEY
DESCRIPTOR.message_types_by_name['Exchange'] = _EXCHANGE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Exchange = _reflection.GeneratedProtocolMessageType('Exchange', (_message.Message,), dict(
DESCRIPTOR = _EXCHANGE,
__module__ = 'libp2p.security.insecure.pb.plaintext_pb2'
# @@protoc_insertion_point(class_scope:plaintext.pb.Exchange)
))
_sym_db.RegisterMessage(Exchange)
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,45 @@
# @generated by generate_proto_mypy_stubs.py. Do not edit!
import sys
from google.protobuf.descriptor import (
Descriptor as google___protobuf___descriptor___Descriptor,
)
from google.protobuf.message import (
Message as google___protobuf___message___Message,
)
from libp2p.crypto.pb.crypto_pb2 import (
PublicKey as libp2p___crypto___pb___crypto_pb2___PublicKey,
)
from typing import (
Optional as typing___Optional,
)
from typing_extensions import (
Literal as typing_extensions___Literal,
)
class Exchange(google___protobuf___message___Message):
DESCRIPTOR: google___protobuf___descriptor___Descriptor = ...
id = ... # type: bytes
@property
def pubkey(self) -> libp2p___crypto___pb___crypto_pb2___PublicKey: ...
def __init__(self,
*,
id : typing___Optional[bytes] = None,
pubkey : typing___Optional[libp2p___crypto___pb___crypto_pb2___PublicKey] = None,
) -> None: ...
@classmethod
def FromString(cls, s: bytes) -> Exchange: ...
def MergeFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
def CopyFrom(self, other_msg: google___protobuf___message___Message) -> None: ...
if sys.version_info >= (3,):
def HasField(self, field_name: typing_extensions___Literal[u"id",u"pubkey"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"id",u"pubkey"]) -> None: ...
else:
def HasField(self, field_name: typing_extensions___Literal[u"id",b"id",u"pubkey",b"pubkey"]) -> bool: ...
def ClearField(self, field_name: typing_extensions___Literal[u"id",b"id",u"pubkey",b"pubkey"]) -> None: ...

View File

@ -1,15 +1,67 @@
from libp2p.crypto.keys import PublicKey
from libp2p.crypto.pb import crypto_pb2
from libp2p.crypto.utils import pubkey_from_protobuf
from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.security.base_session import BaseSession from libp2p.security.base_session import BaseSession
from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.base_transport import BaseSecureTransport
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.transport.exceptions import HandshakeFailure
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed
PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/1.0.0") from .pb import plaintext_pb2
# Reference: https://github.com/libp2p/go-libp2p-core/blob/master/sec/insecure/insecure.go
PLAINTEXT_PROTOCOL_ID = TProtocol("/plaintext/2.0.0")
class InsecureSession(BaseSession): class InsecureSession(BaseSession):
pass async def run_handshake(self) -> None:
msg = make_exchange_message(self.local_private_key.get_public_key())
msg_bytes = msg.SerializeToString()
encoded_msg_bytes = encode_fixedint_prefixed(msg_bytes)
await self.write(encoded_msg_bytes)
remote_msg_bytes = await read_fixedint_prefixed(self.conn)
remote_msg = plaintext_pb2.Exchange()
remote_msg.ParseFromString(remote_msg_bytes)
received_peer_id = ID(remote_msg.id)
# Verify if the receive `ID` matches the one we originally initialize the session.
# We only need to check it when we are the initiator, because only in that condition
# we possibly knows the `ID` of the remote.
if self.initiator and self.remote_peer_id != received_peer_id:
raise HandshakeFailure(
"remote peer sent unexpected peer ID. "
f"expected={self.remote_peer_id} received={received_peer_id}"
)
# Verify if the given `pubkey` matches the given `peer_id`
try:
received_pubkey = pubkey_from_protobuf(remote_msg.pubkey)
except ValueError:
raise HandshakeFailure(
f"unknown `key_type` of remote_msg.pubkey={remote_msg.pubkey}"
)
peer_id_from_received_pubkey = ID.from_pubkey(received_pubkey)
if peer_id_from_received_pubkey != received_peer_id:
raise HandshakeFailure(
"peer id and pubkey from the remote mismatch: "
f"received_peer_id={received_peer_id}, remote_pubkey={received_pubkey}, "
f"peer_id_from_received_pubkey={peer_id_from_received_pubkey}"
)
# Nothing is wrong. Store the `pubkey` and `peer_id` in the session.
self.remote_permanent_pubkey = received_pubkey
# Only need to set peer's id when we don't know it before,
# i.e. we are not the connection initiator.
if not self.initiator:
self.remote_peer_id = received_peer_id
# TODO: Store `pubkey` and `peer_id` to `PeerStore`
class InsecureTransport(BaseSecureTransport): class InsecureTransport(BaseSecureTransport):
@ -24,7 +76,9 @@ class InsecureTransport(BaseSecureTransport):
for an inbound connection (i.e. we are not the initiator) for an inbound connection (i.e. we are not the initiator)
:return: secure connection object (that implements secure_conn_interface) :return: secure connection object (that implements secure_conn_interface)
""" """
return InsecureSession(self, conn, ID(b"")) session = InsecureSession(self, conn, ID(b""))
await session.run_handshake()
return session
async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn: async def secure_outbound(self, conn: IRawConnection, peer_id: ID) -> ISecureConn:
""" """
@ -32,4 +86,14 @@ class InsecureTransport(BaseSecureTransport):
for an inbound connection (i.e. we are the initiator) for an inbound connection (i.e. we are the initiator)
:return: secure connection object (that implements secure_conn_interface) :return: secure connection object (that implements secure_conn_interface)
""" """
return InsecureSession(self, conn, peer_id) session = InsecureSession(self, conn, peer_id)
await session.run_handshake()
return session
def make_exchange_message(pubkey: PublicKey) -> plaintext_pb2.Exchange:
pubkey_pb = crypto_pb2.PublicKey(
key_type=pubkey.get_type().value, data=pubkey.to_bytes()
)
id_bytes = ID.from_pubkey(pubkey).to_bytes()
return plaintext_pb2.Exchange(id=id_bytes, pubkey=pubkey_pb)

View File

@ -87,10 +87,6 @@ class SecurityMultistream(ABC):
:param initiator: true if we are the initiator, false otherwise :param initiator: true if we are the initiator, false otherwise
:return: selected secure transport :return: selected secure transport
""" """
# TODO: Is conn acceptable to multiselect/multiselect_client
# instead of stream? In go repo, they pass in a raw conn
# (https://raw.githubusercontent.com/libp2p/go-conn-security-multistream/master/ssms.go)
protocol: TProtocol protocol: TProtocol
communicator = RawConnectionCommunicator(conn) communicator = RawConnectionCommunicator(conn)
if initiator: if initiator:

View File

@ -6,6 +6,8 @@ from libp2p.peer.id import ID
from libp2p.security.base_transport import BaseSecureTransport from libp2p.security.base_transport import BaseSecureTransport
from libp2p.security.insecure.transport import InsecureSession from libp2p.security.insecure.transport import InsecureSession
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.transport.exceptions import SecurityUpgradeFailure
from libp2p.utils import encode_fixedint_prefixed, read_fixedint_prefixed
class SimpleSecurityTransport(BaseSecureTransport): class SimpleSecurityTransport(BaseSecureTransport):
@ -21,15 +23,19 @@ class SimpleSecurityTransport(BaseSecureTransport):
for an inbound connection (i.e. we are not the initiator) for an inbound connection (i.e. we are not the initiator)
:return: secure connection object (that implements secure_conn_interface) :return: secure connection object (that implements secure_conn_interface)
""" """
await conn.write(self.key_phrase.encode()) await conn.write(encode_fixedint_prefixed(self.key_phrase.encode()))
incoming = (await conn.read()).decode() incoming = (await read_fixedint_prefixed(conn)).decode()
if incoming != self.key_phrase: if incoming != self.key_phrase:
raise Exception( raise SecurityUpgradeFailure(
"Key phrase differed between nodes. Expected " + self.key_phrase "Key phrase differed between nodes. Expected " + self.key_phrase
) )
session = InsecureSession(self, conn, ID(b"")) session = InsecureSession(self, conn, ID(b""))
# NOTE: Here we calls `run_handshake` for both sides to exchange their public keys and
# peer ids, otherwise tests fail. However, it seems pretty weird that
# `SimpleSecurityTransport` sends peer id through `Insecure`.
await session.run_handshake()
# NOTE: this is abusing the abstraction we have here # NOTE: this is abusing the abstraction we have here
# but this code may be deprecated soon and this exists # but this code may be deprecated soon and this exists
# mainly to satisfy a test that will go along w/ it # mainly to satisfy a test that will go along w/ it
@ -43,19 +49,23 @@ class SimpleSecurityTransport(BaseSecureTransport):
for an inbound connection (i.e. we are the initiator) for an inbound connection (i.e. we are the initiator)
:return: secure connection object (that implements secure_conn_interface) :return: secure connection object (that implements secure_conn_interface)
""" """
await conn.write(self.key_phrase.encode()) await conn.write(encode_fixedint_prefixed(self.key_phrase.encode()))
incoming = (await conn.read()).decode() incoming = (await read_fixedint_prefixed(conn)).decode()
# Force context switch, as this security transport is built for testing locally # Force context switch, as this security transport is built for testing locally
# in a single event loop # in a single event loop
await asyncio.sleep(0) await asyncio.sleep(0)
if incoming != self.key_phrase: if incoming != self.key_phrase:
raise Exception( raise SecurityUpgradeFailure(
"Key phrase differed between nodes. Expected " + self.key_phrase "Key phrase differed between nodes. Expected " + self.key_phrase
) )
session = InsecureSession(self, conn, peer_id) session = InsecureSession(self, conn, peer_id)
# NOTE: Here we calls `run_handshake` for both sides to exchange their public keys and
# peer ids, otherwise tests fail. However, it seems pretty weird that
# `SimpleSecurityTransport` sends peer id through `Insecure`.
await session.run_handshake()
# NOTE: this is abusing the abstraction we have here # NOTE: this is abusing the abstraction we have here
# but this code may be deprecated soon and this exists # but this code may be deprecated soon and this exists
# mainly to satisfy a test that will go along w/ it # mainly to satisfy a test that will go along w/ it

View File

@ -17,7 +17,6 @@ class IMuxedConn(ABC):
reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go reference: https://github.com/libp2p/go-stream-muxer/blob/master/muxer.go
""" """
initiator: bool
peer_id: ID peer_id: ID
@abstractmethod @abstractmethod
@ -35,6 +34,11 @@ class IMuxedConn(ABC):
:param peer_id: peer_id of peer the connection is to :param peer_id: peer_id of peer the connection is to
""" """
@property
@abstractmethod
def initiator(self) -> bool:
pass
@abstractmethod @abstractmethod
def close(self) -> None: def close(self) -> None:
""" """
@ -62,6 +66,7 @@ class IMuxedConn(ABC):
Read a message from `stream_id`'s buffer, non-blockingly. Read a message from `stream_id`'s buffer, non-blockingly.
""" """
# FIXME: Remove multiaddr from being passed into muxed_conn
@abstractmethod @abstractmethod
async def open_stream( async def open_stream(
self, protocol_id: str, multi_addr: Multiaddr self, protocol_id: str, multi_addr: Multiaddr

View File

@ -9,11 +9,11 @@ from libp2p.peer.id import ID
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from libp2p.utils import decode_uvarint_from_stream, encode_uvarint
from .constants import HeaderTags from .constants import HeaderTags
from .exceptions import StreamNotFound from .exceptions import StreamNotFound
from .mplex_stream import MplexStream from .mplex_stream import MplexStream
from .utils import decode_uvarint_from_stream, encode_uvarint
MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0") MPLEX_PROTOCOL_ID = TProtocol("/mplex/6.7.0")
@ -25,7 +25,6 @@ class Mplex(IMuxedConn):
secured_conn: ISecureConn secured_conn: ISecureConn
raw_conn: IRawConnection raw_conn: IRawConnection
initiator: bool
peer_id: ID peer_id: ID
# TODO: `dataIn` in go implementation. Should be size of 8. # TODO: `dataIn` in go implementation. Should be size of 8.
# TODO: Also, `dataIn` is closed indicating EOF in Go. We don't have similar strategies # TODO: Also, `dataIn` is closed indicating EOF in Go. We don't have similar strategies
@ -41,13 +40,12 @@ class Mplex(IMuxedConn):
) -> None: ) -> None:
""" """
create a new muxed connection create a new muxed connection
:param conn: an instance of raw connection :param secured_conn: an instance of ``ISecureConn``
:param generic_protocol_handler: generic protocol handler :param generic_protocol_handler: generic protocol handler
for new muxed streams for new muxed streams
:param peer_id: peer_id of peer the connection is to :param peer_id: peer_id of peer the connection is to
""" """
self.conn = secured_conn self.conn = secured_conn
self.initiator = secured_conn.initiator
# Store generic protocol handler # Store generic protocol handler
self.generic_protocol_handler = generic_protocol_handler self.generic_protocol_handler = generic_protocol_handler
@ -63,6 +61,10 @@ class Mplex(IMuxedConn):
# Kick off reading # Kick off reading
asyncio.ensure_future(self.handle_incoming()) asyncio.ensure_future(self.handle_incoming())
@property
def initiator(self) -> bool:
return self.conn.initiator
def close(self) -> None: def close(self) -> None:
""" """
close the stream muxer and underlying raw connection close the stream muxer and underlying raw connection
@ -98,6 +100,7 @@ class Mplex(IMuxedConn):
return None return None
return await self.buffers[stream_id].get() return await self.buffers[stream_id].get()
# FIXME: Remove multiaddr from being passed into muxed_conn
async def open_stream( async def open_stream(
self, protocol_id: str, multi_addr: Multiaddr self, protocol_id: str, multi_addr: Multiaddr
) -> IMuxedStream: ) -> IMuxedStream:
@ -108,7 +111,7 @@ class Mplex(IMuxedConn):
:return: a new stream :return: a new stream
""" """
stream_id = self.conn.next_stream_id() stream_id = self.conn.next_stream_id()
stream = MplexStream(stream_id, multi_addr, self) stream = MplexStream(stream_id, True, self)
self.buffers[stream_id] = asyncio.Queue() self.buffers[stream_id] = asyncio.Queue()
await self.send_message(HeaderTags.NewStream, None, stream_id) await self.send_message(HeaderTags.NewStream, None, stream_id)
return stream return stream
@ -147,8 +150,7 @@ class Mplex(IMuxedConn):
:param _bytes: byte array to write :param _bytes: byte array to write
:return: length written :return: length written
""" """
self.conn.writer.write(_bytes) await self.conn.write(_bytes)
await self.conn.writer.drain()
return len(_bytes) return len(_bytes)
async def handle_incoming(self) -> None: async def handle_incoming(self) -> None:
@ -186,11 +188,9 @@ class Mplex(IMuxedConn):
# loop in handle_incoming # loop in handle_incoming
timeout = 0.1 timeout = 0.1
try: try:
header = await decode_uvarint_from_stream(self.conn.reader, timeout) header = await decode_uvarint_from_stream(self.conn, timeout)
length = await decode_uvarint_from_stream(self.conn.reader, timeout) length = await decode_uvarint_from_stream(self.conn, timeout)
message = await asyncio.wait_for( message = await asyncio.wait_for(self.conn.read(length), timeout=timeout)
self.conn.reader.read(length), timeout=timeout
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return None, None, None return None, None, None

View File

@ -1,47 +0,0 @@
import asyncio
import struct
from typing import Tuple
from libp2p.typing import StreamReader
def encode_uvarint(number: int) -> bytes:
"""Pack `number` into varint bytes"""
buf = b""
while True:
towrite = number & 0x7F
number >>= 7
if number:
buf += bytes((towrite | 0x80,))
else:
buf += bytes((towrite,))
break
return buf
def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]:
shift = 0
result = 0
while True:
i = buff[index]
result |= (i & 0x7F) << shift
shift += 7
if not i & 0x80:
break
index += 1
return result, index + 1
async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> int:
shift = 0
result = 0
while True:
byte = await asyncio.wait_for(reader.read(1), timeout=timeout)
i = struct.unpack(">H", b"\x00" + byte)[0]
result |= (i & 0x7F) << shift
shift += 7
if not i & 0x80:
break
return result

View File

@ -0,0 +1,18 @@
from libp2p.exceptions import BaseLibp2pError
# TODO: Add `BaseLibp2pError` and `UpgradeFailure` can inherit from it?
class UpgradeFailure(BaseLibp2pError):
pass
class SecurityUpgradeFailure(UpgradeFailure):
pass
class MuxerUpgradeFailure(UpgradeFailure):
pass
class HandshakeFailure(BaseLibp2pError):
pass

View File

@ -62,6 +62,7 @@ class TCPListener(IListener):
class TCP(ITransport): class TCP(ITransport):
# TODO: Remove `self_id`
async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection: async def dial(self, maddr: Multiaddr, self_id: ID) -> IRawConnection:
""" """
dial a transport to peer listening on multiaddr dial a transport to peer listening on multiaddr
@ -74,18 +75,6 @@ class TCP(ITransport):
reader, writer = await asyncio.open_connection(host, int(port)) reader, writer = await asyncio.open_connection(host, int(port))
# TODO: Change this `sending peer id` process to `/plaintext/2.0.0`
# First: send our peer ID so receiver knows it
writer.write(self_id.to_base58().encode())
await writer.drain()
# Await ack for peer id
expected_ack_str = "received peer id"
ack = (await reader.read(len(expected_ack_str))).decode()
if ack != expected_ack_str:
raise Exception("Receiver did not receive peer id")
return RawConnection(host, port, reader, writer, True) return RawConnection(host, port, reader, writer, True)
def create_listener(self, handler_function: THandler) -> TCPListener: def create_listener(self, handler_function: THandler) -> TCPListener:

View File

@ -3,11 +3,17 @@ from typing import Mapping
from libp2p.network.connection.raw_connection_interface import IRawConnection from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.network.typing import GenericProtocolHandlerFn from libp2p.network.typing import GenericProtocolHandlerFn
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.protocol_muxer.exceptions import MultiselectClientError, MultiselectError
from libp2p.security.secure_conn_interface import ISecureConn from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.security.secure_transport_interface import ISecureTransport from libp2p.security.secure_transport_interface import ISecureTransport
from libp2p.security.security_multistream import SecurityMultistream from libp2p.security.security_multistream import SecurityMultistream
from libp2p.stream_muxer.abc import IMuxedConn from libp2p.stream_muxer.abc import IMuxedConn
from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream from libp2p.stream_muxer.muxer_multistream import MuxerClassType, MuxerMultistream
from libp2p.transport.exceptions import (
HandshakeFailure,
MuxerUpgradeFailure,
SecurityUpgradeFailure,
)
from libp2p.typing import TProtocol from libp2p.typing import TProtocol
from .listener_interface import IListener from .listener_interface import IListener
@ -39,10 +45,20 @@ class TransportUpgrader:
""" """
Upgrade conn to a secured connection Upgrade conn to a secured connection
""" """
if initiator: try:
return await self.security_multistream.secure_outbound(raw_conn, peer_id) if initiator:
return await self.security_multistream.secure_outbound(
return await self.security_multistream.secure_inbound(raw_conn) raw_conn, peer_id
)
return await self.security_multistream.secure_inbound(raw_conn)
except (MultiselectError, MultiselectClientError) as error:
raise SecurityUpgradeFailure(
"failed to negotiate the secure protocol"
) from error
except HandshakeFailure as error:
raise SecurityUpgradeFailure(
"handshake failed when upgrading to secure connection"
) from error
async def upgrade_connection( async def upgrade_connection(
self, self,
@ -53,6 +69,11 @@ class TransportUpgrader:
""" """
Upgrade secured connection to a muxed connection Upgrade secured connection to a muxed connection
""" """
return await self.muxer_multistream.new_conn( try:
conn, generic_protocol_handler, peer_id return await self.muxer_multistream.new_conn(
) conn, generic_protocol_handler, peer_id
)
except (MultiselectError, MultiselectClientError) as error:
raise MuxerUpgradeFailure(
"failed to negotiate the multiplexer protocol"
) from error

View File

@ -1,6 +1,7 @@
import asyncio
from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union
from libp2p.network.connection.raw_connection_interface import IRawConnection
if TYPE_CHECKING: if TYPE_CHECKING:
from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401 from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401
from libp2p.stream_muxer.abc import IMuxedStream # noqa: F401 from libp2p.stream_muxer.abc import IMuxedStream # noqa: F401
@ -9,4 +10,4 @@ TProtocol = NewType("TProtocol", str)
StreamHandlerFn = Callable[["INetStream"], Awaitable[None]] StreamHandlerFn = Callable[["INetStream"], Awaitable[None]]
StreamReader = Union["IMuxedStream", asyncio.StreamReader] StreamReader = Union["IMuxedStream", IRawConnection]

99
libp2p/utils.py Normal file
View File

@ -0,0 +1,99 @@
import asyncio
import struct
from typing import Tuple
from libp2p.typing import StreamReader
def encode_uvarint(number: int) -> bytes:
"""Pack `number` into varint bytes"""
buf = b""
while True:
towrite = number & 0x7F
number >>= 7
if number:
buf += bytes((towrite | 0x80,))
else:
buf += bytes((towrite,))
break
return buf
def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]:
shift = 0
result = 0
while True:
i = buff[index]
result |= (i & 0x7F) << shift
shift += 7
if not i & 0x80:
break
index += 1
return result, index + 1
async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> int:
shift = 0
result = 0
while True:
byte = await asyncio.wait_for(reader.read(1), timeout=timeout)
i = struct.unpack(">H", b"\x00" + byte)[0]
result |= (i & 0x7F) << shift
shift += 7
if not i & 0x80:
break
return result
# Varint-prefixed read/write
def encode_varint_prefixed(msg_bytes: bytes) -> bytes:
varint_len = encode_uvarint(len(msg_bytes))
return varint_len + msg_bytes
async def read_varint_prefixed_bytes(reader: StreamReader) -> bytes:
len_msg = await decode_uvarint_from_stream(reader, None)
data = await reader.read(len_msg)
if len(data) != len_msg:
raise ValueError(
f"failed to read enough bytes: len_msg={len_msg}, data={data!r}"
)
return data
# Delimited read/write, used by multistream-select.
# Reference: https://github.com/gogo/protobuf/blob/07eab6a8298cf32fac45cceaac59424f98421bbc/io/varint.go#L109-L126 # noqa: E501
def encode_delim(msg: bytes) -> bytes:
delimited_msg = msg + b"\n"
return encode_varint_prefixed(delimited_msg)
async def read_delim(reader: StreamReader) -> bytes:
msg_bytes = await read_varint_prefixed_bytes(reader)
# TODO: Investigate if it is possible to have empty `msg_bytes`
if len(msg_bytes) != 0 and msg_bytes[-1:] != b"\n":
raise ValueError(f'msg_bytes is not delimited by b"\\n": msg_bytes={msg_bytes}')
return msg_bytes[:-1]
SIZE_LEN_BYTES = 4
# Fixed-prefixed read/write, used by "/plaintext/2.0.0".
# Reference: https://github.com/libp2p/go-msgio/blob/d5bbf59d3c4240266b1d2e5df9dc993454c42011/num.go#L11-L33 # noqa: E501 # noqa: E501
def encode_fixedint_prefixed(msg_bytes: bytes) -> bytes:
len_prefix = len(msg_bytes).to_bytes(SIZE_LEN_BYTES, "big")
return len_prefix + msg_bytes
async def read_fixedint_prefixed(reader: StreamReader) -> bytes:
len_bytes = await reader.read(SIZE_LEN_BYTES)
len_int = int.from_bytes(len_bytes, "big")
return await reader.read(len_int)

View File

@ -3,7 +3,7 @@ import asyncio
import pytest import pytest
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.protocol_muxer.multiselect_client import MultiselectClientError from libp2p.protocol_muxer.exceptions import MultiselectClientError
from tests.utils import cleanup, set_up_nodes_by_transport_opt from tests.utils import cleanup, set_up_nodes_by_transport_opt
PROTOCOL_ID = "/chat/1.0.0" PROTOCOL_ID = "/chat/1.0.0"

View File

@ -1,6 +1,6 @@
import pytest import pytest
from libp2p.protocol_muxer.multiselect_client import MultiselectClientError from libp2p.protocol_muxer.exceptions import MultiselectClientError
from tests.utils import cleanup, set_up_nodes_by_transport_opt from tests.utils import cleanup, set_up_nodes_by_transport_opt
# TODO: Add tests for multiple streams being opened on different # TODO: Add tests for multiple streams being opened on different

View File

@ -1,14 +1,13 @@
import asyncio import asyncio
import multiaddr
import pytest import pytest
from libp2p import new_node from libp2p import new_node
from libp2p.crypto.rsa import create_new_key_pair from libp2p.crypto.rsa import create_new_key_pair
from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.network.exceptions import SwarmException
from libp2p.protocol_muxer.multiselect_client import MultiselectClientError
from libp2p.security.insecure.transport import InsecureSession, InsecureTransport from libp2p.security.insecure.transport import InsecureSession, InsecureTransport
from libp2p.security.simple.transport import SimpleSecurityTransport from libp2p.security.simple.transport import SimpleSecurityTransport
from tests.configs import LISTEN_MADDR
from tests.utils import cleanup, connect from tests.utils import cleanup, connect
# TODO: Add tests for multiple streams being opened on different # TODO: Add tests for multiple streams being opened on different
@ -16,9 +15,7 @@ from tests.utils import cleanup, connect
def peer_id_for_node(node): def peer_id_for_node(node):
addr = node.get_addrs()[0] return node.get_id()
info = info_from_p2p_addr(addr)
return info.peer_id
initiator_key_pair = create_new_key_pair() initiator_key_pair = create_new_key_pair()
@ -35,14 +32,16 @@ async def perform_simple_test(
# TODO: implement -- note we need to introduce the notion of communicating over a raw connection # TODO: implement -- note we need to introduce the notion of communicating over a raw connection
# for testing, we do NOT want to communicate over a stream so we can't just create two nodes # for testing, we do NOT want to communicate over a stream so we can't just create two nodes
# and use their conn because our mplex will internally relay messages to a stream # and use their conn because our mplex will internally relay messages to a stream
sec_opt1 = transports_for_initiator
sec_opt2 = transports_for_noninitiator
node1 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt1) node1 = await new_node(
node2 = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/0"], sec_opt=sec_opt2) key_pair=initiator_key_pair, sec_opt=transports_for_initiator
)
node2 = await new_node(
key_pair=noninitiator_key_pair, sec_opt=transports_for_noninitiator
)
await node1.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) await node1.get_network().listen(LISTEN_MADDR)
await node2.get_network().listen(multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0")) await node2.get_network().listen(LISTEN_MADDR)
await connect(node1, node2) await connect(node1, node2)
@ -162,7 +161,7 @@ async def test_multiple_security_none_the_same_fails():
def assertion_func(_): def assertion_func(_):
assert False assert False
with pytest.raises(MultiselectClientError): with pytest.raises(SwarmException):
await perform_simple_test( await perform_simple_test(
assertion_func, transports_for_initiator, transports_for_noninitiator assertion_func, transports_for_initiator, transports_for_noninitiator
) )