"""Dask cluster manager for Hetzner Cloud vServers (``dask_cloudprovider.hetzner.vserver``)."""

import asyncio
import dask

from dask_cloudprovider.generic.vmcluster import (
    VMCluster,
    VMInterface,
    SchedulerMixin,
    WorkerMixin,
)

try:
    import hcloud
except ImportError as e:
    msg = (
        "Dask Cloud Provider Hetzner requirements are not installed.\n\n"
        "Please pip install as follows:\n\n"
        '  pip install "dask-cloudprovider[hcloud]" --upgrade  # or python -m pip install'
    )
    raise ImportError(msg) from e

from hcloud.actions.domain import Action
from hcloud.images.domain import Image
from hcloud.locations.domain import Location
from hcloud.server_types.domain import ServerType


class VServer(VMInterface):
    """One Hetzner Cloud vServer hosting a single Dask process.

    Parameters
    ----------
    cluster
        The owning cluster manager. This class calls its ``call_async``,
        ``_log`` and ``render_process_cloud_init`` methods.
    config : dict
        Hetzner configuration; its ``"token"`` entry authenticates the
        ``hcloud.Client``.
    env_vars : dict, optional
        Stored as ``self.env_vars``.
    bootstrap, extra_bootstrap : optional
        Stored on the instance. NOTE(review): presumably read by
        ``cluster.render_process_cloud_init`` when building cloud-init — confirm.
    docker_image : str, optional
        Stored as ``self.docker_image``.
    image : str
        Name of the Hetzner OS image to boot.
    location : str, optional
        Name of the Hetzner location to create the server in. When falsy no
        location is sent and the choice is left to the Hetzner API default.
    server_type : str
        Name of the Hetzner server type.
    *args, **kwargs
        Forwarded to ``VMInterface.__init__``.
    """

    def __init__(
        self,
        cluster: "VMCluster",
        config,
        env_vars: dict = None,
        bootstrap=None,
        extra_bootstrap=None,
        docker_image: str = None,
        image: str = None,
        location: str = None,
        server_type: str = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.cluster = cluster
        self.config = config
        self.location = location
        self.bootstrap = bootstrap
        self.extra_bootstrap = extra_bootstrap
        self.env_vars = env_vars
        self.client = hcloud.Client(self.config.get("token"))
        # Construct by keyword, like Image below: the first positional
        # parameter of hcloud's ServerType is ``id``, not ``name``.
        self.server_type = ServerType(name=server_type)
        self.image = Image(name=image)
        self.docker_image = docker_image
        # Set by create_vm once the server exists; checked by destroy_vm.
        self.server = None

    async def create_vm(self):
        """Create the vServer, wait for its actions, and return its public IPv4.

        Returns
        -------
        str
            ``public_net.ipv4.ip`` of the created server.

        Raises
        ------
        RuntimeError
            If any of the server's actions ends in the ``error`` state.
        """
        location = Location(name=self.location) if self.location else None
        await self.cluster.call_async(
            self.client.servers.create,
            server_type=self.server_type,
            image=self.image,
            name=self.name,
            location=location,
            user_data=self.cluster.render_process_cloud_init(self),
        )

        # hcloud client calls are blocking HTTP requests; run them through
        # call_async so the event loop is not stalled while they complete.
        self.server = await self.cluster.call_async(
            self.client.servers.get_by_name, name=self.name
        )
        actions = await self.cluster.call_async(self.server.get_actions)
        for action in actions:
            while action.status != Action.STATUS_SUCCESS:
                # Without this check a failed action would be polled forever.
                if action.status == Action.STATUS_ERROR:
                    raise RuntimeError(
                        f"Hetzner action {action.command!r} failed for vServer "
                        f"{self.name}: {action.error}"
                    )
                await self.cluster.call_async(action.reload)
                await asyncio.sleep(0.1)
        self.cluster._log(f"Created Hetzner vServer {self.name}")

        return self.server.public_net.ipv4.ip

    async def destroy_vm(self):
        """Delete the vServer backing this process, if one exists."""
        server = self.server
        if server is None:
            # create_vm may have failed after the API created the server but
            # before a handle was stored; look it up by name so it is not leaked.
            server = await self.cluster.call_async(
                self.client.servers.get_by_name, name=self.name
            )
        if server is None:
            return
        await self.cluster.call_async(self.client.servers.delete, server=server)
        self.cluster._log(f"Terminated vServer {self.name}")


class HetznerScheduler(SchedulerMixin, VServer):
    """Dask scheduler hosted on a Hetzner Cloud vServer.

    ``SchedulerMixin`` precedes ``VServer`` in the MRO, so its methods win on
    name clashes; ``VServer`` supplies ``create_vm`` / ``destroy_vm``.
    """


class HetznerWorker(WorkerMixin, VServer):
    """Dask worker hosted on a Hetzner Cloud vServer.

    ``WorkerMixin`` precedes ``VServer`` in the MRO, so its methods win on
    name clashes; ``VServer`` supplies ``create_vm`` / ``destroy_vm``.
    """


class HetznerCluster(VMCluster):
    """Cluster running on Hetzner cloud vServers.

    VMs in Hetzner are referred to as vServers. This cluster manager constructs
    a Dask cluster running on VMs.

    When configuring your cluster you may find it useful to install the
    ``hcloud`` tool for querying the Hetzner API for available options.

    https://github.com/hetznercloud/cli

    Each of ``image``, ``location``, ``server_type``, ``bootstrap`` and
    ``docker_image`` falls back to the matching ``cloudprovider.hetzner.<name>``
    dask config entry when not given, and the resolved values are forwarded to
    every scheduler and worker vServer.

    Parameters
    ----------
    image: str
        The image to use for the host OS. This should be a Ubuntu variant.
        You can list available images with ``hcloud image list|grep Ubuntu``.
    location: str
        The Hetzner location to launch your cluster in. A full list can be
        obtained with ``hcloud location list``.
    server_type: str
        The VM server type. You can get a full list with
        ``hcloud server-type list``. The default is ``cx11`` which is vServer
        with 2GB RAM and 1 vCPU.
    bootstrap: str (optional)
        Overrides the ``cloudprovider.hetzner.bootstrap`` config value.
    docker_image: str (optional)
        Overrides the ``cloudprovider.hetzner.docker_image`` config value.
    n_workers: int
        Number of workers to initialise the cluster with. Defaults to ``0``.
    worker_module: str
        The Python module to run for the worker. Defaults to
        ``distributed.cli.dask_worker``
    worker_options: dict
        Params to be passed to the worker class.
        See :class:`distributed.worker.Worker` for default worker class.
        If you set ``worker_module`` then refer to the docstring for the
        custom worker class.
    scheduler_options: dict
        Params to be passed to the scheduler class.
        See :class:`distributed.scheduler.Scheduler`.
    env_vars: dict
        Environment variables to be passed to the worker.
    extra_bootstrap: list[str] (optional)
        Extra commands to be run during the bootstrap phase.

    Examples
    --------
    >>> from dask_cloudprovider.hetzner import HetznerCluster
    >>> cluster = HetznerCluster(n_workers=1)

    >>> from dask.distributed import Client
    >>> client = Client(cluster)

    >>> import dask.array as da
    >>> arr = da.random.random((1000, 1000), chunks=(100, 100))
    >>> arr.mean().compute()

    >>> client.close()
    >>> cluster.close()
    """

    def __init__(
        self,
        bootstrap: str = None,
        image: str = None,
        location: str = None,
        server_type: str = None,
        docker_image: str = None,
        **kwargs,
    ):
        self.config = dask.config.get("cloudprovider.hetzner", {})
        self.scheduler_class = HetznerScheduler
        self.worker_class = HetznerWorker

        def resolve(key, value):
            # dask.config.get returns ``override_with`` as-is when it is not
            # None, otherwise the config entry under the given key.
            return dask.config.get(f"cloudprovider.hetzner.{key}", override_with=value)

        self.image = resolve("image", image)
        self.docker_image = resolve("docker_image", docker_image)
        self.location = resolve("location", location)
        self.server_type = resolve("server_type", server_type)
        self.bootstrap = resolve("bootstrap", bootstrap)

        shared = dict(
            bootstrap=self.bootstrap,
            cluster=self,
            config=self.config,
            docker_image=self.docker_image,
            image=self.image,
            location=self.location,
            server_type=self.server_type,
        )
        self.options = shared
        # Scheduler and workers each get their own shallow copy.
        self.scheduler_options = dict(shared)
        self.worker_options = dict(shared)
        super().__init__(**kwargs)