Merge pull request #188 from mhchia/feature/add-typing-for-pubusb

Add tox and mypy
This commit is contained in:
Kevin Mai-Husan Chia 2019-07-24 21:45:19 +08:00 committed by GitHub
commit f329c5a627
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 166 additions and 52 deletions

View File

@ -197,8 +197,7 @@ spelling-store-unknown-words=no
[MISCELLANEOUS] [MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma. # List of note tags to take in consideration, separated by a comma.
notes=FIXME, notes=
XXX
[TYPECHECK] [TYPECHECK]

View File

@ -4,20 +4,17 @@ matrix:
include: include:
- python: 3.7 - python: 3.7
dist: xenial dist: xenial
sudo: true env: TOXENV=py37-test
- python: 3.7
dist: xenial
env: TOXENV=lint
install: install:
- pip install --upgrade pip - pip install --upgrade pip
- pip install -r requirements_dev.txt - pip install tox
- python setup.py develop
script: script:
- pytest --cov=./libp2p tests/ - tox
- pylint --rcfile=.pylintrc libp2p tests
after_success:
- codecov
notifications: notifications:
slack: py-libp2p:RK0WVoQZhQXLgIKfHNPL1TR2 slack: py-libp2p:RK0WVoQZhQXLgIKfHNPL1TR2

0
examples/__init__.py Normal file
View File

View File

View File

@ -1,8 +1,8 @@
import argparse
import asyncio import asyncio
import sys import sys
import urllib.request import urllib.request
import click
import multiaddr import multiaddr
from libp2p import new_node from libp2p import new_node
@ -23,6 +23,7 @@ async def read_data(stream):
print("\x1b[32m %s\x1b[0m " % read_string, end="") print("\x1b[32m %s\x1b[0m " % read_string, end="")
# FIXME(mhchia): Reconsider whether we should use a thread pool here.
async def write_data(stream): async def write_data(stream):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
while True: while True:
@ -52,9 +53,9 @@ async def run(port, destination):
(int(port) + 1, external_ip, port, host.get_id().pretty())) (int(port) + 1, external_ip, port, host.get_id().pretty()))
print("\nWaiting for incoming connection\n\n") print("\nWaiting for incoming connection\n\n")
else: # its the client else: # its the client
m = multiaddr.Multiaddr(destination) maddr = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(m) info = info_from_p2p_addr(maddr)
# Associate the peer with local ip address # Associate the peer with local ip address
await host.connect(info) await host.connect(info)
@ -67,22 +68,38 @@ async def run(port, destination):
print("Connected to peer %s" % info.addrs[0]) print("Connected to peer %s" % info.addrs[0])
@click.command() def main():
@click.option('--port', '-p', help='source port number', default=8000) description = """
@click.option('--destination', '-d', help="Destination multiaddr string") This program demonstrates a simple p2p chat application using libp2p.
@click.option('--help', is_flag=True, default=False, help='display help') To use it, first run 'python ./chat -p <PORT>', where <PORT> is the port number.
# @click.option('--debug', is_flag=True, default=False, help='Debug generates the same node ID on every execution') Then, run another host with 'python ./chat -p <ANOTHER_PORT> -d <DESTINATION>',
def main(port, destination, help): where <DESTINATION> is the multiaddress of the previous listener host.
"""
if help: example_maddr = "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
print("This program demonstrates a simple p2p chat application using libp2p\n\n") parser = argparse.ArgumentParser(description=description)
print("Usage: Run './chat -p <SOURCE_PORT>' where <SOURCE_PORT> can be any port number.") parser.add_argument(
print("Now run './chat -p <PORT> -d <MULTIADDR>' where <MULTIADDR> is multiaddress of previous listener host.") "--debug",
return action='store_true',
help='generate the same node ID on every execution',
)
parser.add_argument(
"-p",
"--port",
default=8000,
type=int,
help="source port number",
)
parser.add_argument(
"-d",
"--destination",
type=str,
help=f"destination multiaddr string, e.g. {example_maddr}",
)
args = parser.parse_args()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
asyncio.ensure_future(run(port, destination)) asyncio.ensure_future(run(args.port, args.destination))
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -1,3 +1,7 @@
from libp2p.peer.id import (
ID,
)
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_router_interface import IPubsubRouter from .pubsub_router_interface import IPubsubRouter
@ -42,7 +46,7 @@ class FloodSub(IPubsubRouter):
:param rpc: rpc message :param rpc: rpc message
""" """
async def publish(self, sender_peer_id, rpc_message): async def publish(self, sender_peer_id: ID, rpc_message: rpc_pb2.Message) -> None:
""" """
Invoked to forward a new message that has been validated. Invoked to forward a new message that has been validated.
This is where the "flooding" part of floodsub happens This is where the "flooding" part of floodsub happens

View File

@ -1,16 +1,62 @@
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
import asyncio import asyncio
import time
from typing import (
Any,
Dict,
List,
Sequence,
Tuple,
)
from lru import LRU from lru import LRU
from libp2p.host.host_interface import (
IHost,
)
from libp2p.peer.id import (
ID,
)
from libp2p.network.stream.net_stream_interface import (
INetStream,
)
from .pb import rpc_pb2 from .pb import rpc_pb2
from .pubsub_notifee import PubsubNotifee from .pubsub_notifee import PubsubNotifee
from .pubsub_router_interface import (
IPubsubRouter,
)
class Pubsub(): def get_msg_id(msg: rpc_pb2.Message) -> Tuple[bytes, bytes]:
# NOTE: `string(from, seqno)` in Go
return (msg.seqno, msg.from_id)
class Pubsub:
# pylint: disable=too-many-instance-attributes, no-member # pylint: disable=too-many-instance-attributes, no-member
def __init__(self, host, router, my_id, cache_size=None): host: IHost
my_id: ID
router: IPubsubRouter
peer_queue: asyncio.Queue
protocols: Sequence[str]
incoming_msgs_from_peers: asyncio.Queue()
outgoing_messages: asyncio.Queue()
seen_messages: LRU
my_topics: Dict[str, asyncio.Queue]
peer_topics: Dict[str, List[ID]]
# FIXME: Should be changed to `Dict[ID, INetStream]`
peers: Dict[str, INetStream]
# NOTE: Be sure it is increased atomically everytime.
counter: int # uint64
def __init__(
self,
host: IHost,
router: IPubsubRouter,
my_id: ID,
cache_size: int = None) -> None:
""" """
Construct a new Pubsub object, which is responsible for handling all Construct a new Pubsub object, which is responsible for handling all
Pubsub-related messages and relaying messages as appropriate to the Pubsub-related messages and relaying messages as appropriate to the
@ -57,10 +103,12 @@ class Pubsub():
# Create peers map, which maps peer_id (as string) to stream (to a given peer) # Create peers map, which maps peer_id (as string) to stream (to a given peer)
self.peers = {} self.peers = {}
self.counter = time.time_ns()
# Call handle peer to keep waiting for updates to peer queue # Call handle peer to keep waiting for updates to peer queue
asyncio.ensure_future(self.handle_peer_queue()) asyncio.ensure_future(self.handle_peer_queue())
def get_hello_packet(self): def get_hello_packet(self) -> bytes:
""" """
Generate subscription message with all topics we are subscribed to Generate subscription message with all topics we are subscribed to
only send hello packet if we have subscribed topics only send hello packet if we have subscribed topics
@ -73,7 +121,7 @@ class Pubsub():
return packet.SerializeToString() return packet.SerializeToString()
async def continuously_read_stream(self, stream): async def continuously_read_stream(self, stream: INetStream) -> None:
""" """
Read from input stream in an infinite loop. Process Read from input stream in an infinite loop. Process
messages from other nodes messages from other nodes
@ -120,7 +168,7 @@ class Pubsub():
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
async def stream_handler(self, stream): async def stream_handler(self, stream: INetStream) -> None:
""" """
Stream handler for pubsub. Gets invoked whenever a new stream is created Stream handler for pubsub. Gets invoked whenever a new stream is created
on one of the supported pubsub protocols. on one of the supported pubsub protocols.
@ -139,7 +187,7 @@ class Pubsub():
# Pass stream off to stream reader # Pass stream off to stream reader
asyncio.ensure_future(self.continuously_read_stream(stream)) asyncio.ensure_future(self.continuously_read_stream(stream))
async def handle_peer_queue(self): async def handle_peer_queue(self) -> None:
""" """
Continuously read from peer queue and each time a new peer is found, Continuously read from peer queue and each time a new peer is found,
open a stream to the peer using a supported pubsub protocol open a stream to the peer using a supported pubsub protocol
@ -170,7 +218,8 @@ class Pubsub():
# Force context switch # Force context switch
await asyncio.sleep(0) await asyncio.sleep(0)
def handle_subscription(self, origin_id, sub_message): # FIXME: `sub_message` can be further type hinted with mypy_protobuf
def handle_subscription(self, origin_id: ID, sub_message: Any) -> None:
""" """
Handle an incoming subscription message from a peer. Update internal Handle an incoming subscription message from a peer. Update internal
mapping to mark the peer as subscribed or unsubscribed to topics as mapping to mark the peer as subscribed or unsubscribed to topics as
@ -189,7 +238,9 @@ class Pubsub():
if origin_id in self.peer_topics[sub_message.topicid]: if origin_id in self.peer_topics[sub_message.topicid]:
self.peer_topics[sub_message.topicid].remove(origin_id) self.peer_topics[sub_message.topicid].remove(origin_id)
async def handle_talk(self, publish_message): # FIXME(mhchia): Change the function name?
# FIXME(mhchia): `publish_message` can be further type hinted with mypy_protobuf
async def handle_talk(self, publish_message: Any) -> None:
""" """
Put incoming message from a peer onto my blocking queue Put incoming message from a peer onto my blocking queue
:param talk: RPC.Message format :param talk: RPC.Message format
@ -203,7 +254,7 @@ class Pubsub():
# for each topic # for each topic
await self.my_topics[topic].put(publish_message) await self.my_topics[topic].put(publish_message)
async def subscribe(self, topic_id): async def subscribe(self, topic_id: str) -> asyncio.Queue:
""" """
Subscribe ourself to a topic Subscribe ourself to a topic
:param topic_id: topic_id to subscribe to :param topic_id: topic_id to subscribe to
@ -232,7 +283,7 @@ class Pubsub():
# Return the asyncio queue for messages on this topic # Return the asyncio queue for messages on this topic
return self.my_topics[topic_id] return self.my_topics[topic_id]
async def unsubscribe(self, topic_id): async def unsubscribe(self, topic_id: str) -> None:
""" """
Unsubscribe ourself from a topic Unsubscribe ourself from a topic
:param topic_id: topic_id to unsubscribe from :param topic_id: topic_id to unsubscribe from
@ -257,15 +308,14 @@ class Pubsub():
# Tell router we are leaving this topic # Tell router we are leaving this topic
await self.router.leave(topic_id) await self.router.leave(topic_id)
async def message_all_peers(self, rpc_msg): # FIXME: `rpc_msg` can be further type hinted with mypy_protobuf
async def message_all_peers(self, rpc_msg: Any) -> None:
""" """
Broadcast a message to peers Broadcast a message to peers
:param raw_msg: raw contents of the message to broadcast :param raw_msg: raw contents of the message to broadcast
""" """
# Broadcast message # Broadcast message
for peer in self.peers: for _, stream in self.peers.items():
stream = self.peers[peer]
# Write message to stream # Write message to stream
await stream.write(rpc_msg) await stream.write(rpc_msg)

12
mypy.ini Normal file
View File

@ -0,0 +1,12 @@
[mypy]
warn_unused_ignores = True
ignore_missing_imports = True
strict_optional = False
check_untyped_defs = True
disallow_incomplete_defs = True
disallow_untyped_defs = True
disallow_any_generics = True
disallow_untyped_calls = True
warn_redundant_casts = True
warn_unused_configs = True
strict_equality = True

View File

@ -1,7 +0,0 @@
pytest>=3.7
codecov
pytest-cov
pytest-asyncio
pylint
grpcio
grpcio-tools

View File

@ -7,6 +7,26 @@ classifiers = [
] ]
# pylint: disable=invalid-name
extras_require = {
"test": [
"codecov>=2.0.15,<3.0.0",
"pytest>=4.6.3,<5.0.0",
"pytest-cov>=2.7.1,<3.0.0",
"pytest-asyncio>=0.10.0,<1.0.0",
],
"lint": [
"pylint>=2.3.1,<3.0.0",
"mypy>=0.701,<1.0",
],
"dev": [
"tox>=3.13.2,<4.0.0",
],
}
extras_require["dev"] = extras_require["test"] + extras_require["lint"] + extras_require["dev"]
setuptools.setup( setuptools.setup(
name="libp2p", name="libp2p",
description="libp2p implementation written in python", description="libp2p implementation written in python",
@ -16,7 +36,6 @@ setuptools.setup(
classifiers=classifiers, classifiers=classifiers,
install_requires=[ install_requires=[
"pycryptodome>=3.8.2,<4.0.0", "pycryptodome>=3.8.2,<4.0.0",
"click>=7.0,<8.0",
"base58>=1.0.3,<2.0.0", "base58>=1.0.3,<2.0.0",
"pymultihash>=0.8.2", "pymultihash>=0.8.2",
"multiaddr>=0.0.8,<0.1.0", "multiaddr>=0.0.8,<0.1.0",
@ -24,8 +43,8 @@ setuptools.setup(
"grpcio>=1.21.1,<2.0.0", "grpcio>=1.21.1,<2.0.0",
"grpcio-tools>=1.21.1,<2.0.0", "grpcio-tools>=1.21.1,<2.0.0",
"lru-dict>=1.1.6", "lru-dict>=1.1.6",
"aio_timers>=0.0.1,<0.1.0",
], ],
extras_require=extras_require,
packages=setuptools.find_packages(exclude=["tests", "tests.*"]), packages=setuptools.find_packages(exclude=["tests", "tests.*"]),
zip_safe=False, zip_safe=False,
) )

23
tox.ini Normal file
View File

@ -0,0 +1,23 @@
# Reference: https://github.com/ethereum/py_ecc/blob/d0da74402210ea1503ef83b3c489d5b5eba7f7bf/tox.ini
[tox]
envlist =
py37-test
lint
[testenv]
deps =
passenv = CI TRAVIS TRAVIS_*
extras = test
commands =
pytest --cov=./libp2p tests/
codecov
basepython =
py37: python3.7
[testenv:lint]
basepython = python3
extras = dev
commands =
pylint --rcfile={toxinidir}/.pylintrc libp2p examples tests
mypy -p libp2p -p examples --config-file {toxinidir}/mypy.ini