py-libp2p/tests/pubsub/factories.py

58 lines
1.5 KiB
Python
Raw Normal View History

import factory
from libp2p import initialize_default_swarm
2019-08-15 18:36:50 -07:00
from libp2p.crypto.rsa import create_new_key_pair
from libp2p.host.basic_host import BasicHost
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.gossipsub import GossipSub
from libp2p.pubsub.pubsub import Pubsub
from tests.configs import LISTEN_MADDR
from tests.pubsub.configs import (
2019-08-02 18:36:58 -07:00
FLOODSUB_PROTOCOL_ID,
GOSSIPSUB_PARAMS,
GOSSIPSUB_PROTOCOL_ID,
)
def swarm_factory():
    """Create a new default swarm for tests.

    Every call obtains a fresh value from ``create_new_key_pair()`` and
    passes it to ``initialize_default_swarm`` along with the test listen
    address, given as a one-element list holding ``str(LISTEN_MADDR)``.
    """
    key_pair = create_new_key_pair()
    listen_addrs = [str(LISTEN_MADDR)]
    return initialize_default_swarm(key_pair, transport_opt=listen_addrs)
class HostFactory(factory.Factory):
    """Test factory that builds ``BasicHost`` objects."""

    class Meta:
        # Class instantiated by this factory.
        model = BasicHost

    # Wrapped in LazyFunction so that swarm_factory() is invoked when a host
    # is built instead of once when this class body is evaluated.
    network = factory.LazyFunction(swarm_factory)
class FloodsubFactory(factory.Factory):
    """Test factory that builds ``FloodSub`` router objects."""

    class Meta:
        # Class instantiated by this factory.
        model = FloodSub

    # One-element tuple holding the floodsub protocol id from the test config.
    protocols = (FLOODSUB_PROTOCOL_ID,)
class GossipsubFactory(factory.Factory):
    """Test factory that builds ``GossipSub`` router objects.

    Apart from ``protocols``, every default below is copied from the
    same-named field of ``GOSSIPSUB_PARAMS`` in the test configuration.
    """

    class Meta:
        # Class instantiated by this factory.
        model = GossipSub

    # One-element tuple holding the gossipsub protocol id from the test config.
    protocols = (GOSSIPSUB_PROTOCOL_ID,)

    # Defaults mirrored one-to-one from GOSSIPSUB_PARAMS.
    degree = GOSSIPSUB_PARAMS.degree
    degree_low = GOSSIPSUB_PARAMS.degree_low
    degree_high = GOSSIPSUB_PARAMS.degree_high
    time_to_live = GOSSIPSUB_PARAMS.time_to_live
    gossip_window = GOSSIPSUB_PARAMS.gossip_window
    gossip_history = GOSSIPSUB_PARAMS.gossip_history
    heartbeat_interval = GOSSIPSUB_PARAMS.heartbeat_interval
class PubsubFactory(factory.Factory):
    """Test factory that builds ``Pubsub`` objects.

    ``host`` is produced by ``HostFactory`` and ``my_id`` is computed from
    that host; ``router`` and ``cache_size`` default to ``None``.
    """

    class Meta:
        # Class instantiated by this factory.
        model = Pubsub

    # Each Pubsub gets its own host built through HostFactory.
    host = factory.SubFactory(HostFactory)
    # No router by default.
    # NOTE(review): callers presumably pass router=... explicitly; confirm.
    router = None
    # Derived from the host resolved for this same build.
    my_id = factory.LazyAttribute(lambda stub: stub.host.get_id())
    cache_size = None