Fixes to add python 3.6 compatibility

This commit is contained in:
Alex Stokes 2019-12-02 15:57:22 -08:00
parent dfdcf524b7
commit 63fd531ed0
No known key found for this signature in database
GPG Key ID: 51CE1721B245C086
9 changed files with 54 additions and 53 deletions

View File

@ -1,4 +1,5 @@
import asyncio
import sys
from .exceptions import RawConnError
from .raw_connection_interface import IRawConnection
@ -52,4 +53,5 @@ class RawConnection(IRawConnection):
async def close(self) -> None:
self.writer.close()
if sys.version_info[0:2] > (3, 6):
await self.writer.wait_closed()

View File

@ -150,7 +150,7 @@ class Pubsub:
# Map of topic to topic validator
self.topic_validators = {}
self.counter = time.time_ns()
self.counter = int(time.time())
self._tasks = []
# Call handle peer to keep waiting for updates to peer queue

View File

@ -1,5 +1,4 @@
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, Tuple, cast
import factory
@ -33,6 +32,12 @@ from .constants import (
)
from .utils import connect, connect_swarm
try:
from contextlib import asynccontextmanager
except ImportError:
# NOTE: mypy complains about a duplicate import without the following ``# type: ignore``
from async_generator import asynccontextmanager # type: ignore
def initialize_peerstore_with_our_keypair(self_id: ID, key_pair: KeyPair) -> PeerStore:
peer_store = PeerStore()

View File

@ -143,6 +143,14 @@ floodsub_protocol_pytest_params = [
]
def _collect_node_ids(adj_list):
node_ids = set()
for node, neighbors in adj_list.items():
node_ids.add(node)
node_ids.update(set(neighbors))
return node_ids
async def perform_test_from_obj(obj, router_factory) -> None:
"""
Perform pubsub tests from a test object, which is composed as follows:
@ -180,59 +188,43 @@ async def perform_test_from_obj(obj, router_factory) -> None:
node_map = {}
pubsub_map = {}
async def add_node(node_id_str: str) -> None:
async def add_node(node_id_str: str):
pubsub_router = router_factory(protocols=obj["supported_protocols"])
pubsub = PubsubFactory(router=pubsub_router)
await pubsub.host.get_network().listen(LISTEN_MADDR)
node_map[node_id_str] = pubsub.host
pubsub_map[node_id_str] = pubsub
tasks_connect = []
for start_node_id in adj_list:
# Create node if node does not yet exist
if start_node_id not in node_map:
await add_node(start_node_id)
all_node_ids = _collect_node_ids(adj_list)
# For each neighbor of start_node, create if does not yet exist,
# then connect start_node to neighbor
for neighbor_id in adj_list[start_node_id]:
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
await add_node(neighbor_id)
tasks_connect.append(
connect(node_map[start_node_id], node_map[neighbor_id])
)
# Connect nodes and wait at least for 2 seconds
await asyncio.gather(*tasks_connect, asyncio.sleep(2))
for node in all_node_ids:
await add_node(node)
for node, neighbors in adj_list.items():
for neighbor_id in neighbors:
await connect(node_map[node], node_map[neighbor_id])
# NOTE: the test using this routine will fail w/o these sleeps...
await asyncio.sleep(1)
# Step 2) Subscribe to topics
queues_map = {}
topic_map = obj["topic_map"]
tasks_topic = []
tasks_topic_data = []
for topic, node_ids in topic_map.items():
for node_id in node_ids:
tasks_topic.append(pubsub_map[node_id].subscribe(topic))
tasks_topic_data.append((node_id, topic))
tasks_topic.append(asyncio.sleep(2))
# Gather is like Promise.all
responses = await asyncio.gather(*tasks_topic)
for i in range(len(responses) - 1):
node_id, topic = tasks_topic_data[i]
queue = await pubsub_map[node_id].subscribe(topic)
if node_id not in queues_map:
queues_map[node_id] = {}
# Store queue in topic-queue map for node
queues_map[node_id][topic] = responses[i]
queues_map[node_id][topic] = queue
# Allow time for subscribing before continuing
await asyncio.sleep(0.01)
# NOTE: the test using this routine will fail w/o these sleeps...
await asyncio.sleep(1)
# Step 3) Publish messages
topics_in_msgs_ordered = []
messages = obj["messages"]
tasks_publish = []
for msg in messages:
topics = msg["topics"]
@ -242,21 +234,17 @@ async def perform_test_from_obj(obj, router_factory) -> None:
# Publish message
# TODO: Should be single RPC package with several topics
for topic in topics:
tasks_publish.append(pubsub_map[node_id].publish(topic, data))
await pubsub_map[node_id].publish(topic, data)
# For each topic in topics, add (topic, node_id, data) tuple to ordered test list
for topic in topics:
topics_in_msgs_ordered.append((topic, node_id, data))
# Allow time for publishing before continuing
await asyncio.gather(*tasks_publish, asyncio.sleep(2))
# Step 4) Check that all messages were received correctly.
for topic, origin_node_id, data in topics_in_msgs_ordered:
# Look at each node in each topic
for node_id in topic_map[topic]:
# Get message from subscription queue
msg = await queues_map[node_id][topic].get()
queue = queues_map[node_id][topic]
msg = await queue.get()
assert data == msg.data
# Check the message origin
assert node_map[origin_node_id].get_id().to_bytes() == msg.from_id

View File

@ -1,5 +1,6 @@
import asyncio
from socket import socket
import sys
from typing import List
from multiaddr import Multiaddr
@ -53,6 +54,7 @@ class TCPListener(IListener):
if self.server is None:
return
self.server.close()
if sys.version_info[0:2] > (3, 6):
await self.server.wait_closed()
self.server = None

View File

@ -58,6 +58,8 @@ install_requires = [
"protobuf>=3.10.0,<4.0.0",
"coincurve>=10.0.0,<11.0.0",
"pynacl==1.3.0",
"dataclasses>=0.7, <1;python_version<'3.7'",
"async_generator==1.10;python_version<'3.7'",
]
@ -80,7 +82,7 @@ setup(
url="https://github.com/libp2p/py-libp2p",
include_package_data=True,
install_requires=install_requires,
python_requires=">=3.7,<4",
python_requires=">=3.6,<4",
extras_require=extras_require,
py_modules=["libp2p"],
license="MIT/APACHE2.0",

View File

@ -58,11 +58,11 @@ async def test_peers_subscribe(pubsubs_fsub):
await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host)
await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
# Yield to let 0 notify 1
await asyncio.sleep(0.1)
await asyncio.sleep(1)
assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC]
await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC)
# Yield to let 0 notify 1
await asyncio.sleep(0.1)
await asyncio.sleep(1)
assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics[TESTING_TOPIC]

View File

@ -76,8 +76,8 @@ async def test_create_secure_session():
local_conn = InMemoryConnection(local_peer, is_initiator=True)
remote_conn = InMemoryConnection(remote_peer)
local_pipe_task = asyncio.create_task(create_pipe(local_conn, remote_conn))
remote_pipe_task = asyncio.create_task(create_pipe(remote_conn, local_conn))
local_pipe_task = asyncio.ensure_future(create_pipe(local_conn, remote_conn))
remote_pipe_task = asyncio.ensure_future(create_pipe(remote_conn, local_conn))
local_session_builder = create_secure_session(
local_nonce, local_peer, local_key_pair.private_key, local_conn, remote_peer

View File

@ -151,6 +151,7 @@ class DaemonStream(ReadWriteCloser):
async def close(self) -> None:
self.writer.close()
if sys.version_info[0:2] > (3, 6):
await self.writer.wait_closed()
async def read(self, n: int = -1) -> bytes:
@ -196,6 +197,7 @@ async def py_to_daemon_stream_pair(hosts, p2pds, is_to_fail_daemon_stream):
# some day.
listener = p2pds[0].control.control.listener
listener.close()
if sys.version_info[0:2] > (3, 6):
await listener.wait_closed()
stream_py = await host.new_stream(p2pd.peer_id, [protocol_id])
if not is_to_fail_daemon_stream: