# Source code for dask_cloudprovider.digitalocean.droplet

import asyncio

import dask
from dask_cloudprovider.generic.vmcluster import (
    VMCluster,
    VMInterface,
    SchedulerMixin,
    WorkerMixin,
)

try:
    import digitalocean
except ImportError as e:
    msg = (
        "Dask Cloud Provider Digital Ocean requirements are not installed.\n\n"
        "Please pip install as follows:\n\n"
        '  pip install "dask-cloudprovider[digitalocean]" --upgrade  # or python -m pip install'
    )
    raise ImportError(msg) from e


class Droplet(VMInterface):
    """A single DigitalOcean droplet created and destroyed on behalf of a cluster.

    Parameters
    ----------
    cluster
        The owning cluster object. This class calls its
        ``render_process_cloud_init``, ``call_async`` and ``_log`` methods.
    config
        Mapping of DigitalOcean settings. Only the ``"token"`` key is read
        by this class (via ``config.get``).
    region: str
        Region passed to ``digitalocean.Droplet`` when the VM is created.
    size: str
        Size slug passed to ``digitalocean.Droplet`` as ``size_slug``.
    image: str
        Image passed to ``digitalocean.Droplet`` as the host OS image.
    docker_image
        Stored on the instance as ``docker_image``; not used directly here.
    env_vars
        Stored on the instance as ``env_vars``; not used directly here.
    extra_bootstrap
        Stored on the instance as ``extra_bootstrap``; not used directly here.
    *args, **kwargs
        Forwarded unchanged to ``VMInterface.__init__``.
    """

    def __init__(
        self,
        cluster: VMCluster,
        config,
        *args,
        region: str = None,
        size: str = None,
        image: str = None,
        docker_image=None,
        env_vars=None,
        extra_bootstrap=None,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # Handle to the remote droplet; stays None until create_vm() runs.
        self.droplet = None
        self.cluster = cluster
        self.config = config
        self.region = region
        self.size = size
        self.image = image
        # NOTE(review): the attributes below are not read in this class;
        # presumably the cluster's cloud-init rendering consumes them --
        # confirm against the VMCluster implementation.
        self.gpu_instance = False
        self.bootstrap = True
        self.extra_bootstrap = extra_bootstrap
        self.docker_image = docker_image
        self.env_vars = env_vars

    async def create_vm(self):
        """Create the droplet and wait until it has an IP address.

        Returns
        -------
        tuple
            ``(ip_address, None)``. The second element is always ``None``
            here; NOTE(review): presumably a slot for a second address
            expected by ``VMInterface`` -- confirm against the base class.

        Raises
        ------
        RuntimeError
            If any of the droplet's actions reports the ``"errored"``
            status, since it would otherwise never reach ``"completed"``.
        """
        self.droplet = digitalocean.Droplet(
            token=self.config.get("token"),
            name=self.name,
            region=self.region,
            image=self.image,
            size_slug=self.size,
            backups=False,
            user_data=self.cluster.render_process_cloud_init(self),
        )
        await self.cluster.call_async(self.droplet.create)

        # Every DigitalOcean request goes through call_async so that the
        # blocking client calls do not stall the event loop.
        actions = await self.cluster.call_async(self.droplet.get_actions)
        for action in actions:
            while action.status != "completed":
                if action.status == "errored":
                    # A failed action never completes; fail instead of
                    # polling forever.
                    action_type = getattr(action, "type", "unknown")
                    raise RuntimeError(
                        f"Droplet {self.name} action {action_type!r} errored"
                    )
                await self.cluster.call_async(action.load)
                await asyncio.sleep(0.1)

        # The address may only be populated some time after the create
        # call, so reload the droplet until it appears.
        while self.droplet.ip_address is None:
            await self.cluster.call_async(self.droplet.load)
            await asyncio.sleep(0.1)
        self.cluster._log(f"Created droplet {self.name}")

        return self.droplet.ip_address, None

    async def destroy_vm(self):
        """Destroy the droplet, if one was created, and log the termination."""
        if self.droplet is None:
            # create_vm() never got as far as building the droplet handle,
            # so there is nothing to tear down.
            return
        await self.cluster.call_async(self.droplet.destroy)
        self.cluster._log(f"Terminated droplet {self.name}")


class DropletScheduler(SchedulerMixin, Droplet):
    """Scheduler running on a DigitalOcean Droplet.

    Combines ``SchedulerMixin`` with ``Droplet`` and adds no attributes or
    methods of its own. ``DropletCluster`` assigns this class to its
    ``scheduler_class`` attribute.
    """


class DropletWorker(WorkerMixin, Droplet):
    """Worker running on a DigitalOcean Droplet.

    Combines ``WorkerMixin`` with ``Droplet`` and adds no attributes or
    methods of its own. ``DropletCluster`` assigns this class to its
    ``worker_class`` attribute.
    """


class DropletCluster(VMCluster):
    """Cluster running on Digital Ocean droplets.

    VMs in DigitalOcean (DO) are referred to as droplets. This cluster manager
    constructs a Dask cluster running on VMs.

    When configuring your cluster you may find it useful to install the ``doctl``
    tool for querying the DO API for available options.

    https://www.digitalocean.com/docs/apis-clis/doctl/how-to/install/

    Parameters
    ----------
    region: str
        The DO region to launch your cluster in. A full list can be obtained with
        ``doctl compute region list``. When omitted, the ``region`` key of the
        ``cloudprovider.digitalocean`` Dask config is used.
    size: str
        The VM size slug. You can get a full list with ``doctl compute size list``.
        The default is ``s-1vcpu-1gb`` which is 1GB RAM and 1 vCPU. When omitted,
        the ``size`` key of the ``cloudprovider.digitalocean`` Dask config is used.
    image: str
        The image ID to use for the host OS. This should be a Ubuntu variant.
        You can list available images with
        ``doctl compute image list --public | grep ubuntu.*x64``. When omitted,
        the ``image`` key of the ``cloudprovider.digitalocean`` Dask config is used.
    n_workers: int
        Number of workers to initialise the cluster with. Defaults to ``0``.
    worker_module: str
        The Python module to run for the worker (the Dask worker module to start
        on worker VMs). Defaults to ``distributed.cli.dask_worker``
    worker_options: dict
        Params to be passed to the worker class.
        See :class:`distributed.worker.Worker` for default worker class.
        If you set ``worker_module`` then refer to the docstring for the custom
        worker class.
    scheduler_options: dict
        Params to be passed to the scheduler class.
        See :class:`distributed.scheduler.Scheduler`.
    docker_image: string (optional)
        The Docker image to run on all instances.

        This image must have a valid Python environment and have ``dask``
        installed in order for the ``dask-scheduler`` and ``dask-worker``
        commands to be available. It is recommended the Python environment
        matches your local environment where ``DropletCluster`` is being
        created from.

        For GPU instance types the Docker image must have NVIDIA drivers and
        ``dask-cuda`` installed.

        By default the ``daskdev/dask:latest`` image will be used.
    docker_args: string (optional)
        Extra command line arguments to pass to Docker.
    extra_bootstrap: list[str] (optional)
        Extra commands to be run during the bootstrap phase.
    env_vars: dict (optional)
        Environment variables to be passed to the worker.
    silence_logs: bool
        Whether or not we should silence logging when setting up the cluster.
    asynchronous: bool
        If this is intended to be used directly within an event loop with
        async/await
    security : Security or bool, optional
        Configures communication security in this cluster. Can be a security
        object, or True. If True, temporary self-signed credentials will
        be created automatically. Default is ``True``.
    debug: bool, optional
        More information will be printed when constructing clusters to enable
        debugging.

    Examples
    --------

    Create the cluster.

    >>> from dask_cloudprovider.digitalocean import DropletCluster
    >>> cluster = DropletCluster(n_workers=1)
    Creating scheduler instance
    Created droplet dask-38b817c1-scheduler
    Waiting for scheduler to run
    Scheduler is running
    Creating worker instance
    Created droplet dask-38b817c1-worker-dc95260d

    Connect a client.

    >>> from dask.distributed import Client
    >>> client = Client(cluster)

    Do some work.

    >>> import dask.array as da
    >>> arr = da.random.random((1000, 1000), chunks=(100, 100))
    >>> arr.mean().compute()
    0.5001550986751964

    Close the cluster

    >>> client.close()
    >>> cluster.close()
    Terminated droplet dask-38b817c1-worker-dc95260d
    Terminated droplet dask-38b817c1-scheduler

    You can also do this all in one go with context managers to ensure the
    cluster is created and cleaned up.

    >>> with DropletCluster(n_workers=1) as cluster:
    ...     with Client(cluster) as client:
    ...         print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute())
    Creating scheduler instance
    Created droplet dask-48efe585-scheduler
    Waiting for scheduler to run
    Scheduler is running
    Creating worker instance
    Created droplet dask-48efe585-worker-5181aaf1
    0.5000558682356162
    Terminated droplet dask-48efe585-worker-5181aaf1
    Terminated droplet dask-48efe585-scheduler

    """

    def __init__(
        self,
        region: str = None,
        size: str = None,
        image: str = None,
        debug: bool = False,
        **kwargs,
    ):
        # Provider settings from the Dask config; an empty dict when the
        # "cloudprovider.digitalocean" section is absent.
        self.config = dask.config.get("cloudprovider.digitalocean", {})
        self.scheduler_class = DropletScheduler
        self.worker_class = DropletWorker
        self.debug = debug
        # Explicit arguments win; None falls back to the config value.
        self.options = {
            "cluster": self,
            "config": self.config,
            "region": region if region is not None else self.config.get("region"),
            "size": size if size is not None else self.config.get("size"),
            "image": image if image is not None else self.config.get("image"),
        }
        # Scheduler and worker each get their own shallow copy of the options.
        # NOTE(review): presumably VMCluster.__init__ extends these dicts, which
        # is why they are set before calling it -- confirm against the base class.
        self.scheduler_options = {**self.options}
        self.worker_options = {**self.options}
        super().__init__(debug=debug, **kwargs)