Merge pull request #291 from ralexstokes/add-ed25519-support

Adds support for verifying ed25519 signatures, for secio
This commit is contained in:
Alex Stokes 2019-09-24 10:21:01 -07:00 committed by GitHub
commit e6a0361e09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 124 additions and 22 deletions

70
libp2p/crypto/ed25519.py Normal file
View File

@ -0,0 +1,70 @@
from Crypto.Hash import SHA256
from nacl.exceptions import BadSignatureError
from nacl.public import PrivateKey as PrivateKeyImpl
from nacl.public import PublicKey as PublicKeyImpl
from nacl.signing import SigningKey, VerifyKey
import nacl.utils as utils
from libp2p.crypto.keys import KeyPair, KeyType, PrivateKey, PublicKey
class Ed25519PublicKey(PublicKey):
def __init__(self, impl: PublicKeyImpl) -> None:
self.impl = impl
def to_bytes(self) -> bytes:
return bytes(self.impl)
@classmethod
def from_bytes(cls, key_bytes: bytes) -> "Ed25519PublicKey":
return cls(PublicKeyImpl(key_bytes))
def get_type(self) -> KeyType:
return KeyType.Ed25519
def verify(self, data: bytes, signature: bytes) -> bool:
verify_key = VerifyKey(self.to_bytes())
h = SHA256.new(data)
try:
verify_key.verify(h, signature)
except BadSignatureError:
return False
return True
class Ed25519PrivateKey(PrivateKey):
def __init__(self, impl: PrivateKeyImpl) -> None:
self.impl = impl
@classmethod
def new(cls, seed: bytes = None) -> "Ed25519PrivateKey":
if not seed:
seed = utils.random()
private_key_impl = PrivateKeyImpl.from_seed(seed)
return cls(private_key_impl)
def to_bytes(self) -> bytes:
return bytes(self.impl)
@classmethod
def from_bytes(cls, data: bytes) -> "Ed25519PrivateKey":
impl = PrivateKeyImpl(data)
return cls(impl)
def get_type(self) -> KeyType:
return KeyType.Ed25519
def sign(self, data: bytes) -> bytes:
h = SHA256.new(data)
signing_key = SigningKey(self.to_bytes())
return signing_key.sign(h)
def get_public_key(self) -> PublicKey:
return Ed25519PublicKey(self.impl.public_key)
def create_new_key_pair(seed: bytes = None) -> KeyPair:
private_key = Ed25519PrivateKey.new(seed)
public_key = private_key.get_public_key()
return KeyPair(private_key, public_key)

View File

@ -0,0 +1,14 @@
from libp2p.exceptions import BaseLibp2pError
class CryptographyError(BaseLibp2pError):
pass
class MissingDeserializerError(CryptographyError):
"""
Raise if the requested deserialization routine is missing for
some type of cryptographic key.
"""
pass

View File

@ -1,3 +1,5 @@
from libp2p.crypto.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
from libp2p.crypto.exceptions import MissingDeserializerError
from libp2p.crypto.keys import KeyType, PrivateKey, PublicKey
from libp2p.crypto.rsa import RSAPublicKey
from libp2p.crypto.secp256k1 import Secp256k1PrivateKey, Secp256k1PublicKey
@ -5,20 +7,28 @@ from libp2p.crypto.secp256k1 import Secp256k1PrivateKey, Secp256k1PublicKey
key_type_to_public_key_deserializer = {
KeyType.Secp256k1.value: Secp256k1PublicKey.from_bytes,
KeyType.RSA.value: RSAPublicKey.from_bytes,
KeyType.Ed25519.value: Ed25519PublicKey.from_bytes,
}
key_type_to_private_key_deserializer = {
KeyType.Secp256k1.value: Secp256k1PrivateKey.from_bytes
KeyType.Secp256k1.value: Secp256k1PrivateKey.from_bytes,
KeyType.Ed25519.value: Ed25519PrivateKey.from_bytes,
}
def deserialize_public_key(data: bytes) -> PublicKey:
f = PublicKey.deserialize_from_protobuf(data)
try:
deserializer = key_type_to_public_key_deserializer[f.key_type]
except KeyError:
raise MissingDeserializerError({"key_type": f.key_type, "key": "public_key"})
return deserializer(f.data)
def deserialize_private_key(data: bytes) -> PrivateKey:
f = PrivateKey.deserialize_from_protobuf(data)
try:
deserializer = key_type_to_private_key_deserializer[f.key_type]
except KeyError:
raise MissingDeserializerError({"key_type": f.key_type, "key": "private_key"})
return deserializer(f.data)

View File

@ -1,17 +0,0 @@
from .keys import PublicKey
from .pb import crypto_pb2 as protobuf
from .rsa import RSAPublicKey
from .secp256k1 import Secp256k1PublicKey
def pubkey_from_protobuf(pubkey_pb: protobuf.PublicKey) -> PublicKey:
if pubkey_pb.key_type == protobuf.RSA:
return RSAPublicKey.from_bytes(pubkey_pb.data)
# TODO: Test against secp256k1 keys
elif pubkey_pb.key_type == protobuf.Secp256k1:
return Secp256k1PublicKey.from_bytes(pubkey_pb.data)
# TODO: Support `Ed25519` and `ECDSA` in the future?
else:
raise ValueError(
f"unsupported key_type={pubkey_pb.key_type}, data={pubkey_pb.data!r}"
)

View File

@ -2,7 +2,7 @@ from typing import Optional
from libp2p.crypto.keys import PrivateKey, PublicKey
from libp2p.crypto.pb import crypto_pb2
from libp2p.crypto.utils import pubkey_from_protobuf
from libp2p.crypto.serialization import deserialize_public_key
from libp2p.io.abc import ReadWriteCloser
from libp2p.network.connection.exceptions import RawConnError
from libp2p.network.connection.raw_connection_interface import IRawConnection
@ -75,7 +75,9 @@ class InsecureSession(BaseSession):
# Verify if the given `pubkey` matches the given `peer_id`
try:
received_pubkey = pubkey_from_protobuf(remote_msg.pubkey)
received_pubkey = deserialize_public_key(
remote_msg.pubkey.SerializeToString()
)
except ValueError:
raise HandshakeFailure(
f"unknown `key_type` of remote_msg.pubkey={remote_msg.pubkey}"

View File

@ -41,6 +41,7 @@ setuptools.setup(
"protobuf==3.9.0",
"coincurve>=10.0.0,<11.0.0",
"fastecdsa==1.7.4",
"pynacl==1.3.0",
],
extras_require=extras_require,
packages=setuptools.find_packages(exclude=["tests", "tests.*"]),

View File

@ -0,0 +1,22 @@
from libp2p.crypto.ed25519 import create_new_key_pair
from libp2p.crypto.serialization import deserialize_private_key, deserialize_public_key
def test_public_key_serialize_deserialize_round_trip():
key_pair = create_new_key_pair()
public_key = key_pair.public_key
public_key_bytes = public_key.serialize()
another_public_key = deserialize_public_key(public_key_bytes)
assert public_key == another_public_key
def test_private_key_serialize_deserialize_round_trip():
key_pair = create_new_key_pair()
private_key = key_pair.private_key
private_key_bytes = private_key.serialize()
another_private_key = deserialize_private_key(private_key_bytes)
assert private_key == another_private_key