From 3debd2c808bf9afa6b277c6052d9a39cca6c8459 Mon Sep 17 00:00:00 2001 From: Alex Stokes Date: Tue, 13 Aug 2019 14:36:42 -0700 Subject: [PATCH] Run `black` and `isort` w/ the new config --- examples/chat/chat.py | 17 +++++-- libp2p/host/basic_host.py | 8 +++- libp2p/host/host_interface.py | 8 +++- libp2p/kademlia/crawling.py | 4 +- libp2p/kademlia/kad_peerinfo.py | 14 ++++-- libp2p/kademlia/network.py | 12 +++-- libp2p/kademlia/protocol.py | 12 +++-- libp2p/network/network_interface.py | 8 +++- libp2p/network/swarm.py | 12 +++-- libp2p/peer/peerinfo.py | 4 +- libp2p/protocol_muxer/multiselect.py | 4 +- libp2p/protocol_muxer/multiselect_client.py | 4 +- .../multiselect_client_interface.py | 4 +- .../multiselect_muxer_interface.py | 4 +- libp2p/pubsub/floodsub.py | 4 +- libp2p/pubsub/gossipsub.py | 45 ++++++++++++++----- libp2p/pubsub/pubsub.py | 20 ++++++--- libp2p/security/security_multistream.py | 4 +- libp2p/security/simple_security.py | 8 +++- libp2p/stream_muxer/abc.py | 9 +++- libp2p/stream_muxer/mplex/mplex.py | 8 +++- libp2p/stream_muxer/mplex/mplex_stream.py | 12 ++++- libp2p/stream_muxer/mplex/utils.py | 4 +- libp2p/transport/tcp/tcp.py | 4 +- libp2p/transport/upgrader.py | 4 +- setup.py | 11 ++++- tests/libp2p/test_libp2p.py | 4 +- tests/libp2p/test_notify.py | 11 ++++- tests/peer/test_peerinfo.py | 7 ++- tests/protocol_muxer/test_protocol_muxer.py | 4 +- tests/pubsub/conftest.py | 4 +- .../floodsub_integration_test_settings.py | 12 +++-- tests/pubsub/test_dummyaccount_demo.py | 4 +- tests/pubsub/test_gossipsub.py | 9 +++- .../test_gossipsub_backward_compatibility.py | 4 +- tests/pubsub/test_pubsub.py | 30 ++++++++++--- tests/security/test_security_multistream.py | 24 +++++++--- 37 files changed, 273 insertions(+), 88 deletions(-) diff --git a/examples/chat/chat.py b/examples/chat/chat.py index aebe094..39258b5 100755 --- a/examples/chat/chat.py +++ b/examples/chat/chat.py @@ -82,14 +82,23 @@ def main() -> None: Then, run another host with 'python ./chat -p -d ', where is the multiaddress of the previous listener host. """ - example_maddr = "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + example_maddr = ( + "/ip4/127.0.0.1/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q" + ) parser = argparse.ArgumentParser(description=description) parser.add_argument( - "--debug", action="store_true", help="generate the same node ID on every execution" + "--debug", + action="store_true", + help="generate the same node ID on every execution", ) - parser.add_argument("-p", "--port", default=8000, type=int, help="source port number") parser.add_argument( - "-d", "--destination", type=str, help=f"destination multiaddr string, e.g. {example_maddr}" + "-p", "--port", default=8000, type=int, help="source port number" + ) + parser.add_argument( + "-d", + "--destination", + type=str, + help=f"destination multiaddr string, e.g. {example_maddr}", ) parser.add_argument( "-l", diff --git a/libp2p/host/basic_host.py b/libp2p/host/basic_host.py index 9419c98..cdee9a0 100644 --- a/libp2p/host/basic_host.py +++ b/libp2p/host/basic_host.py @@ -66,7 +66,9 @@ class BasicHost(IHost): addrs.append(addr.encapsulate(p2p_part)) return addrs - def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: TProtocol, stream_handler: StreamHandlerFn + ) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream @@ -77,7 +79,9 @@ class BasicHost(IHost): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on - async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> INetStream: + async def new_stream( + self, peer_id: ID, protocol_ids: Sequence[TProtocol] + ) -> INetStream: """ :param peer_id: peer_id that host is connecting :param protocol_id: protocol id that stream runs on diff --git a/libp2p/host/host_interface.py b/libp2p/host/host_interface.py index 667a437..bcaefad 100644 --- a/libp2p/host/host_interface.py +++ b/libp2p/host/host_interface.py @@ -37,7 +37,9 @@ class IHost(ABC): """ @abstractmethod - def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: TProtocol, stream_handler: StreamHandlerFn + ) -> bool: """ set stream handler for host :param protocol_id: protocol id used on stream @@ -48,7 +50,9 @@ class IHost(ABC): # protocol_id can be a list of protocol_ids # stream will decide which protocol_id to run on @abstractmethod - async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> INetStream: + async def new_stream( + self, peer_id: ID, protocol_ids: Sequence[TProtocol] + ) -> INetStream: """ :param peer_id: peer_id that host is connecting :param protocol_ids: protocol ids that stream can run on diff --git a/libp2p/kademlia/crawling.py b/libp2p/kademlia/crawling.py index 398df11..c649da0 100644 --- a/libp2p/kademlia/crawling.py +++ b/libp2p/kademlia/crawling.py @@ -114,7 +114,9 @@ class ValueSpiderCrawl(SpiderCrawl): """ value_counts = Counter(values) if len(value_counts) != 1: - log.warning("Got multiple values for key %i: %s", self.node.xor_id, str(values)) + log.warning( + "Got multiple values for key %i: %s", self.node.xor_id, str(values) + ) value = value_counts.most_common(1)[0][0] peer = self.nearest_without_value.popleft() diff --git a/libp2p/kademlia/kad_peerinfo.py b/libp2p/kademlia/kad_peerinfo.py index 2971650..9fab8dc 100644 --- a/libp2p/kademlia/kad_peerinfo.py +++ b/libp2p/kademlia/kad_peerinfo.py @@ -49,7 +49,9 @@ class KadPeerInfo(PeerInfo): def encode(self): return ( - str(self.peer_id_bytes) + "\n" + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port)) + str(self.peer_id_bytes) + + "\n" + + str("/ip4/" + str(self.ip) + "/udp/" + str(self.port)) ) @@ -137,11 +139,17 @@ class KadPeerHeap: def create_kad_peerinfo(node_id_bytes=None, sender_ip=None, sender_port=None): - node_id = ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255))) + node_id = ( + ID(node_id_bytes) if node_id_bytes else ID(digest(random.getrandbits(255))) + ) peer_data = None if sender_ip and sender_port: peer_data = PeerData() - addr = [Multiaddr("/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port))] + addr = [ + Multiaddr( + "/" + P_IP + "/" + str(sender_ip) + "/" + P_UDP + "/" + str(sender_port) + ) + ] peer_data.add_addrs(addr) return KadPeerInfo(node_id, peer_data) diff --git a/libp2p/kademlia/network.py b/libp2p/kademlia/network.py index e55f96a..fcf4b9a 100644 --- a/libp2p/kademlia/network.py +++ b/libp2p/kademlia/network.py @@ -62,7 +62,9 @@ class KademliaServer: Provide interface="::" to accept ipv6 address """ loop = asyncio.get_event_loop() - listen = loop.create_datagram_endpoint(self._create_protocol, local_addr=(interface, port)) + listen = loop.create_datagram_endpoint( + self._create_protocol, local_addr=(interface, port) + ) log.info("Node %i listening on %s:%i", self.node.xor_id, interface, port) self.transport, self.protocol = await listen # finally, schedule refreshing table @@ -83,7 +85,9 @@ class KademliaServer: for node_id in self.protocol.get_refresh_ids(): node = create_kad_peerinfo(node_id) nearest = self.protocol.router.find_neighbors(node, self.alpha) - spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) + spider = NodeSpiderCrawl( + self.protocol, node, nearest, self.ksize, self.alpha + ) results.append(spider.find()) # do our crawling @@ -118,7 +122,9 @@ class KademliaServer: cos = list(map(self.bootstrap_node, addrs)) gathered = await asyncio.gather(*cos) nodes = [node for node in gathered if node is not None] - spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha) + spider = NodeSpiderCrawl( + self.protocol, self.node, nodes, self.ksize, self.alpha + ) return await spider.find() async def bootstrap_node(self, addr): diff --git a/libp2p/kademlia/protocol.py b/libp2p/kademlia/protocol.py index 9fa035b..8606fc2 100644 --- a/libp2p/kademlia/protocol.py +++ b/libp2p/kademlia/protocol.py @@ -50,7 +50,9 @@ class KademliaProtocol(RPCProtocol): source = create_kad_peerinfo(nodeid, sender[0], sender[1]) self.welcome_if_new(source) - log.debug("got a store request from %s, storing '%s'='%s'", sender, key.hex(), value) + log.debug( + "got a store request from %s, storing '%s'='%s'", sender, key.hex(), value + ) self.storage[key] = value return True @@ -80,7 +82,9 @@ class KademliaProtocol(RPCProtocol): we store a map of content_id to peer_id (non xor) """ if nodeid == provider_id: - log.info("adding provider %s for key %s in local table", provider_id, str(key)) + log.info( + "adding provider %s for key %s in local table", provider_id, str(key) + ) self.storage[key] = provider_id return True return False @@ -131,7 +135,9 @@ class KademliaProtocol(RPCProtocol): async def call_add_provider(self, node_to_ask, key, provider_id): address = (node_to_ask.ip, node_to_ask.port) - result = await self.add_provider(address, self.source_node.peer_id_bytes, key, provider_id) + result = await self.add_provider( + address, self.source_node.peer_id_bytes, key, provider_id + ) return self.handle_call_response(result, node_to_ask) diff --git a/libp2p/network/network_interface.py b/libp2p/network/network_interface.py index f1b53bc..83c3b20 100644 --- a/libp2p/network/network_interface.py +++ b/libp2p/network/network_interface.py @@ -38,7 +38,9 @@ class INetwork(ABC): """ @abstractmethod - def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: TProtocol, stream_handler: StreamHandlerFn + ) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance @@ -46,7 +48,9 @@ class INetwork(ABC): """ @abstractmethod - async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> INetStream: + async def new_stream( + self, peer_id: ID, protocol_ids: Sequence[TProtocol] + ) -> INetStream: """ :param peer_id: peer_id of destination :param protocol_ids: available protocol ids to use for stream diff --git a/libp2p/network/swarm.py b/libp2p/network/swarm.py index 2fbc002..3353f18 100644 --- a/libp2p/network/swarm.py +++ b/libp2p/network/swarm.py @@ -68,7 +68,9 @@ class Swarm(INetwork): def get_peer_id(self) -> ID: return self.self_id - def set_stream_handler(self, protocol_id: TProtocol, stream_handler: StreamHandlerFn) -> bool: + def set_stream_handler( + self, protocol_id: TProtocol, stream_handler: StreamHandlerFn + ) -> bool: """ :param protocol_id: protocol id used on stream :param stream_handler: a stream handler instance @@ -121,7 +123,9 @@ class Swarm(INetwork): return muxed_conn - async def new_stream(self, peer_id: ID, protocol_ids: Sequence[TProtocol]) -> NetStream: + async def new_stream( + self, peer_id: ID, protocol_ids: Sequence[TProtocol] + ) -> NetStream: """ :param peer_id: peer_id of destination :param protocol_id: protocol id @@ -196,7 +200,9 @@ class Swarm(INetwork): # Per, https://discuss.libp2p.io/t/multistream-security/130, we first secure # the conn and then mux the conn - secured_conn = await self.upgrader.upgrade_security(raw_conn, peer_id, False) + secured_conn = await self.upgrader.upgrade_security( + raw_conn, peer_id, False + ) muxed_conn = self.upgrader.upgrade_connection( secured_conn, self.generic_protocol_handler, peer_id ) diff --git a/libp2p/peer/peerinfo.py b/libp2p/peer/peerinfo.py index e143fda..a2f0842 100644 --- a/libp2p/peer/peerinfo.py +++ b/libp2p/peer/peerinfo.py @@ -25,7 +25,9 @@ def info_from_p2p_addr(addr: multiaddr.Multiaddr) -> PeerInfo: parts = addr.split() if not parts: - raise InvalidAddrError(f"`parts`={parts} should at least have a protocol `P_P2P`") + raise InvalidAddrError( + f"`parts`={parts} should at least have a protocol `P_P2P`" + ) p2p_part = parts[-1] last_protocol_code = p2p_part.protocols()[0].code diff --git a/libp2p/protocol_muxer/multiselect.py b/libp2p/protocol_muxer/multiselect.py index 9a55ec3..a6b93ce 100644 --- a/libp2p/protocol_muxer/multiselect.py +++ b/libp2p/protocol_muxer/multiselect.py @@ -30,7 +30,9 @@ class Multiselect(IMultiselectMuxer): """ self.handlers[protocol] = handler - async def negotiate(self, stream: NegotiableTransport) -> Tuple[TProtocol, StreamHandlerFn]: + async def negotiate( + self, stream: NegotiableTransport + ) -> Tuple[TProtocol, StreamHandlerFn]: """ Negotiate performs protocol selection :param stream: stream to negotiate on diff --git a/libp2p/protocol_muxer/multiselect_client.py b/libp2p/protocol_muxer/multiselect_client.py index b9b3240..8a5b7f1 100644 --- a/libp2p/protocol_muxer/multiselect_client.py +++ b/libp2p/protocol_muxer/multiselect_client.py @@ -39,7 +39,9 @@ class MultiselectClient(IMultiselectClient): # Handshake succeeded if this point is reached - async def select_protocol_or_fail(self, protocol: TProtocol, stream: IMuxedStream) -> TProtocol: + async def select_protocol_or_fail( + self, protocol: TProtocol, stream: IMuxedStream + ) -> TProtocol: """ Send message to multiselect selecting protocol and fail if multiselect does not return same protocol diff --git a/libp2p/protocol_muxer/multiselect_client_interface.py b/libp2p/protocol_muxer/multiselect_client_interface.py index 9bf7da6..df5c3a8 100644 --- a/libp2p/protocol_muxer/multiselect_client_interface.py +++ b/libp2p/protocol_muxer/multiselect_client_interface.py @@ -12,7 +12,9 @@ class IMultiselectClient(ABC): """ @abstractmethod - async def select_protocol_or_fail(self, protocol: TProtocol, stream: IMuxedStream) -> TProtocol: + async def select_protocol_or_fail( + self, protocol: TProtocol, stream: IMuxedStream + ) -> TProtocol: """ Send message to multiselect selecting protocol and fail if multiselect does not return same protocol diff --git a/libp2p/protocol_muxer/multiselect_muxer_interface.py b/libp2p/protocol_muxer/multiselect_muxer_interface.py index 5444646..f2e1e89 100644 --- a/libp2p/protocol_muxer/multiselect_muxer_interface.py +++ b/libp2p/protocol_muxer/multiselect_muxer_interface.py @@ -22,7 +22,9 @@ class IMultiselectMuxer(ABC): """ @abstractmethod - async def negotiate(self, stream: NegotiableTransport) -> Tuple[TProtocol, StreamHandlerFn]: + async def negotiate( + self, stream: NegotiableTransport + ) -> Tuple[TProtocol, StreamHandlerFn]: """ Negotiate performs protocol selection :param stream: stream to negotiate on diff --git a/libp2p/pubsub/floodsub.py b/libp2p/pubsub/floodsub.py index c9b0010..d35b97b 100644 --- a/libp2p/pubsub/floodsub.py +++ b/libp2p/pubsub/floodsub.py @@ -67,7 +67,9 @@ class FloodSub(IPubsubRouter): """ peers_gen = self._get_peers_to_send( - pubsub_msg.topicIDs, msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id) + pubsub_msg.topicIDs, + msg_forwarder=msg_forwarder, + origin=ID(pubsub_msg.from_id), ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) for peer_id in peers_gen: diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index caaf58c..598e16b 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -157,7 +157,9 @@ class GossipSub(IPubsubRouter): self.mcache.put(pubsub_msg) peers_gen = self._get_peers_to_send( - pubsub_msg.topicIDs, msg_forwarder=msg_forwarder, origin=ID(pubsub_msg.from_id) + pubsub_msg.topicIDs, + msg_forwarder=msg_forwarder, + origin=ID(pubsub_msg.from_id), ) rpc_msg = rpc_pb2.RPC(publish=[pubsub_msg]) for peer_id in peers_gen: @@ -352,7 +354,9 @@ class GossipSub(IPubsubRouter): if num_fanout_peers_in_topic < self.degree: # Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] selected_peers = self._get_in_topic_gossipsub_peers_from_minus( - topic, self.degree - num_fanout_peers_in_topic, self.fanout[topic] + topic, + self.degree - num_fanout_peers_in_topic, + self.fanout[topic], ) # Add the peers to fanout[topic] self.fanout[topic].extend(selected_peers) @@ -371,7 +375,9 @@ class GossipSub(IPubsubRouter): for peer in peers_to_emit_ihave_to: # TODO: this line is a monster, can hopefully be simplified - if (topic not in self.mesh or (peer not in self.mesh[topic])) and ( + if ( + topic not in self.mesh or (peer not in self.mesh[topic]) + ) and ( topic not in self.fanout or (peer not in self.fanout[topic]) ): msg_id_strs = [str(msg_id) for msg_id in msg_ids] @@ -391,7 +397,10 @@ class GossipSub(IPubsubRouter): topic, self.degree, [] ) for peer in peers_to_emit_ihave_to: - if peer not in self.mesh[topic] and peer not in self.fanout[topic]: + if ( + peer not in self.mesh[topic] + and peer not in self.fanout[topic] + ): msg_id_strs = [str(msg) for msg in msg_ids] await self.emit_ihave(topic, msg_id_strs, peer) @@ -431,13 +440,19 @@ class GossipSub(IPubsubRouter): self, topic: str, num_to_select: int, minus: Sequence[ID] ) -> List[ID]: gossipsub_peers_in_topic = [ - peer_id for peer_id in self.pubsub.peer_topics[topic] if peer_id in self.peers_gossipsub + peer_id + for peer_id in self.pubsub.peer_topics[topic] + if peer_id in self.peers_gossipsub ] - return self.select_from_minus(num_to_select, gossipsub_peers_in_topic, list(minus)) + return self.select_from_minus( + num_to_select, gossipsub_peers_in_topic, list(minus) + ) # RPC handlers - async def handle_ihave(self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID) -> None: + async def handle_ihave( + self, ihave_msg: rpc_pb2.ControlIHave, sender_peer_id: ID + ) -> None: """ Checks the seen set and requests unknown messages with an IWANT message. """ @@ -461,7 +476,9 @@ class GossipSub(IPubsubRouter): if msg_ids_wanted: await self.emit_iwant(msg_ids_wanted, sender_peer_id) - async def handle_iwant(self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID) -> None: + async def handle_iwant( + self, iwant_msg: rpc_pb2.ControlIWant, sender_peer_id: ID + ) -> None: """ Forwards all request messages that are present in mcache to the requesting peer. """ @@ -496,7 +513,9 @@ class GossipSub(IPubsubRouter): # 4) And write the packet to the stream await peer_stream.write(rpc_msg) - async def handle_graft(self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID) -> None: + async def handle_graft( + self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID + ) -> None: topic: str = graft_msg.topicID # Add peer to mesh for topic @@ -507,7 +526,9 @@ class GossipSub(IPubsubRouter): # Respond with PRUNE if not subscribed to the topic await self.emit_prune(topic, sender_peer_id) - async def handle_prune(self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID) -> None: + async def handle_prune( + self, prune_msg: rpc_pb2.ControlPrune, sender_peer_id: ID + ) -> None: topic: str = prune_msg.topicID # Remove peer from mesh for topic, if peer is in topic @@ -569,7 +590,9 @@ class GossipSub(IPubsubRouter): await self.emit_control_message(control_msg, to_peer) - async def emit_control_message(self, control_msg: rpc_pb2.ControlMessage, to_peer: ID) -> None: + async def emit_control_message( + self, control_msg: rpc_pb2.ControlMessage, to_peer: ID + ) -> None: # Add control message to packet packet: rpc_pb2.RPC = rpc_pb2.RPC() packet.control.CopyFrom(control_msg) diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 6600c25..a19c99a 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -138,7 +138,9 @@ class Pubsub: """ packet = rpc_pb2.RPC() for topic_id in self.my_topics: - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]) + packet.subscriptions.extend( + [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] + ) return packet.SerializeToString() async def continuously_read_stream(self, stream: INetStream) -> None: @@ -207,7 +209,9 @@ class Pubsub: :param msg: the message published to the topic """ return tuple( - self.topic_validators[topic] for topic in msg.topicIDs if topic in self.topic_validators + self.topic_validators[topic] + for topic in msg.topicIDs + if topic in self.topic_validators ) async def stream_handler(self, stream: INetStream) -> None: @@ -315,7 +319,9 @@ class Pubsub: # Create subscribe message packet: rpc_pb2.RPC = rpc_pb2.RPC() - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)]) + packet.subscriptions.extend( + [rpc_pb2.RPC.SubOpts(subscribe=True, topicid=topic_id)] + ) # Send out subscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -340,7 +346,9 @@ class Pubsub: # Create unsubscribe message packet: rpc_pb2.RPC = rpc_pb2.RPC() - packet.subscriptions.extend([rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)]) + packet.subscriptions.extend( + [rpc_pb2.RPC.SubOpts(subscribe=False, topicid=topic_id)] + ) # Send out unsubscribe message to all peers await self.message_all_peers(packet.SerializeToString()) @@ -391,7 +399,9 @@ class Pubsub: cast(Awaitable[bool], topic_validator.validator(msg_forwarder, msg)) ) else: - sync_topic_validators.append(cast(SyncValidatorFn, topic_validator.validator)) + sync_topic_validators.append( + cast(SyncValidatorFn, topic_validator.validator) + ) for validator in sync_topic_validators: if not validator(msg_forwarder, msg): diff --git a/libp2p/security/security_multistream.py b/libp2p/security/security_multistream.py index cc5b90c..b2fd2bd 100644 --- a/libp2p/security/security_multistream.py +++ b/libp2p/security/security_multistream.py @@ -73,7 +73,9 @@ class SecurityMultistream(ABC): return secure_conn - async def select_transport(self, conn: IRawConnection, initiator: bool) -> ISecureTransport: + async def select_transport( + self, conn: IRawConnection, initiator: bool + ) -> ISecureTransport: """ Select a transport that both us and the node on the other end of conn support and agree on diff --git a/libp2p/security/simple_security.py b/libp2p/security/simple_security.py index b06a549..4fb0a3c 100644 --- a/libp2p/security/simple_security.py +++ b/libp2p/security/simple_security.py @@ -25,7 +25,9 @@ class SimpleSecurityTransport(ISecureTransport): incoming = (await conn.read()).decode() if incoming != self.key_phrase: - raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase) + raise Exception( + "Key phrase differed between nodes. Expected " + self.key_phrase + ) secure_conn = SimpleSecureConn(conn, self.key_phrase) return secure_conn @@ -44,7 +46,9 @@ class SimpleSecurityTransport(ISecureTransport): await asyncio.sleep(0) if incoming != self.key_phrase: - raise Exception("Key phrase differed between nodes. Expected " + self.key_phrase) + raise Exception( + "Key phrase differed between nodes. Expected " + self.key_phrase + ) secure_conn = SimpleSecureConn(conn, self.key_phrase) return secure_conn diff --git a/libp2p/stream_muxer/abc.py b/libp2p/stream_muxer/abc.py index 0b77933..0903633 100644 --- a/libp2p/stream_muxer/abc.py +++ b/libp2p/stream_muxer/abc.py @@ -22,7 +22,10 @@ class IMuxedConn(ABC): @abstractmethod def __init__( - self, conn: ISecureConn, generic_protocol_handler: "GenericProtocolHandlerFn", peer_id: ID + self, + conn: ISecureConn, + generic_protocol_handler: "GenericProtocolHandlerFn", + peer_id: ID, ) -> None: """ create a new muxed connection @@ -54,7 +57,9 @@ class IMuxedConn(ABC): """ @abstractmethod - async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> "IMuxedStream": + async def open_stream( + self, protocol_id: str, multi_addr: Multiaddr + ) -> "IMuxedStream": """ creates a new muxed_stream :param protocol_id: protocol_id of stream diff --git a/libp2p/stream_muxer/mplex/mplex.py b/libp2p/stream_muxer/mplex/mplex.py index 2fa3292..16d3613 100644 --- a/libp2p/stream_muxer/mplex/mplex.py +++ b/libp2p/stream_muxer/mplex/mplex.py @@ -90,7 +90,9 @@ class Mplex(IMuxedConn): # Stream not created yet return None - async def open_stream(self, protocol_id: str, multi_addr: Multiaddr) -> IMuxedStream: + async def open_stream( + self, protocol_id: str, multi_addr: Multiaddr + ) -> IMuxedStream: """ creates a new muxed_stream :param protocol_id: protocol_id of stream @@ -177,7 +179,9 @@ class Mplex(IMuxedConn): try: header = await decode_uvarint_from_stream(self.raw_conn.reader, timeout) length = await decode_uvarint_from_stream(self.raw_conn.reader, timeout) - message = await asyncio.wait_for(self.raw_conn.reader.read(length), timeout=timeout) + message = await asyncio.wait_for( + self.raw_conn.reader.read(length), timeout=timeout + ) except asyncio.TimeoutError: return None, None, None diff --git a/libp2p/stream_muxer/mplex/mplex_stream.py b/libp2p/stream_muxer/mplex/mplex_stream.py index e4627ec..7fc1361 100644 --- a/libp2p/stream_muxer/mplex/mplex_stream.py +++ b/libp2p/stream_muxer/mplex/mplex_stream.py @@ -47,7 +47,11 @@ class MplexStream(IMuxedStream): write to stream :return: number of bytes written """ - flag = HeaderTags.MessageInitiator if self.initiator else HeaderTags.MessageReceiver + flag = ( + HeaderTags.MessageInitiator + if self.initiator + else HeaderTags.MessageReceiver + ) return await self.mplex_conn.send_message(flag, data, self.stream_id) async def close(self) -> bool: @@ -89,7 +93,11 @@ class MplexStream(IMuxedStream): return True if not self.remote_closed: - flag = HeaderTags.ResetInitiator if self.initiator else HeaderTags.ResetInitiator + flag = ( + HeaderTags.ResetInitiator + if self.initiator + else HeaderTags.ResetInitiator + ) await self.mplex_conn.send_message(flag, None, self.stream_id) self.local_closed = True diff --git a/libp2p/stream_muxer/mplex/utils.py b/libp2p/stream_muxer/mplex/utils.py index 3546749..dba6465 100644 --- a/libp2p/stream_muxer/mplex/utils.py +++ b/libp2p/stream_muxer/mplex/utils.py @@ -31,7 +31,9 @@ def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]: return result, index + 1 -async def decode_uvarint_from_stream(reader: asyncio.StreamReader, timeout: float) -> int: +async def decode_uvarint_from_stream( + reader: asyncio.StreamReader, timeout: float +) -> int: shift = 0 result = 0 while True: diff --git a/libp2p/transport/tcp/tcp.py b/libp2p/transport/tcp/tcp.py index b400ec1..4378e1c 100644 --- a/libp2p/transport/tcp/tcp.py +++ b/libp2p/transport/tcp/tcp.py @@ -28,7 +28,9 @@ class TCPListener(IListener): :return: return True if successful """ self.server = await asyncio.start_server( - self.handler, maddr.value_for_protocol("ip4"), maddr.value_for_protocol("tcp") + self.handler, + maddr.value_for_protocol("ip4"), + maddr.value_for_protocol("tcp"), ) socket = self.server.sockets[0] self.multiaddrs.append(_multiaddr_from_socket(socket)) diff --git a/libp2p/transport/upgrader.py b/libp2p/transport/upgrader.py index 9b2ddba..28d4a51 100644 --- a/libp2p/transport/upgrader.py +++ b/libp2p/transport/upgrader.py @@ -47,7 +47,9 @@ class TransportUpgrader: @staticmethod def upgrade_connection( - conn: ISecureConn, generic_protocol_handler: GenericProtocolHandlerFn, peer_id: ID + conn: ISecureConn, + generic_protocol_handler: GenericProtocolHandlerFn, + peer_id: ID, ) -> Mplex: """ Upgrade raw connection to muxed connection diff --git a/setup.py b/setup.py index 1f5e445..96f48c8 100644 --- a/setup.py +++ b/setup.py @@ -11,11 +11,18 @@ extras_require = { "pytest-cov>=2.7.1,<3.0.0", "pytest-asyncio>=0.10.0,<1.0.0", ], - "lint": ["mypy>=0.701,<1.0", "black==19.3b0", "isort==4.3.21", "flake8>=3.7.7,<4.0.0"], + "lint": [ + "mypy>=0.701,<1.0", + "black==19.3b0", + "isort==4.3.21", + "flake8>=3.7.7,<4.0.0", + ], "dev": ["tox>=3.13.2,<4.0.0"], } -extras_require["dev"] = extras_require["test"] + extras_require["lint"] + extras_require["dev"] +extras_require["dev"] = ( + extras_require["test"] + extras_require["lint"] + extras_require["dev"] +) setuptools.setup( diff --git a/tests/libp2p/test_libp2p.py b/tests/libp2p/test_libp2p.py index c945e2f..bc58a8c 100644 --- a/tests/libp2p/test_libp2p.py +++ b/tests/libp2p/test_libp2p.py @@ -114,7 +114,9 @@ async def test_multiple_streams(): response_a = (await stream_a.read()).decode() response_b = (await stream_b.read()).decode() - assert response_a == ("ack_b:" + a_message) and response_b == ("ack_a:" + b_message) + assert response_a == ("ack_b:" + a_message) and response_b == ( + "ack_a:" + b_message + ) # Success, terminate pending tasks. await cleanup() diff --git a/tests/libp2p/test_notify.py b/tests/libp2p/test_notify.py index b18f4cf..7dd3ccb 100644 --- a/tests/libp2p/test_notify.py +++ b/tests/libp2p/test_notify.py @@ -15,7 +15,11 @@ import pytest from libp2p import initialize_default_swarm, new_node from libp2p.host.basic_host import BasicHost from libp2p.network.notifee_interface import INotifee -from tests.utils import cleanup, echo_stream_handler, perform_two_host_set_up_custom_handler +from tests.utils import ( + cleanup, + echo_stream_handler, + perform_two_host_set_up_custom_handler, +) class MyNotifee(INotifee): @@ -126,7 +130,10 @@ async def test_one_notifier_on_two_nodes(): # Ensure the connected and opened_stream events were hit in Notifee obj # and that the stream passed into opened_stream matches the stream created on # node_b - assert events_b == [["connectedb", stream.mplex_conn], ["opened_streamb", stream]] + assert events_b == [ + ["connectedb", stream.mplex_conn], + ["opened_streamb", stream], + ] while True: read_string = (await stream.read()).decode() diff --git a/tests/peer/test_peerinfo.py b/tests/peer/test_peerinfo.py index fb58298..156305c 100644 --- a/tests/peer/test_peerinfo.py +++ b/tests/peer/test_peerinfo.py @@ -8,9 +8,7 @@ from libp2p.peer.peerdata import PeerData from libp2p.peer.peerinfo import InvalidAddrError, PeerInfo, info_from_p2p_addr ALPHABETS = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" -VALID_MULTI_ADDR_STR = ( - "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" -) # noqa: E501 +VALID_MULTI_ADDR_STR = "/ip4/127.0.0.1/tcp/8000/p2p/3YgLAeMKSAPcGqZkAt8mREqhQXmJT8SN8VCMN4T6ih4GNX9wvK8mWJnWZ1qA2mLdCQ" # noqa: E501 def test_init_(): @@ -39,7 +37,8 @@ def test_init_no_value(): pytest.param(random.randint(0, 255), id="random integer"), pytest.param(multiaddr.Multiaddr("/"), id="empty multiaddr"), pytest.param( - multiaddr.Multiaddr("/ip4/127.0.0.1"), id="multiaddr without peer_id(p2p protocol)" + multiaddr.Multiaddr("/ip4/127.0.0.1"), + id="multiaddr without peer_id(p2p protocol)", ), ), ) diff --git a/tests/protocol_muxer/test_protocol_muxer.py b/tests/protocol_muxer/test_protocol_muxer.py index e26cbb9..775c460 100644 --- a/tests/protocol_muxer/test_protocol_muxer.py +++ b/tests/protocol_muxer/test_protocol_muxer.py @@ -48,7 +48,9 @@ async def perform_simple_test( @pytest.mark.asyncio async def test_single_protocol_succeeds(): expected_selected_protocol = "/echo/1.0.0" - await perform_simple_test(expected_selected_protocol, ["/echo/1.0.0"], ["/echo/1.0.0"]) + await perform_simple_test( + expected_selected_protocol, ["/echo/1.0.0"], ["/echo/1.0.0"] + ) @pytest.mark.asyncio diff --git a/tests/pubsub/conftest.py b/tests/pubsub/conftest.py index a5e6bd5..3c4a34b 100644 --- a/tests/pubsub/conftest.py +++ b/tests/pubsub/conftest.py @@ -16,7 +16,9 @@ def num_hosts(): @pytest.fixture async def hosts(num_hosts): _hosts = HostFactory.create_batch(num_hosts) - await asyncio.gather(*[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts]) + await asyncio.gather( + *[_host.get_network().listen(LISTEN_MADDR) for _host in _hosts] + ) yield _hosts # Clean up listeners = [] diff --git a/tests/pubsub/floodsub_integration_test_settings.py b/tests/pubsub/floodsub_integration_test_settings.py index 736e725..09e5c30 100644 --- a/tests/pubsub/floodsub_integration_test_settings.py +++ b/tests/pubsub/floodsub_integration_test_settings.py @@ -102,7 +102,10 @@ FLOODSUB_PROTOCOL_TEST_CASES = [ "3": ["1", "2", "4"], "4": ["1", "2", "3"], }, - "topic_map": {"astrophysics": ["1", "2", "3", "4"], "school": ["1", "2", "3", "4"]}, + "topic_map": { + "astrophysics": ["1", "2", "3", "4"], + "school": ["1", "2", "3", "4"], + }, "messages": [ {"topics": ["astrophysics"], "data": b"e=mc^2", "node_id": "1"}, {"topics": ["school"], "data": b"foobar", "node_id": "2"}, @@ -134,7 +137,8 @@ FLOODSUB_PROTOCOL_TEST_CASES = [ ] floodsub_protocol_pytest_params = [ - pytest.param(test_case, id=test_case["name"]) for test_case in FLOODSUB_PROTOCOL_TEST_CASES + pytest.param(test_case, id=test_case["name"]) + for test_case in FLOODSUB_PROTOCOL_TEST_CASES ] @@ -191,7 +195,9 @@ async def perform_test_from_obj(obj, router_factory): # Create neighbor if neighbor does not yet exist if neighbor_id not in node_map: await add_node(neighbor_id) - tasks_connect.append(connect(node_map[start_node_id], node_map[neighbor_id])) + tasks_connect.append( + connect(node_map[start_node_id], node_map[neighbor_id]) + ) # Connect nodes and wait at least for 2 seconds await asyncio.gather(*tasks_connect, asyncio.sleep(2)) diff --git a/tests/pubsub/test_dummyaccount_demo.py b/tests/pubsub/test_dummyaccount_demo.py index c1228f2..6a604e5 100644 --- a/tests/pubsub/test_dummyaccount_demo.py +++ b/tests/pubsub/test_dummyaccount_demo.py @@ -35,7 +35,9 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func): for source_num in adjacency_map: target_nums = adjacency_map[source_num] for target_num in target_nums: - await connect(dummy_nodes[source_num].libp2p_node, dummy_nodes[target_num].libp2p_node) + await connect( + dummy_nodes[source_num].libp2p_node, dummy_nodes[target_num].libp2p_node + ) # Allow time for network creation to take place await asyncio.sleep(0.25) diff --git a/tests/pubsub/test_gossipsub.py b/tests/pubsub/test_gossipsub.py index e18d679..83ab786 100644 --- a/tests/pubsub/test_gossipsub.py +++ b/tests/pubsub/test_gossipsub.py @@ -10,7 +10,8 @@ from .utils import dense_connect, one_to_all_connect @pytest.mark.parametrize( - "num_hosts, gossipsub_params", ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),) + "num_hosts, gossipsub_params", + ((4, GossipsubParams(degree=4, degree_low=3, degree_high=5)),), ) @pytest.mark.asyncio async def test_join(num_hosts, hosts, gossipsubs, pubsubs_gsub): @@ -342,7 +343,11 @@ async def test_fanout_maintenance(hosts, pubsubs_gsub): ( 2, GossipsubParams( - degree=1, degree_low=0, degree_high=2, gossip_window=50, gossip_history=100 + degree=1, + degree_low=0, + degree_high=2, + gossip_window=50, + gossip_history=100, ), ), ), diff --git a/tests/pubsub/test_gossipsub_backward_compatibility.py b/tests/pubsub/test_gossipsub_backward_compatibility.py index ff279b7..05b6c40 100644 --- a/tests/pubsub/test_gossipsub_backward_compatibility.py +++ b/tests/pubsub/test_gossipsub_backward_compatibility.py @@ -20,5 +20,7 @@ async def test_gossipsub_initialize_with_floodsub_protocol(): async def test_gossipsub_run_with_floodsub_tests(test_case_obj): await perform_test_from_obj( test_case_obj, - functools.partial(GossipsubFactory, degree=3, degree_low=2, degree_high=4, time_to_live=30), + functools.partial( + GossipsubFactory, degree=3, degree_low=2, degree_high=4, time_to_live=30 + ), ) diff --git a/tests/pubsub/test_pubsub.py b/tests/pubsub/test_pubsub.py index 170b72b..7a9ff3e 100644 --- a/tests/pubsub/test_pubsub.py +++ b/tests/pubsub/test_pubsub.py @@ -184,7 +184,8 @@ async def test_get_msg_validators(pubsubs_fsub): @pytest.mark.parametrize("num_hosts", (1,)) @pytest.mark.parametrize( - "is_topic_1_val_passed, is_topic_2_val_passed", ((False, True), (True, False), (True, True)) + "is_topic_1_val_passed, is_topic_2_val_passed", + ((False, True), (True, False), (True, True)), ) @pytest.mark.asyncio async def test_validate_msg(pubsubs_fsub, is_topic_1_val_passed, is_topic_2_val_passed): @@ -271,7 +272,9 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): event_handle_rpc.set() monkeypatch.setattr(pubsubs_fsub[0], "push_msg", mock_push_msg) - monkeypatch.setattr(pubsubs_fsub[0], "handle_subscription", mock_handle_subscription) + monkeypatch.setattr( + pubsubs_fsub[0], "handle_subscription", mock_handle_subscription + ) monkeypatch.setattr(pubsubs_fsub[0].router, "handle_rpc", mock_handle_rpc) async def wait_for_event_occurring(event): @@ -290,7 +293,9 @@ async def test_continuously_read_stream(pubsubs_fsub, monkeypatch): task = asyncio.ensure_future(pubsubs_fsub[0].continuously_read_stream(stream)) # Test: `push_msg` is called when publishing to a subscribed topic. - publish_subscribed_topic = rpc_pb2.RPC(publish=[rpc_pb2.Message(topicIDs=[TESTING_TOPIC])]) + publish_subscribed_topic = rpc_pb2.RPC( + publish=[rpc_pb2.Message(topicIDs=[TESTING_TOPIC])] + ) await stream.write(publish_subscribed_topic.SerializeToString()) await wait_for_event_occurring(event_push_msg) # Make sure the other events are not emitted. @@ -343,7 +348,10 @@ def test_handle_subscription(pubsubs_fsub): peer_ids = [ID(b"\x12\x20" + i.to_bytes(32, "big")) for i in range(2)] # Test: One peer is subscribed pubsubs_fsub[0].handle_subscription(peer_ids[0], sub_msg_0) - assert len(pubsubs_fsub[0].peer_topics) == 1 and TESTING_TOPIC in pubsubs_fsub[0].peer_topics + assert ( + len(pubsubs_fsub[0].peer_topics) == 1 + and TESTING_TOPIC in pubsubs_fsub[0].peer_topics + ) assert len(pubsubs_fsub[0].peer_topics[TESTING_TOPIC]) == 1 assert peer_ids[0] in pubsubs_fsub[0].peer_topics[TESTING_TOPIC] # Test: Another peer is subscribed @@ -369,7 +377,10 @@ def test_handle_subscription(pubsubs_fsub): async def test_handle_talk(pubsubs_fsub): sub = await pubsubs_fsub[0].subscribe(TESTING_TOPIC) msg_0 = make_pubsub_msg( - origin_id=pubsubs_fsub[0].my_id, topic_ids=[TESTING_TOPIC], data=b"1234", seqno=b"\x00" * 8 + origin_id=pubsubs_fsub[0].my_id, + topic_ids=[TESTING_TOPIC], + data=b"1234", + seqno=b"\x00" * 8, ) await pubsubs_fsub[0].handle_talk(msg_0) msg_1 = make_pubsub_msg( @@ -379,7 +390,10 @@ async def test_handle_talk(pubsubs_fsub): seqno=b"\x11" * 8, ) await pubsubs_fsub[0].handle_talk(msg_1) - assert len(pubsubs_fsub[0].my_topics) == 1 and sub == pubsubs_fsub[0].my_topics[TESTING_TOPIC] + assert ( + len(pubsubs_fsub[0].my_topics) == 1 + and sub == pubsubs_fsub[0].my_topics[TESTING_TOPIC] + ) assert sub.qsize() == 1 assert (await sub.get()) == msg_0 @@ -413,7 +427,9 @@ async def test_publish(pubsubs_fsub, monkeypatch): await pubsubs_fsub[0].publish(TESTING_TOPIC, TESTING_DATA) assert len(msgs) == 2, "`push_msg` should be called every time `publish` is called" - assert (msg_forwarders[0] == msg_forwarders[1]) and (msg_forwarders[1] == pubsubs_fsub[0].my_id) + assert (msg_forwarders[0] == msg_forwarders[1]) and ( + msg_forwarders[1] == pubsubs_fsub[0].my_id + ) assert msgs[0].seqno != msgs[1].seqno, "`seqno` should be different every time" diff --git a/tests/security/test_security_multistream.py b/tests/security/test_security_multistream.py index e61ddec..070f29d 100644 --- a/tests/security/test_security_multistream.py +++ b/tests/security/test_security_multistream.py @@ -65,7 +65,9 @@ async def test_single_insecure_security_transport_succeeds(): def assertion_func(details): assert details["id"] == "foo" - await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) @pytest.mark.asyncio @@ -76,7 +78,9 @@ async def test_single_simple_test_security_transport_succeeds(): def assertion_func(details): assert details["key_phrase"] == "tacos" - await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) @pytest.mark.asyncio @@ -90,7 +94,9 @@ async def test_two_simple_test_security_transport_for_initiator_succeeds(): def assertion_func(details): assert details["key_phrase"] == "shleep" - await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) @pytest.mark.asyncio @@ -104,7 +110,9 @@ async def test_two_simple_test_security_transport_for_noninitiator_succeeds(): def assertion_func(details): assert details["key_phrase"] == "tacos" - await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) @pytest.mark.asyncio @@ -121,7 +129,9 @@ async def test_two_simple_test_security_transport_for_both_succeeds(): def assertion_func(details): assert details["key_phrase"] == "b" - await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + ) @pytest.mark.asyncio @@ -164,4 +174,6 @@ async def test_default_insecure_security(): else: assert details1 == details2 - await perform_simple_test(assertion_func, transports_for_initiator, transports_for_noninitiator) + await perform_simple_test( + assertion_func, transports_for_initiator, transports_for_noninitiator + )