Amazon Web Services (AWS)

EC2Cluster([region, bootstrap, …]) Deploy a Dask cluster using EC2.
ECSCluster([fargate_scheduler, …]) Deploy a Dask cluster using ECS.
FargateCluster(**kwargs) Deploy a Dask cluster using Fargate on ECS.

Overview

Authentication

To create clusters on AWS you need to set your access key, secret key and region. The simplest way is to use the aws command line tool.

$ pip install awscli
$ aws configure
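
Alternatively, the standard AWS environment variables should also be picked up by the underlying botocore session; for example (the values here are placeholders):

$ export AWS_ACCESS_KEY_ID="<access-key>"
$ export AWS_SECRET_ACCESS_KEY="<secret-key>"
$ export AWS_DEFAULT_REGION="us-east-1"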

Elastic Compute Cloud (EC2)

class dask_cloudprovider.aws.EC2Cluster(region=None, bootstrap=None, auto_shutdown=None, ami=None, instance_type=None, vpc=None, subnet_id=None, security_groups=None, filesystem_size=None, docker_image=None, **kwargs)[source]

Deploy a Dask cluster using EC2.

This creates a Dask scheduler and workers on EC2 instances.

All instances will run a single configurable Docker container which should contain a valid Python environment with Dask and any other dependencies.

All optional parameters can also be configured in a cloudprovider.yaml file in your Dask configuration directory or via environment variables.

For example ami can be set via DASK_CLOUDPROVIDER__EC2__AMI.

See https://docs.dask.org/en/latest/configuration.html for more info.
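
The same options can also be set programmatically through dask.config; the key path mirrors the environment variable name. A sketch (the AMI ID is a hypothetical placeholder):

>>> import dask.config
>>> dask.config.set({"cloudprovider.ec2.ami": "ami-0123456789abcdef0"})  # hypothetical AMI ID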

Parameters:
region: string (optional)

The region in which to start your clusters. By default this will be detected from your config.

bootstrap: bool (optional)

It is assumed that the ami will not have Docker installed (or the NVIDIA drivers for GPU instances). If bootstrap is True these dependencies will be installed on instance start. If you are using a custom AMI which already has these dependencies, set this to False.

worker_command: string (optional)

The command workers should run when starting. By default this will be "dask-worker" unless instance_type is a GPU instance, in which case dask-cuda-worker will be used.

ami: string (optional)

The base OS AMI to use for scheduler and workers.

This must be a Debian-flavoured distribution. By default this will be the latest official Ubuntu 20.04 LTS release from Canonical.

If the AMI does not include Docker it will be installed at runtime. If the instance_type is a GPU instance the NVIDIA drivers and Docker GPU runtime will be installed at runtime.

instance_type: string (optional)

A valid EC2 instance type. This will determine the resources available to your workers.

See https://aws.amazon.com/ec2/instance-types/.

Defaults to t2.micro.

vpc: string (optional)

The VPC ID in which to launch the instances.

Will detect and use the default VPC if not specified.

subnet_id: string (optional)

The Subnet ID in which to launch the instances.

Will use all subnets for the VPC if not specified.

security_groups: List(string) (optional)

The security group IDs that will be attached to the workers.

These must allow all traffic between instances in the security group, and traffic on ports 8786 and 8787 between the scheduler instance and wherever you are calling EC2Cluster from.

By default a Dask security group will be created with ports 8786 and 8787 exposed to the internet.

filesystem_size: int (optional)

The instance filesystem size in GB.

Defaults to 40.

n_workers: int

Number of workers to initialise the cluster with. Defaults to 0.

worker_module: str

The Python module to run for the worker. Defaults to distributed.cli.dask_worker.

worker_options: dict

Params to be passed to the worker class. See distributed.worker.Worker for the default worker class. If you set worker_module, refer to the docstring for your custom worker class.

scheduler_options: dict

Params to be passed to the scheduler class. See distributed.scheduler.Scheduler.

docker_image: string (optional)

The Docker image to run on all instances.

This image must have a valid Python environment and have dask installed in order for the dask-scheduler and dask-worker commands to be available. It is recommended that the Python environment match the local environment from which EC2Cluster is being created.

For GPU instance types the Docker image must have NVIDIA drivers and dask-cuda installed.

By default the daskdev/dask:latest image will be used.

env_vars: dict (optional)

Environment variables to be passed to the worker.

silence_logs: bool

Whether or not we should silence logging when setting up the cluster.

asynchronous: bool

Set to True if this is intended to be used directly within an event loop with async/await.

security : Security or bool, optional

Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically.

Notes

Resources created

Resource      Name                                       Purpose         Cost
EC2 Instance  dask-scheduler-{cluster uuid}              Dask Scheduler  EC2 Pricing
EC2 Instance  dask-worker-{cluster uuid}-{worker uuid}   Dask Workers    EC2 Pricing

Manual cleanup

If for some reason the cluster manager is terminated without being able to perform cleanup, the default behaviour of EC2Cluster is for the scheduler and workers to time out. This will result in the host VMs shutting down. This cluster manager also creates instances with the terminate-on-shutdown setting, so all resources should be removed automatically.

If for some reason you choose to override those settings and disable auto cleanup you can destroy resources with the following CLI command.

export CLUSTER_ID="cluster id printed during creation"
aws ec2 describe-instances \
    --filters "Name=tag:Dask Cluster,Values=${CLUSTER_ID}" \
    --query "Reservations[*].Instances[*].[InstanceId]" \
    --output text | xargs aws ec2 terminate-instances --instance-ids

Examples

Regular cluster.

>>> cluster = EC2Cluster()
>>> cluster.scale(5)
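
Once the cluster is running you can connect a client and submit work in the usual way (standard distributed usage, not specific to EC2Cluster):

>>> from dask.distributed import Client
>>> client = Client(cluster)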

RAPIDS Cluster.

>>> cluster = EC2Cluster(ami="ami-0c7c7d78f752f8f17",  # Example Deep Learning AMI (Ubuntu 18.04)
...                      docker_image="rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04",
...                      instance_type="p3.2xlarge",
...                      worker_module="dask_cuda.cli.dask_cuda_worker",
...                      bootstrap=False,
...                      filesystem_size=120)
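
Worker and scheduler options can be passed through as dictionaries. A sketch, assuming the default worker module (nthreads is a standard dask-worker option and idle_timeout a standard distributed.scheduler.Scheduler option):

>>> cluster = EC2Cluster(n_workers=2,
...                      worker_options={"nthreads": 2},
...                      scheduler_options={"idle_timeout": "1 hour"})
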
Attributes:
asynchronous
auto_shutdown
bootstrap
command
dashboard_link
docker_image
gpu_instance
observed
plan
requested
scheduler_address
scheduler_class
worker_class

Methods

adapt(*args[, minimum, maximum]) Turn on adaptivity
get_logs([cluster, scheduler, workers]) Return logs for the cluster, scheduler and workers
get_tags() Generate tags to be applied to all resources.
new_worker_spec() Return name and spec for the next worker
scale([n, memory, cores]) Scale cluster to n workers
scale_up([n, memory, cores]) Scale cluster to n workers
close  
get_cloud_init  
logs  
render_cloud_init  
scale_down  
sync  

Elastic Container Service (ECS)

class dask_cloudprovider.aws.ECSCluster(fargate_scheduler=False, fargate_workers=False, image=None, scheduler_cpu=None, scheduler_mem=None, scheduler_timeout=None, scheduler_extra_args=None, scheduler_task_kwargs=None, worker_cpu=None, worker_mem=None, worker_gpu=None, worker_extra_args=None, worker_task_kwargs=None, n_workers=None, cluster_arn=None, cluster_name_template=None, execution_role_arn=None, task_role_arn=None, task_role_policies=None, cloudwatch_logs_group=None, cloudwatch_logs_stream_prefix=None, cloudwatch_logs_default_retention=None, vpc=None, subnets=None, security_groups=None, environment=None, tags=None, find_address_timeout=None, skip_cleanup=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None, platform_version=None, fargate_use_private_ip=False, mount_points=None, volumes=None, mount_volumes_on_scheduler=False, **kwargs)[source]

Deploy a Dask cluster using ECS

This creates a dask scheduler and workers on an existing ECS cluster.

All the other required resources such as roles, task definitions, tasks, etc. will be created automatically, as in FargateCluster.

Parameters:
fargate_scheduler: bool (optional)

Select whether or not to use fargate for the scheduler.

Defaults to False. You must provide an existing cluster.

fargate_workers: bool (optional)

Select whether or not to use fargate for the workers.

Defaults to False. You must provide an existing cluster.

image: str (optional)

The docker image to use for the scheduler and worker tasks.

Defaults to daskdev/dask:latest or rapidsai/rapidsai:latest if worker_gpu is set.

scheduler_cpu: int (optional)

The amount of CPU to request for the scheduler in milli-cpu (1/1024).

Defaults to 1024 (one vCPU). See the troubleshooting guide for information on the valid values for this argument.

scheduler_mem: int (optional)

The amount of memory to request for the scheduler in MB.

Defaults to 4096 (4GB). See the troubleshooting guide for information on the valid values for this argument.

scheduler_timeout: str (optional)

The scheduler task will exit after this amount of time if there are no clients connected.

Defaults to 5 minutes.

scheduler_extra_args: List[str] (optional)

Any extra command line arguments to pass to dask-scheduler, e.g. ["--tls-cert", "/path/to/cert.pem"]

Defaults to None, no extra command line arguments.

scheduler_task_kwargs: dict (optional)

Additional keyword arguments for the scheduler ECS task.

worker_cpu: int (optional)

The amount of CPU to request for worker tasks in milli-cpu (1/1024).

Defaults to 4096 (four vCPUs). See the troubleshooting guide for information on the valid values for this argument.

worker_mem: int (optional)

The amount of memory to request for worker tasks in MB.

Defaults to 16384 (16GB). See the troubleshooting guide for information on the valid values for this argument.

worker_gpu: int (optional)

The number of GPUs to expose to the worker.

To provide GPUs to workers you need to use a GPU-ready Docker image that has dask-cuda installed, and have GPU nodes available in your ECS cluster. Fargate is not supported at this time.

Defaults to None, no GPUs.

worker_extra_args: List[str] (optional)

Any extra command line arguments to pass to dask-worker, e.g. ["--tls-cert", "/path/to/cert.pem"]

Defaults to None, no extra command line arguments.

worker_task_kwargs: dict (optional)

Additional keyword arguments for the workers ECS task.

n_workers: int (optional)

Number of workers to start on cluster creation.

Defaults to None.

cluster_arn: str (optional if fargate is true)

The ARN of an existing ECS cluster to use for launching tasks.

Defaults to None which results in a new cluster being created for you.

cluster_name_template: str (optional)

A template to use for the cluster name if cluster_arn is set to None.

Defaults to 'dask-{uuid}'.

execution_role_arn: str (optional)

The ARN of an existing IAM role to use for ECS execution.

This ARN must have sts:AssumeRole allowed for ecs-tasks.amazonaws.com and allow the following permissions:

  • ecr:GetAuthorizationToken
  • ecr:BatchCheckLayerAvailability
  • ecr:GetDownloadUrlForLayer
  • ecr:GetRepositoryPolicy
  • ecr:DescribeRepositories
  • ecr:ListImages
  • ecr:DescribeImages
  • ecr:BatchGetImage
  • logs:*
  • ec2:AuthorizeSecurityGroupIngress
  • ec2:Describe*
  • elasticloadbalancing:DeregisterInstancesFromLoadBalancer
  • elasticloadbalancing:DeregisterTargets
  • elasticloadbalancing:Describe*
  • elasticloadbalancing:RegisterInstancesWithLoadBalancer
  • elasticloadbalancing:RegisterTargets

Defaults to None (one will be created for you).

task_role_arn: str (optional)

The ARN for an existing IAM role for tasks to assume. This defines which AWS resources the dask workers can access directly. Useful if you need to read from S3 or a database without passing credentials around.

Defaults to None (one will be created with S3 read permission only).

task_role_policies: List[str] (optional)

If you do not specify a task_role_arn you may want to list some IAM Policy ARNs to be attached to the role that will be created for you.

E.g. if you need your workers to read from S3 you could add arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess (see the sketch in the Examples section below).

Default None (no policies will be attached to the role).

cloudwatch_logs_group: str (optional)

The name of an existing cloudwatch log group to place logs into.

Default None (one will be created, called dask-ecs).

cloudwatch_logs_stream_prefix: str (optional)

Prefix for log streams.

Defaults to the cluster name.

cloudwatch_logs_default_retention: int (optional)

Retention for logs in days. For use when the log group is auto-created.

Defaults to 30.

vpc: str (optional)

The ID of the VPC you wish to launch your cluster in.

Defaults to None (your default VPC will be used).

subnets: List[str] (optional)

A list of subnets to use when running your task.

Defaults to None (all subnets available in your VPC will be used).

security_groups: List[str] (optional)

A list of security group IDs to use when launching tasks.

Defaults to None (one will be created which allows all traffic between tasks and access to ports 8786 and 8787 from anywhere).

environment: dict (optional)

Extra environment variables to pass to the scheduler and worker tasks.

Useful for setting EXTRA_APT_PACKAGES, EXTRA_CONDA_PACKAGES and EXTRA_PIP_PACKAGES if you’re using the default image.

Defaults to None.

tags: dict (optional)

Tags to apply to all resources created automatically.

Defaults to None. Tags will always include {"createdBy": "dask-cloudprovider"}.

find_address_timeout: int

Configurable timeout in seconds for finding the task IP from the cloudwatch logs.

Defaults to 60 seconds.

skip_cleanup: bool (optional)

Skip cleaning up of stale resources. Useful if you have lots of resources and this operation takes a while.

Default False.

platform_version: str (optional)

Version of the AWS Fargate platform to use, e.g. “1.4.0” or “LATEST”. This setting has no effect for the EC2 launch type.

Defaults to None.

fargate_use_private_ip: bool (optional)

Whether to use a private IP (if True) or public IP (if False) with Fargate.

Default False.

mount_points: list (optional)

List of mount points as documented here: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/efs-volumes.html

Default None.

volumes: list (optional)

List of volumes as documented here: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/efs-volumes.html

Default None.

mount_volumes_on_scheduler: bool (optional)

Whether to also mount volumes in the scheduler task. Any volumes and mount points specified will always be mounted in worker tasks. This setting controls whether volumes are also mounted in the scheduler task.

Default False.

**kwargs: dict

Additional keyword arguments to pass to SpecCluster.

Examples

>>> from dask_cloudprovider.aws import ECSCluster
>>> cluster = ECSCluster(cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>")
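
If your workers need direct access to other AWS services such as S3 you can have policies attached to the automatically created task role via task_role_policies, as mentioned above. A sketch (the cluster ARN is a placeholder):

>>> cluster = ECSCluster(
...     cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>",
...     task_role_policies=["arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"])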

There is also support in ECSCluster for GPU-aware Dask clusters. To do this you need to create an ECS cluster with GPU-capable instances (from the g3, p3 or p3dn families) and specify the number of GPUs each worker task should have.

>>> from dask_cloudprovider.aws import ECSCluster
>>> cluster = ECSCluster(
...     cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
...     worker_gpu=1)

Setting the worker_gpu option to something other than None will cause the cluster to run dask-cuda-worker as the worker startup command. Setting this option will also change the default Docker image to rapidsai/rapidsai:latest; if you’re using a custom image you must ensure the NVIDIA CUDA toolkit is installed with a version that matches the host machine, along with dask-cuda.

Attributes:
asynchronous
dashboard_link
observed
plan
requested
scheduler_address
tags

Methods

adapt(*args[, minimum, maximum]) Turn on adaptivity
get_logs([cluster, scheduler, workers]) Return logs for the cluster, scheduler and workers
new_worker_spec() Return name and spec for the next worker
scale([n, memory, cores]) Scale cluster to n workers
scale_up([n, memory, cores]) Scale cluster to n workers
close  
logs  
scale_down  
sync  

Fargate

class dask_cloudprovider.aws.FargateCluster(**kwargs)[source]

Deploy a Dask cluster using Fargate on ECS

This creates a dask scheduler and workers on a Fargate-powered ECS cluster. If you do not configure a cluster, one will be created for you with sensible defaults.

Parameters:
kwargs: dict

Keyword arguments to be passed to ECSCluster.

Notes

IAM Permissions

To create a FargateCluster the cluster manager will need to create various AWS resources, ranging from IAM roles to VPCs to ECS tasks. Depending on your use case you may want the cluster to create all of these for you, or you may wish to specify them yourself ahead of time.

Here is the minimal IAM policy that you need in order for the cluster manager to create the whole cluster for you:

{
    "Statement": [
        {
            "Action": [
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:CreateSecurityGroup",
                "ec2:CreateTags",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:DeleteSecurityGroup",
                "ecs:CreateCluster",
                "ecs:DescribeTasks",
                "ecs:ListAccountSettings",
                "ecs:RegisterTaskDefinition",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:ListClusters",
                "ecs:DescribeClusters",
                "ecs:DeleteCluster",
                "ecs:ListTaskDefinitions",
                "ecs:DescribeTaskDefinition",
                "ecs:DeregisterTaskDefinition",
                "iam:AttachRolePolicy",
                "iam:CreateRole",
                "iam:TagRole",
                "iam:PassRole",
                "iam:DeleteRole",
                "iam:ListRoleTags",
                "iam:ListAttachedRolePolicies",
                "iam:DetachRolePolicy",
                "logs:DescribeLogGroups"
            ],
            "Effect": "Allow",
            "Resource": [
                "*"
            ]
        }
    ],
    "Version": "2012-10-17"
}

If you specify all of the resources yourself you will need a minimal policy of:

{
    "Statement": [
        {
            "Action": [
                "ec2:CreateTags",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ecs:DescribeTasks",
                "ecs:ListAccountSettings",
                "ecs:RegisterTaskDefinition",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:ListClusters",
                "ecs:DescribeClusters",
                "ecs:ListTaskDefinitions",
                "ecs:DescribeTaskDefinition",
                "ecs:DeregisterTaskDefinition",
                "iam:ListRoleTags",
                "logs:DescribeLogGroups"
            ],
            "Effect": "Allow",
            "Resource": [
                "*"
            ]
        }
    ],
    "Version": "2012-10-17"
}

Examples

The FargateCluster will create a new Fargate ECS cluster by default along with all the IAM roles, security groups, and so on that it needs to function.

>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster()
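
Scaling works as with the other cluster managers; both calls below are from the Methods table at the end of this section:

>>> cluster.scale(4)  # run four worker tasks
>>> cluster.adapt(minimum=1, maximum=10)  # or scale adaptively within bounds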

Note that in many cases you will want to specify a custom Docker image to FargateCluster so that Dask has the packages it needs to execute your workflow.

>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster(image="<hub-user>/<repo-name>[:<tag>]")

One strategy to ensure that package versions match between your custom environment and the Docker container is to create your environment from an environment.yml file, export the exact package list for that environment using conda list --export > package-list.txt, and then use the pinned package versions contained in package-list.txt in your Dockerfile. You could use the default Dask Dockerfile as a template and simply add your pinned additional packages.
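
A sketch of that workflow (the environment name my-env is a placeholder):

$ conda env create -f environment.yml -n my-env
$ conda activate my-env
$ conda list --export > package-list.txt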

Attributes:
asynchronous
dashboard_link
observed
plan
requested
scheduler_address
tags

Methods

adapt(*args[, minimum, maximum]) Turn on adaptivity
get_logs([cluster, scheduler, workers]) Return logs for the cluster, scheduler and workers
new_worker_spec() Return name and spec for the next worker
scale([n, memory, cores]) Scale cluster to n workers
scale_up([n, memory, cores]) Scale cluster to n workers
close  
logs  
scale_down  
sync