Use asyncio.subprocess over pexpect

In the test for pubsub, since there were unknown issues when I test
against pexpect.
This commit is contained in:
mhchia 2019-09-01 23:39:53 +08:00
parent 1b5d064a8d
commit b77834d129
No known key found for this signature in database
GPG Key ID: 389EFBEA1362589A
2 changed files with 137 additions and 42 deletions

View File

@ -1,24 +1,113 @@
from multiaddr import Multiaddr import asyncio
from pexpect.spawnbase import SpawnBase import time
from typing import Any, List
import multiaddr
from multiaddr import Multiaddr
from p2pclient import Client from p2pclient import Client
from libp2p.peer.peerinfo import info_from_p2p_addr, PeerInfo
from libp2p.peer.id import ID from libp2p.peer.id import ID
from libp2p.peer.peerinfo import PeerInfo, info_from_p2p_addr
from .constants import PEXPECT_NEW_LINE, LOCALHOST_IP from .constants import LOCALHOST_IP
from .envs import GO_BIN_PATH from .envs import GO_BIN_PATH
P2PD_PATH = GO_BIN_PATH / "p2pd" P2PD_PATH = GO_BIN_PATH / "p2pd"
TIMEOUT_DURATION = 30
async def try_until_success(coro_func, timeout=TIMEOUT_DURATION):
"""
Keep running ``coro_func`` until the time is out.
All arguments of ``coro_func`` should be filled, i.e. it should be called without arguments.
"""
t_start = time.monotonic()
while True:
result = await coro_func()
if result:
break
if (time.monotonic() - t_start) >= timeout:
# timeout
assert False, f"{coro_func} still failed after `{timeout}` seconds"
await asyncio.sleep(0.01)
class P2PDProcess:
proc: asyncio.subprocess.Process
cmd: str = str(P2PD_PATH)
args: List[Any]
def __init__(
self,
control_maddr: Multiaddr,
is_secure: bool,
is_pubsub_enabled=True,
is_gossipsub=True,
is_pubsub_signing=False,
is_pubsub_signing_strict=False,
) -> None:
args = [f"-listen={str(control_maddr)}"]
# NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`.
if not is_secure:
args.append("-insecure=true")
if is_pubsub_enabled:
args.append("-pubsub")
if is_gossipsub:
args.append("-pubsubRouter=gossipsub")
else:
args.append("-pubsubRouter=floodsub")
if not is_pubsub_signing:
args.append("-pubsubSign=false")
if not is_pubsub_signing_strict:
args.append("-pubsubSignStrict=false")
# NOTE:
# Two other params are possibly what we want to configure:
# - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond # noqa: E501
# - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second
# Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501
self.args = args
async def wait_until_ready(self):
lines_head_pattern = (b"Control socket:", b"Peer ID:", b"Peer Addrs:")
lines_head_occurred = {line: False for line in lines_head_pattern}
async def read_from_daemon_and_check():
line = await self.proc.stdout.readline()
for head_pattern in lines_head_occurred:
if line.startswith(head_pattern):
lines_head_occurred[head_pattern] = True
return all([value for _, value in lines_head_occurred.items()])
await try_until_success(read_from_daemon_and_check)
# Sleep a little bit to ensure the listener is up after logs are emitted.
await asyncio.sleep(0.01)
async def start(self) -> None:
self.proc = await asyncio.subprocess.create_subprocess_exec(
self.cmd,
*self.args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
bufsize=0,
)
await self.wait_until_ready()
async def close(self) -> None:
self.proc.terminate()
await self.proc.wait()
class Daemon: class Daemon:
proc: SpawnBase p2pd_proc: P2PDProcess
control: Client control: Client
peer_info: PeerInfo peer_info: PeerInfo
def __init__(self, proc: SpawnBase, control: Client, peer_info: PeerInfo) -> None: def __init__(
self.proc = proc self, p2pd_proc: P2PDProcess, control: Client, peer_info: PeerInfo
) -> None:
self.p2pd_proc = p2pd_proc
self.control = control self.control = control
self.peer_info = peer_info self.peer_info = peer_info
@ -33,9 +122,12 @@ class Daemon:
def listen_maddr(self) -> Multiaddr: def listen_maddr(self) -> Multiaddr:
return self.peer_info.addrs[0] return self.peer_info.addrs[0]
async def close(self) -> None:
await self.p2pd_proc.close()
await self.control.close()
async def make_p2pd( async def make_p2pd(
proc_factory,
unused_tcp_port_factory, unused_tcp_port_factory,
is_secure: bool, is_secure: bool,
is_pubsub_enabled=True, is_pubsub_enabled=True,
@ -44,38 +136,34 @@ async def make_p2pd(
is_pubsub_signing_strict=False, is_pubsub_signing_strict=False,
) -> Daemon: ) -> Daemon:
control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}") control_maddr = Multiaddr(f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}")
args = [f"-listen={str(control_maddr)}"] p2pd_proc = P2PDProcess(
# NOTE: To support `-insecure`, we need to hack `go-libp2p-daemon`. control_maddr,
if not is_secure: is_secure,
args.append("-insecure=true") is_pubsub_enabled,
if is_pubsub_enabled: is_gossipsub,
args.append("-pubsub") is_pubsub_signing,
if is_gossipsub: is_pubsub_signing_strict,
args.append("-pubsubRouter=gossipsub")
else:
args.append("-pubsubRouter=floodsub")
if not is_pubsub_signing:
args.append("-pubsubSign=false")
if not is_pubsub_signing_strict:
args.append("-pubsubSignStrict=false")
# NOTE:
# Two other params are possibly what we want to configure:
# - gossipsubHeartbeatInterval: GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
# - gossipsubHeartbeatInitialDelay: GossipSubHeartbeatInterval = 1 * time.Second
# Referece: https://github.com/libp2p/go-libp2p-daemon/blob/b95e77dbfcd186ccf817f51e95f73f9fd5982600/p2pd/main.go#L348-L353 # noqa: E501
proc = proc_factory(str(P2PD_PATH), args)
await proc.expect(r"Peer ID:\s+(\w+)" + PEXPECT_NEW_LINE, async_=True)
peer_id_base58 = proc.match.group(1)
await proc.expect(r"Peer Addrs:", async_=True)
await proc.expect(
rf"(/ip4/{LOCALHOST_IP}/tcp/[\w]+)" + PEXPECT_NEW_LINE, async_=True
)
daemon_listener_maddr = Multiaddr(proc.match.group(1))
daemon_pinfo = info_from_p2p_addr(
daemon_listener_maddr.encapsulate(f"/p2p/{peer_id_base58}")
) )
await p2pd_proc.start()
client_callback_maddr = Multiaddr( client_callback_maddr = Multiaddr(
f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}" f"/ip4/{LOCALHOST_IP}/tcp/{unused_tcp_port_factory()}"
) )
p2pc = Client(control_maddr, client_callback_maddr) p2pc = Client(control_maddr, client_callback_maddr)
return Daemon(proc, p2pc, daemon_pinfo) await p2pc.listen()
peer_id, maddrs = await p2pc.identify()
listen_maddr: Multiaddr = None
for maddr in maddrs:
try:
ip = maddr.value_for_protocol(multiaddr.protocols.P_IP4)
maddr.value_for_protocol(multiaddr.protocols.P_TCP)
except multiaddr.exceptions.ProtocolLookupError:
continue
if ip == LOCALHOST_IP:
listen_maddr = maddr
break
assert listen_maddr is not None, "no loopback maddr is found"
peer_info = info_from_p2p_addr(
listen_maddr.encapsulate(Multiaddr(f"/p2p/{peer_id.to_string()}"))
)
print(f"!@# peer_info: peer_id={peer_info.peer_id}, maddrs={peer_info.addrs}")
return Daemon(p2pd_proc, p2pc, peer_info)

View File

@ -5,7 +5,14 @@ from .daemon import make_p2pd
@pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.parametrize("num_hosts", (1,))
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pubsub_init( async def test_pubsub_init(hosts, is_host_secure, unused_tcp_port_factory):
hosts, proc_factory, is_host_secure, unused_tcp_port_factory try:
): p2pd = await make_p2pd(unused_tcp_port_factory, is_host_secure)
p2pd = await make_p2pd(proc_factory, unused_tcp_port_factory, is_host_secure) host = hosts[0]
peers = await p2pd.control.list_peers()
assert len(peers) == 0
await host.connect(p2pd.peer_info)
peers = await p2pd.control.list_peers()
assert len(peers) != 0
finally:
await p2pd.close()