run black w/ extended line length

This commit is contained in:
Alex Stokes 2019-08-03 11:25:25 -07:00 committed by Kevin Mai-Husan Chia
parent 905dfa9a8d
commit 7477b29508
33 changed files with 71 additions and 229 deletions

View File

@ -80,23 +80,14 @@ def main():
Then, run another host with 'python ./chat -p <ANOTHER_PORT> -d <DESTINATION>',
where <DESTINATION> is the multiaddress of the previous listener host.
"""
example_maddr = (
"/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
example_maddr = "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"--debug",
action="store_true",
help="generate the same node ID on every execution",
"--debug", action="store_true", help="generate the same node ID on every execution"
)
parser.add_argument("-p", "--port", default=8000, type=int, help="source port number")
parser.add_argument(
"-p", "--port", default=8000, type=int, help="source port number"
)
parser.add_argument(
"-d",
"--destination",
type=str,
help=f"destination multiaddr string, e.g. {example_maddr}",
"-d", "--destination", type=str, help=f"destination multiaddr string, e.g. {example_maddr}"
)
parser.add_argument(
"-l",

View File

@ -52,12 +52,7 @@ def initialize_default_kademlia_router(ksize=20, alpha=3, id_opt=None, storage=N
def initialize_default_swarm(
id_opt=None,
transport_opt=None,
muxer_opt=None,
sec_opt=None,
peerstore_opt=None,
disc_opt=None,
id_opt=None, transport_opt=None, muxer_opt=None, sec_opt=None, peerstore_opt=None, disc_opt=None
):
"""
initialize swarm when no swarm is passed in

View File

@ -68,9 +68,7 @@ class BasicHost(IHost):
addrs.append(addr.encapsulate(p2p_part))
return addrs
def set_stream_handler(
self, protocol_id: str, stream_handler: StreamHandlerFn
) -> bool:
def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
"""
set stream handler for host
:param protocol_id: protocol id used on stream

View File

@ -38,9 +38,7 @@ class IHost(ABC):
"""
@abstractmethod
def set_stream_handler(
self, protocol_id: str, stream_handler: StreamHandlerFn
) -> bool:
def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
"""
set stream handler for host
:param protocol_id: protocol id used on stream

View File

@ -114,9 +114,7 @@ class ValueSpiderCrawl(SpiderCrawl):
"""
value_counts = Counter(values)
if len(value_counts) != 1:
log.warning(
"Got multiple values for key %i: %s", self.node.xor_id, str(values)
)
log.warning("Got multiple values for key %i: %s", self.node.xor_id, str(values))
value = value_counts.most_common(1)[0][0]
peer = self.nearest_without_value.popleft()

View File

@ -49,9 +49,7 @@ class KadPeerInfo(PeerInfo):
def encode(self):
return (
str(self.peer_id_bytes)
+ "\n"
+ str("/ip4/" + str(self.ip) + "/udp/" + str(self.port))
str(self.peer_id_bytes) + "\n" + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port))
)
@ -139,17 +137,11 @@ class KadPeerHeap:
def create_kad_peerinfo(node_id_bytes=None, sender_ip=None, sender_port=None):
node_id = (
ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255)))
)
node_id = ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255)))
peer_data = None
if sender_ip and sender_port:
peer_data = PeerData()
addr = [
Multiaddr(
"/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port)
)
]
addr = [Multiaddr("/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port))]
peer_data.add_addrs(addr)
return KadPeerInfo(node_id, peer_data)

View File

@ -62,9 +62,7 @@ class KademliaServer:
Provide interface="::" to accept ipv6 address
"""
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(
self._create_protocol, local_addr=(interface, port)
)
listen = loop.create_datagram_endpoint(self._create_protocol, local_addr=(interface, port))
log.info("Node %i listening on %s:%i", self.node.xor_id, interface, port)
self.transport, self.protocol = await listen
# finally, schedule refreshing table
@ -85,9 +83,7 @@ class KademliaServer:
for node_id in self.protocol.get_refresh_ids():
node = create_kad_peerinfo(node_id)
nearest = self.protocol.router.find_neighbors(node, self.alpha)
spider = NodeSpiderCrawl(
self.protocol, node, nearest, self.ksize, self.alpha
)
spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
results.append(spider.find())
# do our crawling
@ -122,9 +118,7 @@ class KademliaServer:
cos = list(map(self.bootstrap_node, addrs))
gathered = await asyncio.gather(*cos)
nodes = [node for node in gathered if node is not None]
spider = NodeSpiderCrawl(
self.protocol, self.node, nodes, self.ksize, self.alpha
)
spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha)
return await spider.find()
async def bootstrap_node(self, addr):

View File

@ -50,9 +50,7 @@ class KademliaProtocol(RPCProtocol):
source = create_kad_peerinfo(nodeid, sender[0], sender[1])
self.welcome_if_new(source)
log.debug(
"got a store request from %s, storing '%s'='%s'", sender, key.hex(), value
)
log.debug("got a store request from %s, storing '%s'='%s'", sender, key.hex(), value)
self.storage[key] = value
return True
@ -82,9 +80,7 @@ class KademliaProtocol(RPCProtocol):
we store a map of content_id to peer_id (non xor)
"""
if nodeid == provider_id:
log.info(
"adding provider %s for key %s in local table", provider_id, str(key)
)
log.info("adding provider %s for key %s in local table", provider_id, str(key))
self.storage[key] = provider_id
return True
return False
@ -135,9 +131,7 @@ class KademliaProtocol(RPCProtocol):
async def call_add_provider(self, node_to_ask, key, provider_id):
address = (node_to_ask.ip, node_to_ask.port)
result = await self.add_provider(
address, self.source_node.peer_id_bytes, key, provider_id
)
result = await self.add_provider(address, self.source_node.peer_id_bytes, key, provider_id)
return self.handle_call_response(result, node_to_ask)

View File

@ -40,9 +40,7 @@ class INetwork(ABC):
"""
@abstractmethod
def set_stream_handler(
self, protocol_id: str, stream_handler: StreamHandlerFn
) -> bool:
def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
"""
:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler instance

View File

@ -69,9 +69,7 @@ class Swarm(INetwork):
def get_peer_id(self) -> ID:
return self.self_id
def set_stream_handler(
self, protocol_id: str, stream_handler: StreamHandlerFn
) -> bool:
def set_stream_handler(self, protocol_id: str, stream_handler: StreamHandlerFn) -> bool:
"""
:param protocol_id: protocol id used on stream
:param stream_handler: a stream handler instance
@ -199,9 +197,7 @@ class Swarm(INetwork):
# Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure
# the conn and then mux the conn
secured_conn = await self.upgrader.upgrade_security(
raw_conn, peer_id, False
)
secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, False)
muxed_conn = self.upgrader.upgrade_connection(
secured_conn, self.generic_protocol_handler, peer_id
)

View File

@ -25,9 +25,7 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo:
parts = addr.split()
if not parts:
raise InvalidAddrError(
f"`parts`={parts} should at least have a protocol `P_P2P`"
)
raise InvalidAddrError(f"`parts`={parts} should at least have a protocol `P_P2P`")
p2p_part = parts[-1]
last_protocol_code = p2p_part.protocols()[0].code

View File

@ -66,9 +66,7 @@ class FloodSub(IPubsubRouter):
"""
peers_gen = self._get_peers_to_send(
pubsub_msg.topicIDs,
msg_forwarder=msg_forwarder,
origin=ID(pubsub_msg.from_id),
pubsub_msg.topicIDs, msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id)
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
for peer_id in peers_gen:

View File

@ -156,9 +156,7 @@ class GossipSub(IPubsubRouter):
self.mcache.put(pubsub_msg)
peers_gen = self._get_peers_to_send(
pubsub_msg.topicIDs,
msg_forwarder=msg_forwarder,
origin=ID(pubsub_msg.from_id),
pubsub_msg.topicIDs, msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id)
)
rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg])
for peer_id in peers_gen:
@ -353,9 +351,7 @@ class GossipSub(IPubsubRouter):
if num_fanout_peers_in_topic < self.degree:
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic]
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
topic,
self.degree - num_fanout_peers_in_topic,
self.fanout[topic],
topic, self.degree - num_fanout_peers_in_topic, self.fanout[topic]
)
# Add the peers to fanout[topic]
self.fanout[topic].extend(selected_peers)
@ -374,9 +370,7 @@ class GossipSub(IPubsubRouter):
for peer in peers_to_emit_ihave_to:
# TODO: this line is a monster, can hopefully be simplified
if (
topic not in self.mesh or (peer not in self.mesh[topic])
) and (
if (topic not in self.mesh or (peer not in self.mesh[topic])) and (
topic not in self.fanout or (peer not in self.fanout[topic])
):
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
@ -396,10 +390,7 @@ class GossipSub(IPubsubRouter):
topic, self.degree, []
)
for peer in peers_to_emit_ihave_to:
if (
peer not in self.mesh[topic]
and peer not in self.fanout[topic]
):
if peer not in self.mesh[topic] and peer not in self.fanout[topic]:
msg_id_strs = [str(msg) for msg in msg_ids]
await self.emit_ihave(topic, msg_id_strs, peer)
@ -439,19 +430,13 @@ class GossipSub(IPubsubRouter):
self, topic: str, num_to_select: int, minus: Sequence[ID]
) -> List[ID]:
gossipsub_peers_in_topic = [
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if peer_id in self.peers_gossipsub
peer_id for peer_id in self.pubsub.peer_topics[topic] if peer_id in self.peers_gossipsub
]
return self.select_from_minus(
num_to_select, gossipsub_peers_in_topic, list(minus)
)
return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, list(minus))
# RPC handlers
async def handle_ihave(
self, ihave_msg: rpc_pb2.Message, sender_peer_id: ID
) -> None:
async def handle_ihave(self, ihave_msg: rpc_pb2.Message, sender_peer_id: ID) -> None:
"""
Checks the seen set and requests unknown messages with an IWANT message.
"""
@ -475,9 +460,7 @@ class GossipSub(IPubsubRouter):
if msg_ids_wanted:
await self.emit_iwant(msg_ids_wanted, sender_peer_id)
async def handle_iwant(
self, iwant_msg: rpc_pb2.Message, sender_peer_id: ID
) -> None:
async def handle_iwant(self, iwant_msg: rpc_pb2.Message, sender_peer_id: ID) -> None:
"""
Forwards all request messages that are present in mcache to the requesting peer.
"""
@ -512,9 +495,7 @@ class GossipSub(IPubsubRouter):
# 4) And write the packet to the stream
await peer_stream.write(rpc_msg)
async def handle_graft(
self, graft_msg: rpc_pb2.Message, sender_peer_id: ID
) -> None:
async def handle_graft(self, graft_msg: rpc_pb2.Message, sender_peer_id: ID) -> None:
topic: str = graft_msg.topicID
# Add peer to mesh for topic
@ -525,9 +506,7 @@ class GossipSub(IPubsubRouter):
# Respond with PRUNE if not subscribed to the topic
await self.emit_prune(topic, sender_peer_id)
async def handle_prune(
self, prune_msg: rpc_pb2.Message, sender_peer_id: ID
) -> None:
async def handle_prune(self, prune_msg: rpc_pb2.Message, sender_peer_id: ID) -> None:
topic: str = prune_msg.topicID
# Remove peer from mesh for topic, if peer is in topic
@ -589,9 +568,7 @@ class GossipSub(IPubsubRouter):
await self.emit_control_message(control_msg, to_peer)
async def emit_control_message(
self, control_msg: rpc_pb2.ControlMessage, to_peer: ID
) -> None:
async def emit_control_message(self, control_msg: rpc_pb2.ControlMessage, to_peer: ID) -> None:
# Add control message to packet
packet: rpc_pb2.RPC = rpc_pb2.RPC()
packet.control.CopyFrom(control_msg)

View File

@ -105,9 +105,7 @@ class Pubsub:
"""
packet = rpc_pb2.RPC()
for topic_id in self.my_topics:
packet.subscriptions.extend(
[rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]
)
packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)])
return packet.SerializeToString()
async def continuously_read_stream(self, stream: INetStream) -> None:

View File

@ -29,9 +29,7 @@ class InsecureTransport(ISecureTransport):
insecure_conn = InsecureConn(conn, self.transport_id)
return insecure_conn
async def secure_outbound(
self, conn: "IRawConnection", peer_id: "ID"
) -> ISecureConn:
async def secure_outbound(self, conn: "IRawConnection", peer_id: "ID") -> ISecureConn:
"""
Secure the connection, either locally or by communicating with opposing node via conn,
for an inbound connection (i.e. we are the initiator)

View File

@ -25,9 +25,7 @@ class ISecureTransport(ABC):
"""
@abstractmethod
async def secure_outbound(
self, conn: "IRawConnection", peer_id: "ID"
) -> "ISecureConn":
async def secure_outbound(self, conn: "IRawConnection", peer_id: "ID") -> "ISecureConn":
"""
Secure the connection, either locally or by communicating with opposing node via conn,
for an inbound connection (i.e. we are the initiator)

View File

@ -61,9 +61,7 @@ class SecurityMultistream(ABC):
return secure_conn
async def secure_outbound(
self, conn: "IRawConnection", peer_id: "ID"
) -> "ISecureConn":
async def secure_outbound(self, conn: "IRawConnection", peer_id: "ID") -> "ISecureConn":
"""
Secure the connection, either locally or by communicating with opposing node via conn,
for an inbound connection (i.e. we are the initiator)
@ -78,9 +76,7 @@ class SecurityMultistream(ABC):
return secure_conn
async def select_transport(
self, conn: "IRawConnection", initiator: bool
) -> "ISecureTransport":
async def select_transport(self, conn: "IRawConnection", initiator: bool) -> "ISecureTransport":
"""
Select a transport that both us and the node on the
other end of conn support and agree on

View File

@ -26,16 +26,12 @@ class SimpleSecurityTransport(ISecureTransport):
incoming = (await conn.read()).decode()
if incoming != self.key_phrase:
raise Exception(
"Key phrase differed between nodes. Expected " + self.key_phrase
)
raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase)
secure_conn = SimpleSecureConn(conn, self.key_phrase)
return secure_conn
async def secure_outbound(
self, conn: "IRawConnection", peer_id: "ID"
) -> "ISecureConn":
async def secure_outbound(self, conn: "IRawConnection", peer_id: "ID") -> "ISecureConn":
"""
Secure the connection, either locally or by communicating with opposing node via conn,
for an inbound connection (i.e. we are the initiator)
@ -49,9 +45,7 @@ class SimpleSecurityTransport(ISecureTransport):
await asyncio.sleep(0)
if incoming != self.key_phrase:
raise Exception(
"Key phrase differed between nodes. Expected " + self.key_phrase
)
raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase)
secure_conn = SimpleSecureConn(conn, self.key_phrase)
return secure_conn

View File

@ -157,9 +157,7 @@ class Mplex(IMuxedConn):
try:
header = await decode_uvarint_from_stream(self.raw_conn.reader, timeout)
length = await decode_uvarint_from_stream(self.raw_conn.reader, timeout)
message = await asyncio.wait_for(
self.raw_conn.reader.read(length), timeout=timeout
)
message = await asyncio.wait_for(self.raw_conn.reader.read(length), timeout=timeout)
except asyncio.TimeoutError:
return None, None, None

View File

@ -50,9 +50,7 @@ class MplexStream(IMuxedStream):
"""
# TODO error handling with timeout
# TODO understand better how mutexes are used from go repo
await self.mplex_conn.send_message(
get_flag(self.initiator, "CLOSE"), None, self.stream_id
)
await self.mplex_conn.send_message(get_flag(self.initiator, "CLOSE"), None, self.stream_id)
remote_lock = ""
async with self.stream_lock:

View File

@ -25,9 +25,7 @@ class TCP(ITransport):
:return: return True if successful
"""
self.server = await asyncio.start_server(
self.handler,
maddr.value_for_protocol("ip4"),
maddr.value_for_protocol("tcp"),
self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp")
)
socket = self.server.sockets[0]
self.multiaddrs.append(_multiaddr_from_socket(socket))

View File

@ -11,18 +11,11 @@ extras_require = {
"pytest-cov>=2.7.1,<3.0.0",
"pytest-asyncio>=0.10.0,<1.0.0",
],
"lint": [
"mypy>=0.701,<1.0",
"black==19.3b0",
"isort==4.3.21",
"flake8>=3.7.7,<4.0.0",
],
"lint": ["mypy>=0.701,<1.0", "black==19.3b0", "isort==4.3.21", "flake8>=3.7.7,<4.0.0"],
"dev": ["tox>=3.13.2,<4.0.0"],
}
extras_require["dev"] = (
extras_require["test"] + extras_require["lint"] + extras_require["dev"]
)
extras_require["dev"] = extras_require["test"] + extras_require["lint"] + extras_require["dev"]
setuptools.setup(

View File

@ -114,9 +114,7 @@ async def test_multiple_streams():
response_a = (await stream_a.read()).decode()
response_b = (await stream_b.read()).decode()
assert response_a == ("ack_b:" + a_message) and response_b == (
"ack_a:" + b_message
)
assert response_a == ("ack_b:" + a_message) and response_b == ("ack_a:" + b_message)
# Success, terminate pending tasks.
await cleanup()

View File

@ -15,11 +15,7 @@ import pytest
from libp2p import initialize_default_swarm, new_node
from libp2p.host.basic_host import BasicHost
from libp2p.network.notifee_interface import INotifee
from tests.utils import (
cleanup,
echo_stream_handler,
perform_two_host_set_up_custom_handler,
)
from tests.utils import cleanup, echo_stream_handler, perform_two_host_set_up_custom_handler
class MyNotifee(INotifee):
@ -130,10 +126,7 @@ async def test_one_notifier_on_two_nodes():
# Ensure the connected and opened_stream events were hit in Notifee obj
# and that the stream passed into opened_stream matches the stream created on
# node_b
assert events_b == [
["connectedb", stream.mplex_conn],
["opened_streamb", stream],
]
assert events_b == [["connectedb", stream.mplex_conn], ["opened_streamb", stream]]
while True:
read_string = (await stream.read()).decode()

View File

@ -37,8 +37,7 @@ def test_init_no_value():
pytest.param(random.randint(0, 255), id="random integer"),
pytest.param(multiaddr.Multiaddr("/"), id="empty multiaddr"),
pytest.param(
multiaddr.Multiaddr("/ip4/127.0.0.1"),
id="multiaddr without peer_id(p2p protocol)",
multiaddr.Multiaddr("/ip4/127.0.0.1"), id="multiaddr without peer_id(p2p protocol)"
),
),
)

View File

@ -48,9 +48,7 @@ async def perform_simple_test(
@pytest.mark.asyncio
async def test_single_protocol_succeeds():
expected_selected_protocol = "/echo/1.0.0"
await perform_simple_test(
expected_selected_protocol, ["/echo/1.0.0"], ["/echo/1.0.0"]
)
await perform_simple_test(expected_selected_protocol, ["/echo/1.0.0"], ["/echo/1.0.0"])
@pytest.mark.asyncio

View File

@ -16,9 +16,7 @@ def num_hosts():
@pytest.fixture
async def hosts(num_hosts):
_hosts = HostFactory.create_batch(num_hosts)
await asyncio.gather(
*[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts]
)
await asyncio.gather(*[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts])
yield _hosts
# Clean up
listeners = []

View File

@ -102,10 +102,7 @@ FLOODSUB_PROTOCOL_TEST_CASES = [
"3": ["1", "2", "4"],
"4": ["1", "2", "3"],
},
"topic_map": {
"astrophysics": ["1", "2", "3", "4"],
"school": ["1", "2", "3", "4"],
},
"topic_map": {"astrophysics": ["1", "2", "3", "4"], "school": ["1", "2", "3", "4"]},
"messages": [
{"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"},
{"topics": ["school"], "data": b"foobar", "node_id": "2"},
@ -137,8 +134,7 @@ FLOODSUB_PROTOCOL_TEST_CASES = [
]
floodsub_protocol_pytest_params = [
pytest.param(test_case, id=test_case["name"])
for test_case in FLOODSUB_PROTOCOL_TEST_CASES
pytest.param(test_case, id=test_case["name"]) for test_case in FLOODSUB_PROTOCOL_TEST_CASES
]
@ -195,9 +191,7 @@ async def perform_test_from_obj(obj, router_factory):
# Create neighbor if neighbor does not yet exist
if neighbor_id not in node_map:
await add_node(neighbor_id)
tasks_connect.append(
connect(node_map[start_node_id], node_map[neighbor_id])
)
tasks_connect.append(connect(node_map[start_node_id], node_map[neighbor_id]))
# Connect nodes and wait at least for 2 seconds
await asyncio.gather(*tasks_connect, asyncio.sleep(2))

View File

@ -35,9 +35,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
for source_num in adjacency_map:
target_nums = adjacency_map[source_num]
for target_num in target_nums:
await connect(
dummy_nodes[source_num].libp2p_node, dummy_nodes[target_num].libp2p_node
)
await connect(dummy_nodes[source_num].libp2p_node, dummy_nodes[target_num].libp2p_node)
# Allow time for network creation to take place
await asyncio.sleep(0.25)

View File

@ -10,8 +10,7 @@ from .utils import dense_connect, one_to_all_connect
@pytest.mark.parametrize(
"num_hosts, gossipsub_params",
((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),),
"num_hosts, gossipsub_params", ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),)
)
@pytest.mark.asyncio
async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub):
@ -343,11 +342,7 @@ async def test_fanout_maintenance(hosts, pubsubs_gsub):
(
2,
GossipsubParams(
degree=1,
degree_low=0,
degree_high=2,
gossip_window=50,
gossip_history=100,
degree=1, degree_low=0, degree_high=2, gossip_window=50, gossip_history=100
),
),
),

View File

@ -20,7 +20,5 @@ async def test_gossipsub_initialize_with_floodsub_protocol():
async def test_gossipsub_run_with_floodsub_tests(test_case_obj):
await perform_test_from_obj(
test_case_obj,
functools.partial(
GossipsubFactory, degree=3, degree_low=2, degree_high=4, time_to_live=30
),
functools.partial(GossipsubFactory, degree=3, degree_low=2, degree_high=4, time_to_live=30),
)

View File

@ -128,9 +128,7 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch):
event_handle_rpc.set()
monkeypatch.setattr(pubsubs_fsub[0], "push_msg", mock_push_msg)
monkeypatch.setattr(
pubsubs_fsub[0], "handle_subscription", mock_handle_subscription
)
monkeypatch.setattr(pubsubs_fsub[0], "handle_subscription", mock_handle_subscription)
monkeypatch.setattr(pubsubs_fsub[0].router, "handle_rpc", mock_handle_rpc)
async def wait_for_event_occurring(event):
@ -149,9 +147,7 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch):
task = asyncio.ensure_future(pubsubs_fsub[0].continuously_read_stream(stream))
# Test: `push_msg` is called when publishing to a subscribed topic.
publish_subscribed_topic = rpc_pb2.RPC(
publish=[rpc_pb2.Message(topicIDs=[TESTING_TOPIC])]
)
publish_subscribed_topic = rpc_pb2.RPC(publish=[rpc_pb2.Message(topicIDs=[TESTING_TOPIC])])
await stream.write(publish_subscribed_topic.SerializeToString())
await wait_for_event_occurring(event_push_msg)
# Make sure the other events are not emitted.
@ -204,10 +200,7 @@ def test_handle_subscription(pubsubs_fsub):
peer_ids = [ID(b"\x12\x20" + i.to_bytes(32, "big")) for i in range(2)]
# Test: One peer is subscribed
pubsubs_fsub[0].handle_subscription(peer_ids[0], sub_msg_0)
assert (
len(pubsubs_fsub[0].peer_topics) == 1
and TESTING_TOPIC in pubsubs_fsub[0].peer_topics
)
assert len(pubsubs_fsub[0].peer_topics) == 1 and TESTING_TOPIC in pubsubs_fsub[0].peer_topics
assert len(pubsubs_fsub[0].peer_topics[TESTING_TOPIC]) == 1
assert peer_ids[0] in pubsubs_fsub[0].peer_topics[TESTING_TOPIC]
# Test: Another peer is subscribed
@ -233,10 +226,7 @@ def test_handle_subscription(pubsubs_fsub):
async def test_handle_talk(pubsubs_fsub):
sub = await pubsubs_fsub[0].subscribe(TESTING_TOPIC)
msg_0 = make_pubsub_msg(
origin_id=pubsubs_fsub[0].my_id,
topic_ids=[TESTING_TOPIC],
data=b"1234",
seqno=b"\x00" * 8,
origin_id=pubsubs_fsub[0].my_id, topic_ids=[TESTING_TOPIC], data=b"1234", seqno=b"\x00" * 8
)
await pubsubs_fsub[0].handle_talk(msg_0)
msg_1 = make_pubsub_msg(
@ -246,10 +236,7 @@ async def test_handle_talk(pubsubs_fsub):
seqno=b"\x11" * 8,
)
await pubsubs_fsub[0].handle_talk(msg_1)
assert (
len(pubsubs_fsub[0].my_topics) == 1
and sub == pubsubs_fsub[0].my_topics[TESTING_TOPIC]
)
assert len(pubsubs_fsub[0].my_topics) == 1 and sub == pubsubs_fsub[0].my_topics[TESTING_TOPIC]
assert sub.qsize() == 1
assert (await sub.get()) == msg_0
@ -283,9 +270,7 @@ async def test_publish(pubsubs_fsub, monkeypatch):
await pubsubs_fsub[0].publish(TESTING_TOPIC, TESTING_DATA)
assert len(msgs) == 2, "`push_msg` should be called every time `publish` is called"
assert (msg_forwarders[0] == msg_forwarders[1]) and (
msg_forwarders[1] == pubsubs_fsub[0].my_id
)
assert (msg_forwarders[0] == msg_forwarders[1]) and (msg_forwarders[1] == pubsubs_fsub[0].my_id)
assert msgs[0].seqno != msgs[1].seqno, "`seqno` should be different every time"

View File

@ -65,9 +65,7 @@ async def test_single_insecure_security_transport_succeeds():
def assertion_func(details):
assert details["id"] == "foo"
await perform_simple_test(
assertion_func, transports_for_initiator, transports_for_noninitiator
)
await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator)
@pytest.mark.asyncio
@ -78,9 +76,7 @@ async def test_single_simple_test_security_transport_succeeds():
def assertion_func(details):
assert details["key_phrase"] == "tacos"
await perform_simple_test(
assertion_func, transports_for_initiator, transports_for_noninitiator
)
await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator)
@pytest.mark.asyncio
@ -94,9 +90,7 @@ async def test_two_simple_test_security_transport_for_initiator_succeeds():
def assertion_func(details):
assert details["key_phrase"] == "shleep"
await perform_simple_test(
assertion_func, transports_for_initiator, transports_for_noninitiator
)
await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator)
@pytest.mark.asyncio
@ -110,9 +104,7 @@ async def test_two_simple_test_security_transport_for_noninitiator_succeeds():
def assertion_func(details):
assert details["key_phrase"] == "tacos"
await perform_simple_test(
assertion_func, transports_for_initiator, transports_for_noninitiator
)
await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator)
@pytest.mark.asyncio
@ -129,9 +121,7 @@ async def test_two_simple_test_security_transport_for_both_succeeds():
def assertion_func(details):
assert details["key_phrase"] == "b"
await perform_simple_test(
assertion_func, transports_for_initiator, transports_for_noninitiator
)
await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator)
@pytest.mark.asyncio
@ -174,6 +164,4 @@ async def test_default_insecure_security():
else:
assert details1 == details2
await perform_simple_test(
assertion_func, transports_for_initiator, transports_for_noninitiator
)
await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator)