import asyncio
import random

import dask
from dask_cloudprovider.generic.vmcluster import (
from import (
from dask_cloudprovider.utils.timeout import Timeout

    from aiobotocore.session import get_session
    import botocore.exceptions
    import botocore.config
except ImportError as e:
    msg = (
        "Dask Cloud Provider AWS requirements are not installed.\n\n"
        "Please either conda or pip install as follows:\n\n"
        "  conda install -c conda-forge dask-cloudprovider       # either conda install\n"
        '  pip install "dask-cloudprovider[aws]" --upgrade       # or python -m pip install'
    raise ImportError(msg) from e

class EC2Instance(VMInterface):
    def __init__(
        instance_tags: None,
        volume_tags: None,
        use_private_ip: False,
        super().__init__(*args, **kwargs)
        self.instance = None
        self.cluster = cluster
        self.config = config
        self.region = region
        self.availability_zone = availability_zone
        self.bootstrap = bootstrap
        self.extra_bootstrap = extra_bootstrap
        self.ami = ami
        self.docker_image = docker_image or self.config.get("docker_image")
        self.env_vars = env_vars
        self.instance_type = instance_type
        self.gpu_instance = gpu_instance
        self.vpc = vpc
        self.subnet_id = subnet_id
        self.security_groups = security_groups
        self.filesystem_size = filesystem_size
        self.key_name = key_name
        self.iam_instance_profile = iam_instance_profile
        self.instance_tags = instance_tags
        self.volume_tags = volume_tags
        self.use_private_ip = use_private_ip
        self.enable_detailed_monitoring = enable_detailed_monitoring

    async def create_vm(self):
        # TODO Enable Spot support

        boto_config = botocore.config.Config(retries=dict(max_attempts=10))
        async with self.cluster.boto_session.create_client(
            "ec2", region_name=self.region, config=boto_config
        ) as client:
            self.vpc = self.vpc or await get_default_vpc(client)
            self.subnet_id = (
                self.subnet_id or (await get_vpc_subnets(client, self.vpc))[0]
            self.security_groups = self.security_groups or [
                await get_security_group(client, self.vpc)
            self.ami = self.ami or await get_latest_ami_id(
                "099720109477",  # Canonical

            vm_kwargs = {
                "BlockDeviceMappings": [
                        "DeviceName": "/dev/sda1",
                        "VirtualName": "sda1",
                        "Ebs": {
                            "DeleteOnTermination": True,
                            "VolumeSize": self.filesystem_size,
                            "VolumeType": "gp3",
                            "Encrypted": False,
                "ImageId": self.ami,
                "InstanceType": self.instance_type,
                "MaxCount": 1,
                "MinCount": 1,
                "Monitoring": {"Enabled": self.enable_detailed_monitoring},
                "UserData": self.cluster.render_process_cloud_init(self),
                "InstanceInitiatedShutdownBehavior": "terminate",
                "NetworkInterfaces": [
                        "AssociatePublicIpAddress": False
                        if self.use_private_ip
                        else True,
                        "DeleteOnTermination": True,
                        "Description": "private" if self.use_private_ip else "public",
                        "DeviceIndex": 0,
                        "Groups": self.security_groups,
                        "SubnetId": self.subnet_id,
                "TagSpecifications": [
                        "ResourceType": "instance",
                        "Tags": dict_to_aws(self.instance_tags, upper=True),
                        "ResourceType": "volume",
                        "Tags": dict_to_aws(self.volume_tags, upper=True),

            if self.key_name:
                vm_kwargs["KeyName"] = self.key_name

            if self.iam_instance_profile:
                vm_kwargs["IamInstanceProfile"] = self.iam_instance_profile

            if self.availability_zone:
                if isinstance(self.availability_zone, list):
                    self.availability_zone = random.choice(self.availability_zone)
                vm_kwargs["Placement"] = {"AvailabilityZone": self.availability_zone}

            response = await client.run_instances(**vm_kwargs)
            [self.instance] = response["Instances"]

            try:  # Ensure we tear down any resources we allocated if something goes wrong
                return await self.configure_vm(client)
            except Exception:
                    f"reclaiming vm because configure_vm failed {}"
                await self.destroy_vm()

    async def configure_vm(self, client):
        timeout = Timeout(300, f"Failed to add tags for {self.instance['InstanceId']}")
        backoff = 0.1
                await client.create_tags(
                        {"Key": "Name", "Value":},
                        {"Key": "Dask Cluster", "Value": self.cluster.uuid},
            except Exception as e:

            await asyncio.sleep(min(backoff, 10) + backoff % 1)
            # Exponential backoff with a cap of 10 seconds and some jitter
            backoff = backoff * 2

            f"Created instance {self.instance['InstanceId']} as {}"

        address_type = "Private" if self.use_private_ip else "Public"
        ip_address_key = f"{address_type}IpAddress"

        default_error = (
            f"Failed {address_type} IP for instance {self.instance['InstanceId']}"
        timeout = Timeout(300, default_error)
        backoff = 0.1
        while self.instance.get(ip_address_key) is None and
                response = await client.describe_instances(
                    InstanceIds=[self.instance["InstanceId"]], DryRun=False
                [reservation] = response["Reservations"]
                [self.instance] = reservation["Instances"]
            except botocore.exceptions.ClientError as e:
            await asyncio.sleep(min(backoff, 10) + backoff % 1)
            # Exponential backoff with a cap of 10 seconds and some jitter
            backoff = backoff * 2
        return self.instance[ip_address_key], None

    async def destroy_vm(self):
        boto_config = botocore.config.Config(retries=dict(max_attempts=10))
        async with self.cluster.boto_session.create_client(
            "ec2", region_name=self.region, config=boto_config
        ) as client:
            await client.terminate_instances(
                InstanceIds=[self.instance["InstanceId"]], DryRun=False
            self.cluster._log(f"Terminated {} ({self.instance['InstanceId']})")

class EC2Scheduler(SchedulerMixin, EC2Instance):
    """Scheduler running on an EC2 instance."""

class EC2Worker(WorkerMixin, EC2Instance):
    """Worker running on an EC2 instance."""

[docs]class EC2Cluster(VMCluster): """Deploy a Dask cluster using EC2. This creates a Dask scheduler and workers on EC2 instances. All instances will run a single configurable Docker container which should contain a valid Python environment with Dask and any other dependencies. All optional parameters can also be configured in a `cloudprovider.yaml` file in your Dask configuration directory or via environment variables. For example ``ami`` can be set via ``DASK_CLOUDPROVIDER__EC2__AMI``. See for more info. Parameters ---------- region: string (optional) The region to start your clusters. By default this will be detected from your config. availability_zone: string or List(string) (optional) The availability zone to start your clusters. By default AWS will select the AZ with most free capacity. If you specify more than one then scheduler and worker VMs will be randomly assigned to one of your chosen AZs. bootstrap: bool (optional) It is assumed that the ``ami`` will not have Docker installed (or the NVIDIA drivers for GPU instances). If ``bootstrap`` is ``True`` these dependencies will be installed on instance start. If you are using a custom AMI which already has these dependencies set this to ``False.`` worker_command: string (optional) The command workers should run when starting. By default this will be ``"dask-worker"`` unless ``instance_type`` is a GPU instance in which case ``dask-cuda-worker`` will be used. ami: string (optional) The base OS AMI to use for scheduler and workers. This must be a Debian flavour distribution. By default this will be the latest official Ubuntu 20.04 LTS release from canonical. If the AMI does not include Docker it will be installed at runtime. If the instance_type is a GPU instance the NVIDIA drivers and Docker GPU runtime will be installed at runtime. instance_type: string (optional) A valid EC2 instance type. This will determine the resources available to the scheduler and all workers. If supplied, you may not specify ``scheduler_instance_type`` or ``worker_instance_type``. See By default will use ``t2.micro``. scheduler_instance_type: string (optional) A valid EC2 instance type. This will determine the resources available to the scheduler. See By default will use ``t2.micro``. worker_instance_type: string (optional) A valid EC2 instance type. This will determine the resources available to all workers. See By default will use ``t2.micro``. vpc: string (optional) The VPC ID in which to launch the instances. Will detect and use the default VPC if not specified. subnet_id: string (optional) The Subnet ID in which to launch the instances. Will use all subnets for the VPC if not specified. security_groups: List(string) (optional) The security group ID that will be attached to the workers. Must allow all traffic between instances in the security group and ports 8786 and 8787 between the scheduler instance and wherever you are calling ``EC2Cluster`` from. By default a Dask security group will be created with ports 8786 and 8787 exposed to the internet. filesystem_size: int (optional) The instance filesystem size in GB. Defaults to ``40``. key_name: str (optional) The SSH key name to assign to all instances created by the cluster manager. You can list your existing key pair names with ``aws ec2 describe-key-pairs --query 'KeyPairs[*].KeyName' --output text``. NOTE: You will need to ensure your security group allows access on port 22. If ``security_groups`` is not set the default group will not contain this rule and you will need to add it manually. iam_instance_profile: dict (optional) An IAM profile to assign to VMs. This can be used for allowing access to other AWS resources such as S3. See n_workers: int Number of workers to initialise the cluster with. Defaults to ``0``. worker_module: str The Python module to run for the worker. Defaults to ``distributed.cli.dask_worker`` worker_options: dict Params to be passed to the worker class. See :class:`distributed.worker.Worker` for default worker class. If you set ``worker_module`` then refer to the docstring for the custom worker class. scheduler_options: dict Params to be passed to the scheduler class. See :class:`distributed.scheduler.Scheduler`. docker_image: string (optional) The Docker image to run on all instances. This image must have a valid Python environment and have ``dask`` installed in order for the ``dask-scheduler`` and ``dask-worker`` commands to be available. It is recommended the Python environment matches your local environment where ``EC2Cluster`` is being created from. For GPU instance types the Docker image much have NVIDIA drivers and ``dask-cuda`` installed. By default the ``daskdev/dask:latest`` image will be used. docker_args: string (optional) Extra command line arguments to pass to Docker. env_vars: dict (optional) Environment variables to be passed to the worker. silence_logs: bool Whether or not we should silence logging when setting up the cluster. asynchronous: bool If this is intended to be used directly within an event loop with async/await security : Security or bool, optional Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically. Default is ``True``. debug: bool, optional More information will be printed when constructing clusters to enable debugging. instance_tags: dict, optional Tags to be applied to all EC2 instances upon creation. By default, includes "createdBy": "dask-cloudprovider" volume_tags: dict, optional Tags to be applied to all EBS volumes upon creation. By default, includes "createdBy": "dask-cloudprovider" use_private_ip: bool (optional) Whether to use a private IP (if True) or public IP (if False). Default ``False``. enable_detailed_monitoring: bool (optional) Whether to enable detailed monitoring for created instances. See Default ``False``. Notes ----- **Resources created** .. csv-table:: :header: Resource, Name, Purpose, Cost EC2 Instance, dask-scheduler-{cluster uuid}, Dask Scheduler, "`EC2 Pricing <>`_" EC2 Instance, dask-worker-{cluster uuid}-{worker uuid}, Dask Workers, "`EC2 Pricing <>`_" **Credentials** In order for Dask workers to access AWS resources such as S3 they will need credentials. The best practice way of doing this is to pass an IAM role to be used by workers. See the ``iam_instance_profile`` keyword for more information. Alternatively you could read in your local credentials created with ``aws configure`` and pass them along as environment variables. Here is a small example to help you do that. >>> def get_aws_credentials(): ... parser = configparser.RawConfigParser() ...'~/.aws/config')) ... config = parser.items('default') ...'~/.aws/credentials')) ... credentials = parser.items('default') ... all_credentials = {key.upper(): value for key, value in [*config, *credentials]} ... with contextlib.suppress(KeyError): ... all_credentials["AWS_REGION"] = all_credentials.pop("REGION") ... return all_credentials >>> cluster = EC2Cluster(env_vars=get_aws_credentials()) **Manual cleanup** If for some reason the cluster manager is terminated without being able to perform cleanup the default behaviour of ``EC2Cluster`` is for the scheduler and workers to time out. This will result in the host VMs shutting down. This cluster manager also creates instances with the terminate on shutdown setting so all resources should be removed automatically. If for some reason you chose to override those settings and disable auto cleanup you can destroy resources with the following CLI command. .. code-block:: bash export CLUSTER_ID="cluster id printed during creation" aws ec2 describe-instances \\ --filters "Name=tag:Dask Cluster,Values=${CLUSTER_ID}" \\ --query "Reservations[*].Instances[*].[InstanceId]" \\ --output text | xargs aws ec2 terminate-instances --instance-ids Examples -------- Regular cluster. >>> cluster = EC2Cluster() >>> cluster.scale(5) RAPIDS Cluster. >>> env_vars = >>> cluster = EC2Cluster(ami="ami-06d62f645899df7de", # Deep Learning AMI Ubuntu 18.04 (these are region specific) ... docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04", ... instance_type="g4dn.xlarge", ... worker_class="dask_cuda.CUDAWorker", ... n_workers=2, ... bootstrap=False, ... filesystem_size=120, ... env_vars=get_aws_credentials()) # Pass credentials to Cluster see Notes section for info ======= Enable SSH for debugging >>> from import EC2Cluster >>> cluster = EC2Cluster(key_name="myawesomekey", # Security group which allows ports 22, 8786, 8787 and all internal traffic security_groups=["sg-aabbcc112233"]) # You can now SSH to an instance with `ssh ubuntu@public_ip` >>> cluster.close() """ def __init__( self, region=None, availability_zone=None, bootstrap=None, auto_shutdown=None, ami=None, instance_type=None, scheduler_instance_type=None, worker_instance_type=None, vpc=None, subnet_id=None, security_groups=None, filesystem_size=None, key_name=None, iam_instance_profile=None, docker_image=None, debug=False, instance_tags=None, volume_tags=None, use_private_ip=None, enable_detailed_monitoring=None, **kwargs, ): self.boto_session = get_session() self.config = dask.config.get("cloudprovider.ec2", {}) self.scheduler_class = EC2Scheduler self.worker_class = EC2Worker self.region = region if region is not None else self.config.get("region") self.availability_zone = ( availability_zone if availability_zone is not None else self.config.get("availability_zone") ) self.bootstrap = ( bootstrap if bootstrap is not None else self.config.get("bootstrap") ) self.auto_shutdown = ( auto_shutdown if auto_shutdown is not None else self.config.get("auto_shutdown") ) self.ami = ami if ami is not None else self.config.get("ami") self.instance_type = ( instance_type if instance_type is not None else self.config.get("instance_type") ) if instance_type is None: self.scheduler_instance_type = ( scheduler_instance_type if scheduler_instance_type is not None else self.config.get("scheduler_instance_type") ) self.worker_instance_type = ( worker_instance_type if worker_instance_type is not None else self.config.get("worker_instance_type") ) else: if scheduler_instance_type is not None or worker_instance_type is not None: raise ValueError( "If you specify instance_type, you may not specify scheduler_instance_type or worker_instance_type" ) self.scheduler_instance_type = instance_type self.worker_instance_type = instance_type self.gpu_instance = self.instance_type.startswith(("p", "g")) self.vpc = vpc if vpc is not None else self.config.get("vpc") self.subnet_id = ( subnet_id if subnet_id is not None else self.config.get("subnet_id") ) self.security_groups = ( security_groups if security_groups is not None else self.config.get("security_groups") ) self.filesystem_size = ( filesystem_size if filesystem_size is not None else self.config.get("filesystem_size") ) self.key_name = ( key_name if key_name is not None else self.config.get("key_name") ) self.iam_instance_profile = ( iam_instance_profile if iam_instance_profile is not None else self.config.get("iam_instance_profile") ) self.debug = debug instance_tags = instance_tags if instance_tags is not None else {} self.instance_tags = {**instance_tags, **self.config.get("instance_tags")} volume_tags = volume_tags if volume_tags is not None else {} self.volume_tags = {**volume_tags, **self.config.get("volume_tags")} self._use_private_ip = ( use_private_ip if use_private_ip is not None else self.config.get("use_private_ip") ) self.enable_detailed_monitoring = ( enable_detailed_monitoring if enable_detailed_monitoring is not None else self.config.get("enable_detailed_monitoring") ) self.options = { "cluster": self, "config": self.config, "region": self.region, "availability_zone": self.availability_zone, "bootstrap": self.bootstrap, "ami": self.ami, "docker_image": docker_image or self.config.get("docker_image"), "instance_type": self.instance_type, "scheduler_instance_type": self.scheduler_instance_type, "worker_instance_type": self.worker_instance_type, "gpu_instance": self.gpu_instance, "vpc": self.vpc, "subnet_id": self.subnet_id, "security_groups": self.security_groups, "filesystem_size": self.filesystem_size, "key_name": self.key_name, "iam_instance_profile": self.iam_instance_profile, "instance_tags": self.instance_tags, "volume_tags": self.volume_tags, "use_private_ip": self._use_private_ip, "enable_detailed_monitoring": self.enable_detailed_monitoring, } self.scheduler_options = {**self.options} self.worker_options = {**self.options} self.scheduler_options["instance_type"] = self.scheduler_instance_type self.worker_options["instance_type"] = self.worker_instance_type super().__init__(debug=debug, **kwargs)