Add tests for failed signature validation cases

This commit is contained in:
NIC619 2019-11-29 14:13:07 +08:00
parent a262b94836
commit f4e86b1172
No known key found for this signature in database
GPG Key ID: 570C35F5C2D51B17

View File

@ -5,6 +5,7 @@ import pytest
from libp2p.exceptions import ValidationError
from libp2p.peer.id import ID
from libp2p.pubsub.pubsub import PUBSUB_SIGNING_PREFIX
from libp2p.pubsub.pb import rpc_pb2
from libp2p.tools.pubsub.utils import make_pubsub_msg
from libp2p.tools.utils import connect
@ -514,7 +515,7 @@ async def test_push_msg(pubsubs_fsub, monkeypatch):
@pytest.mark.parametrize("num_hosts, is_strict_signing", ((2, True),))
@pytest.mark.asyncio
async def test_strict_signing(pubsubs_fsub, hosts, monkeypatch):
async def test_strict_signing(pubsubs_fsub, hosts):
await connect(hosts[0], hosts[1])
await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
await pubsubs_fsub[1].subscribe(TESTING_TOPIC)
@ -525,3 +526,57 @@ async def test_strict_signing(pubsubs_fsub, hosts, monkeypatch):
assert len(pubsubs_fsub[0].seen_messages) == 1
assert len(pubsubs_fsub[1].seen_messages) == 1
@pytest.mark.parametrize("num_hosts, is_strict_signing", ((2, True),))
@pytest.mark.asyncio
async def test_strict_signing_failed_validation(pubsubs_fsub, hosts, monkeypatch):
msg = make_pubsub_msg(
origin_id=pubsubs_fsub[0].my_id,
topic_ids=[TESTING_TOPIC],
data=TESTING_DATA,
seqno=b"\x00" * 8,
)
priv_key = pubsubs_fsub[0].sign_key
signature = priv_key.sign(
PUBSUB_SIGNING_PREFIX.encode() + msg.SerializeToString()
)
event = asyncio.Event()
def _is_msg_seen(msg):
return False
# Use router publish to check if `push_msg` succeed.
async def router_publish(*args, **kwargs):
# The event will only be set if `push_msg` succeed.
event.set()
monkeypatch.setattr(pubsubs_fsub[0], "_is_msg_seen", _is_msg_seen)
monkeypatch.setattr(pubsubs_fsub[0].router, "publish", router_publish)
# Test: no signature attached in `msg`
await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg)
await asyncio.sleep(0.01)
assert not event.is_set()
# Test: `msg.key` does not match `msg.from_id`
msg.key = hosts[1].get_public_key().serialize()
msg.signature = signature
await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg)
await asyncio.sleep(0.01)
assert not event.is_set()
# Test: invalid signature
msg.key = hosts[0].get_public_key().serialize()
msg.signature = b"\x12" * 100
await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg)
await asyncio.sleep(0.01)
assert not event.is_set()
# Finally, assert the signature indeed will pass validation
msg.key = hosts[0].get_public_key().serialize()
msg.signature = signature
await pubsubs_fsub[0].push_msg(pubsubs_fsub[0].my_id, msg)
await asyncio.sleep(0.01)
assert event.is_set()