Google Cloud Platform

GCPCluster([projectid, zone, network, ...])

Cluster running on GCP VM Instances.



In order to create clusters on GCP you need to set your authentication credentials. You can do this via the gcloud command line tool.

$ gcloud auth login

Alternatively you can use a service account which provides credentials in a JSON file. You must set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path to the JSON file.

$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json

Project ID

To use Dask Cloudprovider with GCP you must also configure your Project ID. Generally when creating a GCP account you will create a default project. This can be found at the top of the GCP dashboard.

Your Project ID must be added to your Dask config file.

# ~/.config/dask/cloudprovider.yaml
    projectid: "YOUR PROJECT ID"

Or via an environment variable.


Google Cloud VMs

class dask_cloudprovider.gcp.GCPCluster(projectid=None, zone=None, network=None, network_projectid=None, machine_type=None, on_host_maintenance=None, source_image=None, docker_image=None, ngpus=None, gpu_type=None, filesystem_size=None, disk_type=None, auto_shutdown=None, bootstrap=True, preemptible=None, debug=False, instance_labels=None, **kwargs)[source]

Cluster running on GCP VM Instances.

This cluster manager constructs a Dask cluster running on Google Cloud Platform 67VMs.

When configuring your cluster you may find it useful to install the gcloud tool for querying the GCP API for available options.

projectid: str

Your GCP project ID. This must be set either here or in your Dask config.

See the GCP docs page for more info.

zone: str

The GCP zone to launch you cluster in. A full list can be obtained with gcloud compute zones list.

network: str

The GCP VPC network/subnetwork to use. The default is default. If using firewall rules, please ensure the follwing accesses are configured:

  • egress on all ports for downloading docker images and general data access

  • ingress on all ports for internal communication of workers

  • ingress on 8786-8787 for external accessibility of the dashboard/scheduler

  • (optional) ingress on 22 for ssh access

network_projectid: str

The project id of the GCP network. This defaults to the projectid. There may be cases (i.e. Shared VPC) when network configurations from a different GCP project are used.

machine_type: str

The VM machine_type. You can get a full list with gcloud compute machine-types list. The default is n1-standard-1 which is 3.75GB RAM and 1 vCPU

source_image: str

The OS image to use for the VM. Dask Cloudprovider will boostrap Ubuntu based images automatically. Other images require Docker and for GPUs the NVIDIA Drivers and NVIDIA Docker.

A list of available images can be found with gcloud compute images list

Valid values are:
  • The short image name provided it is in projectid.

  • The full image name projects/<projectid>/global/images/<source_image>.

  • The full image URI such as those listed in gcloud compute images list --uri.

The default is projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014.

docker_image: string (optional)

The Docker image to run on all instances.

This image must have a valid Python environment and have dask installed in order for the dask-scheduler and dask-worker commands to be available. It is recommended the Python environment matches your local environment where EC2Cluster is being created from.

For GPU instance types the Docker image much have NVIDIA drivers and dask-cuda installed.

By default the daskdev/dask:latest image will be used.

docker_args: string (optional)

Extra command line arguments to pass to Docker.

extra_bootstrap: list[str] (optional)

Extra commands to be run during the bootstrap phase.

ngpus: int (optional)

The number of GPUs to atatch to the instance. Default is 0.

gpu_type: str (optional)

The name of the GPU to use. This must be set if ngpus>0. You can see a list of GPUs available in each zone with gcloud compute accelerator-types list.

filesystem_size: int (optional)

The VM filesystem size in GB. Defaults to 50.

disk_type: str (optional)

Type of disk to use. Default is pd-standard. You can see a list of disks available in each zone with gcloud compute disk-types list.

on_host_maintenance: str (optional)

The Host Maintenance GCP option. Defaults to TERMINATE.

n_workers: int (optional)

Number of workers to initialise the cluster with. Defaults to 0.

bootstrap: bool (optional)

Install Docker and NVIDIA drivers if ngpus>0. Set to False if you are using a custom source_image which already has these requirements. Defaults to True.

worker_class: str

The Python class to run for the worker. Defaults to dask.distributed.Nanny

worker_options: dict (optional)

Params to be passed to the worker class. See distributed.worker.Worker for default worker class. If you set worker_class then refer to the docstring for the custom worker class.

env_vars: dict (optional)

Environment variables to be passed to the worker.

scheduler_options: dict (optional)

Params to be passed to the scheduler class. See distributed.scheduler.Scheduler.

silence_logs: bool (optional)

Whether or not we should silence logging when setting up the cluster.

asynchronous: bool (optional)

If this is intended to be used directly within an event loop with async/await

securitySecurity or bool (optional)

Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically. Default is True.

preemptible: bool (optional)

Whether to use preemptible instances for workers in this cluster. Defaults to False.

debug: bool, optional

More information will be printed when constructing clusters to enable debugging.

instance_labels: dict (optional)

Labels to be applied to all GCP instances upon creation.


Create the cluster.

>>> from dask_cloudprovider.gcp import GCPCluster
>>> cluster = GCPCluster(n_workers=1)
Launching cluster with the following configuration:
Source Image: projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014
Docker Image: daskdev/dask:latest
Machine Type: n1-standard-1
Filesytsem Size: 50
N-GPU Type:
Zone: us-east1-c
Creating scheduler instance
        Internal IP:
        External IP:
Waiting for scheduler to run
Scheduler is running
Creating worker instance
        Internal IP:
        External IP:

Connect a client.

>>> from dask.distributed import Client
>>> client = Client(cluster)

Do some work.

>>> import dask.array as da
>>> arr = da.random.random((1000, 1000), chunks=(100, 100))
>>> arr.mean().compute()

Close the cluster

>>> cluster.close()
Closing Instance: dask-acc897b9-worker-bfbc94bc
Closing Instance: dask-acc897b9-scheduler

You can also do this all in one go with context managers to ensure the cluster is created and cleaned up.

>>> with GCPCluster(n_workers=1) as cluster:
...     with Client(cluster) as client:
...         print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute())
Launching cluster with the following configuration:
Source Image: projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014
Docker Image: daskdev/dask:latest
Machine Type: n1-standard-1
Filesystem Size: 50
N-GPU Type:
Zone: us-east1-c
Creating scheduler instance
        Internal IP:
        External IP:
Waiting for scheduler to run
Scheduler is running
Creating worker instance
        Internal IP:
        External IP:
Closing Instance: dask-19352f29-worker-91a6bfe0
Closing Instance: dask-19352f29-scheduler

Are we running in the event loop?



adapt([Adaptive, minimum, maximum, ...])

Turn on adaptivity

call_async(f, *args, **kwargs)

Run a blocking function in a thread as a coroutine.


Create an instance of this class to represent an existing cluster by name.


Return client for the cluster

get_logs([cluster, scheduler, workers])

Return logs for the cluster, scheduler and workers


Generate tags to be applied to all resources.


Return name and spec for the next worker

scale([n, memory, cores])

Scale cluster to n workers

scale_up([n, memory, cores])

Scale cluster to n workers

sync(func, *args[, asynchronous, ...])

Call func with args synchronously or asynchronously depending on the calling context