Microsoft Azure¶
Cluster running on Azure Virtual Machines.
Overview¶
Authentication¶
To create clusters on Azure you must first set your authentication credentials. You can do this via the az command line tool.
$ az login
Note
Setting the default output to table with az configure will make the az tool much easier to use.
Resource Groups¶
To create resources on Azure they must be placed in a resource group. Dask Cloudprovider will need a group to create Dask components in.
You can list existing groups via the cli.
$ az group list
You can also create a new resource group if you do not have an existing one.
$ az group create --location <location> --name <resource group name> --subscription <subscription>
You can get a full list of locations with az account list-locations and subscriptions with az account list.
Take note of your resource group name for later.
Virtual Networks¶
Compute resources on Azure must be placed in virtual networks (vnet). Dask Cloudprovider will require an existing vnet to connect compute resources to.
You can list existing vnets via the cli.
$ az network vnet list
You can also create a new vnet via the cli.
$ az network vnet create -g <resource group name> -n <vnet name> --address-prefix 10.0.0.0/16 \
--subnet-name <subnet name> --subnet-prefix 10.0.0.0/24
This command will create a new vnet in your resource group with one subnet with the 10.0.0.0/24
prefix. For more than 255 compute resources you will need additional subnets.
Take note of your vnet name for later.
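As a rough check on subnet capacity, Python's standard ipaddress module can count the addresses a prefix provides. This sketch is purely illustrative and is not part of Dask Cloudprovider; note that Azure reserves five addresses in every subnet, so the usable count is slightly below the raw total:

```python
import ipaddress

# A /24 prefix provides 256 addresses in total.
subnet = ipaddress.ip_network("10.0.0.0/24")
print(subnet.num_addresses)  # 256

# Azure reserves the network address, three addresses for Azure
# services, and the broadcast address in each subnet,
# leaving 251 usable IPs.
print(subnet.num_addresses - 5)  # 251
```

This is why clusters with more than roughly 250 compute resources need additional subnets in the vnet.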
Security Groups¶
To allow network traffic to reach your Dask cluster you will need to create a security group which allows traffic on ports 8786-8787 from wherever you are.
You can list existing security groups via the cli.
$ az network nsg list
Or you can create a new security group.
$ az network nsg create -g <resource group name> --name <security group name>
$ az network nsg rule create -g <resource group name> --nsg-name <security group name> -n MyNsgRuleWithAsg \
--priority 500 --source-address-prefixes Internet --destination-port-ranges 8786 8787 \
--destination-address-prefixes '*' --access Allow --protocol Tcp --description "Allow Internet to Dask on ports 8786,8787."
This example allows all traffic to ports 8786-8787 from the internet. It is recommended that you make your rules more restrictive than this, for example by limiting access to your corporate network or a specific IP address.
Again take note of this security group name for later.
Extra options¶
To further customize the VMs created, you can provide extra_vm_options to AzureVMCluster. For example, to set the identity of the virtual machines to a (previously created) user assigned identity, create an azure.mgmt.compute.models.VirtualMachineIdentity
>>> import os
>>> import azure.identity
>>> import dask_cloudprovider.azure
>>> import azure.mgmt.compute.models
>>> subscription_id = os.environ["DASK_CLOUDPROVIDER__AZURE__SUBSCRIPTION_ID"]
>>> rg_name = os.environ["DASK_CLOUDPROVIDER__AZURE__RESOURCE_GROUP"]
>>> identity_name = "dask-cloudprovider-identity"
>>> v = azure.mgmt.compute.models.UserAssignedIdentitiesValue()
>>> user_assigned_identities = {
... f"/subscriptions/{subscription_id}/resourcegroups/{rg_name}/providers/Microsoft.ManagedIdentity/userAssignedIdentities/{identity_name}": v
... }
>>> identity = azure.mgmt.compute.models.VirtualMachineIdentity(
... type="UserAssigned",
... user_assigned_identities=user_assigned_identities
... )
And then provide that to AzureVMCluster
>>> cluster = dask_cloudprovider.azure.AzureVMCluster(extra_vm_options={"identity": identity.as_dict()})
>>> cluster.scale(1)
Dask Configuration¶
You’ll provide the names or IDs of the Azure resources when you create an AzureVMCluster. You can specify these values manually, or use Dask’s configuration system. For example, the resource_group value can be specified using an environment variable:
$ export DASK_CLOUDPROVIDER__AZURE__RESOURCE_GROUP="<resource group name>"
$ python
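The double-underscore environment variable names map onto nested Dask configuration keys. The mapping itself is performed by Dask's configuration system; the helper below is a purely illustrative sketch of that naming convention:

```python
def env_to_dask_key(name: str) -> str:
    """Illustrate how a DASK_* environment variable name maps to a
    nested Dask configuration key: strip the DASK_ prefix, lowercase
    the rest, and treat double underscores as dots (single
    underscores remain part of the key name)."""
    prefix = "DASK_"
    if not name.startswith(prefix):
        raise ValueError(f"not a Dask config variable: {name}")
    return name[len(prefix):].lower().replace("__", ".")

print(env_to_dask_key("DASK_CLOUDPROVIDER__AZURE__RESOURCE_GROUP"))
# cloudprovider.azure.resource_group
```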
Or you can set it in a YAML configuration file.
cloudprovider:
  azure:
    resource_group: "<resource group name>"
    azurevm:
      vnet: "<vnet name>"
Note that the options controlling the VMs are under the cloudprovider.azure.azurevm key.
See Configuration for more.
AzureVM¶
- class dask_cloudprovider.azure.AzureVMCluster(location: str = None, resource_group: str = None, vnet: str = None, subnet: str = None, security_group: str = None, public_ingress: bool = None, vm_size: str = None, scheduler_vm_size: str = None, vm_image: dict = {}, disk_size: int = None, bootstrap: bool = None, auto_shutdown: bool = None, docker_image=None, debug: bool = False, marketplace_plan: dict = {}, subscription_id: Optional[str] = None, extra_vm_options: Optional[dict] = None, **kwargs)[source]¶
Cluster running on Azure Virtual machines.
This cluster manager constructs a Dask cluster running on Azure Virtual Machines.
When configuring your cluster you may find it useful to install the az tool for querying the Azure API for available options: https://docs.microsoft.com/en-us/cli/azure/install-azure-cli
- Parameters
- location: str
  The Azure location to launch your cluster in. List available locations with az account list-locations.
- resource_group: str
  The resource group to create components in. List your resource groups with az group list.
- vnet: str
  The vnet to attach VM network interfaces to. List your vnets with az network vnet list.
- subnet: str (optional)
  The vnet subnet to attach VM network interfaces to. If omitted it will automatically use the first subnet in your vnet.
- security_group: str
  The security group to apply to your VMs. This must allow ports 8786-8787 from wherever you are running this from. List your security groups with az network nsg list.
- public_ingress: bool
  Assign a public IP address to the scheduler. Default True.
- vm_size: str
  Azure VM size to use for scheduler and workers. Default Standard_DS1_v2. List available VM sizes with az vm list-sizes --location <location>.
- disk_size: int
  Specifies the size of the VM host OS disk in gigabytes. Default is 50. This value cannot be larger than 1023.
- scheduler_vm_size: str
  Azure VM size to use for the scheduler. If not set, the vm_size will be used.
- vm_image: dict
  By default all VMs will use the latest Ubuntu LTS release with the following configuration
  {"publisher": "Canonical", "offer": "UbuntuServer", "sku": "18.04-LTS", "version": "latest"}
  You can override any of these options by passing a dict with matching keys here. For example, if you wish to try Ubuntu 19.04 you can pass {"sku": "19.04"} and the publisher, offer and version will be used from the default.
- bootstrap: bool (optional)
  It is assumed that the VHD will not have Docker installed (or the NVIDIA drivers for GPU instances). If bootstrap is True these dependencies will be installed on instance start. If you are using a custom VHD which already has these dependencies, set this to False.
- auto_shutdown: bool (optional)
  Shutdown the VM if the Dask process exits. Default True.
- n_workers: int
  Number of workers to initialise the cluster with. Defaults to 0.
- worker_module: str
  The Python module to run for the worker. Defaults to distributed.cli.dask_worker.
- worker_options: dict
  Params to be passed to the worker class. See distributed.worker.Worker for the default worker class. If you set worker_module then refer to the docstring for the custom worker class.
- scheduler_options: dict
  Params to be passed to the scheduler class. See distributed.scheduler.Scheduler.
- docker_image: string (optional)
  The Docker image to run on all instances.
  This image must have a valid Python environment and have dask installed in order for the dask-scheduler and dask-worker commands to be available. It is recommended the Python environment matches your local environment where AzureVMCluster is being created from.
  For GPU instance types the Docker image must have NVIDIA drivers and dask-cuda installed.
  By default the daskdev/dask:latest image will be used.
- docker_args: string (optional)
  Extra command line arguments to pass to Docker.
- extra_bootstrap: list[str] (optional)
  Extra commands to be run during the bootstrap phase.
- silence_logs: bool
  Whether or not we should silence logging when setting up the cluster.
- asynchronous: bool
  If this is intended to be used directly within an event loop with async/await.
- security: Security or bool, optional
  Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically. Default is True.
- debug: bool, optional
  More information will be printed when constructing clusters to enable debugging.
- marketplace_plan: dict (optional)
  Plan information dict necessary for creating a virtual machine from an Azure Marketplace image, or a custom image sourced from a Marketplace image with a plan. Default is {}.
  All three fields "name", "publisher", "product" must be passed in the dictionary if set, e.g. {"name": "ngc-base-version-21-02-2", "publisher": "nvidia", "product": "ngc_azure_17_11"}
- subscription_id: str (optional)
  The ID of the Azure Subscription to create the virtual machines in. If not specified, then dask-cloudprovider will attempt to use the configured default for the Azure CLI. List your subscriptions with az account list.
- extra_vm_options: dict[str, Any]
  Additional arguments to provide to Azure's VirtualMachinesOperations.begin_create_or_update when creating the scheduler and worker VMs.
Examples
Minimal example
Create the cluster
>>> from dask_cloudprovider.azure import AzureVMCluster
>>> cluster = AzureVMCluster(resource_group="<resource group>",
...                          vnet="<vnet>",
...                          security_group="<security group>",
...                          n_workers=1)
Creating scheduler instance
Assigned public IP
Network interface ready
Creating VM
Created VM dask-5648cc8b-scheduler
Waiting for scheduler to run
Scheduler is running
Creating worker instance
Network interface ready
Creating VM
Created VM dask-5648cc8b-worker-e1ebfc0e
Connect a client.
>>> from dask.distributed import Client
>>> client = Client(cluster)
Do some work.
>>> import dask.array as da
>>> arr = da.random.random((1000, 1000), chunks=(100, 100))
>>> arr.mean().compute()
0.5004117488368686
Close the cluster.
>>> client.close()
>>> cluster.close()
Terminated VM dask-5648cc8b-worker-e1ebfc0e
Removed disks for VM dask-5648cc8b-worker-e1ebfc0e
Deleted network interface
Terminated VM dask-5648cc8b-scheduler
Removed disks for VM dask-5648cc8b-scheduler
Deleted network interface
Unassigned public IP
You can also do this all in one go with context managers to ensure the cluster is created and cleaned up.
>>> with AzureVMCluster(resource_group="<resource group>",
...                     vnet="<vnet>",
...                     security_group="<security group>",
...                     n_workers=1) as cluster:
...     with Client(cluster) as client:
...         print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute())
Creating scheduler instance
Assigned public IP
Network interface ready
Creating VM
Created VM dask-1e6dac4e-scheduler
Waiting for scheduler to run
Scheduler is running
Creating worker instance
Network interface ready
Creating VM
Created VM dask-1e6dac4e-worker-c7c4ca23
0.4996427609642539
Terminated VM dask-1e6dac4e-worker-c7c4ca23
Removed disks for VM dask-1e6dac4e-worker-c7c4ca23
Deleted network interface
Terminated VM dask-1e6dac4e-scheduler
Removed disks for VM dask-1e6dac4e-scheduler
Deleted network interface
Unassigned public IP
RAPIDS example
You can also use AzureVMCluster to run a GPU enabled cluster and leverage the RAPIDS accelerated libraries.
>>> cluster = AzureVMCluster(resource_group="<resource group>",
...                          vnet="<vnet>",
...                          security_group="<security group>",
...                          n_workers=1,
...                          vm_size="Standard_NC12s_v3",  # Or any NVIDIA GPU enabled size
...                          docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.9",
...                          worker_class="dask_cuda.CUDAWorker")
>>> from dask.distributed import Client
>>> client = Client(cluster)
Run some GPU code.
>>> def get_gpu_model():
...     import pynvml
...     pynvml.nvmlInit()
...     return pynvml.nvmlDeviceGetName(pynvml.nvmlDeviceGetHandleByIndex(0))
>>> client.submit(get_gpu_model).result()
b'Tesla V100-PCIE-16GB'
Close the cluster.
>>> client.close()
>>> cluster.close()
- Attributes
- asynchronous
  Are we running in the event loop?
- auto_shutdown
- bootstrap
- called_from_running_loop
- command
- dashboard_link
- docker_image
- gpu_instance
- loop
- name
- observed
- plan
- requested
- scheduler_address
- scheduler_class
- worker_class
Methods
- adapt([Adaptive, minimum, maximum, ...])
  Turn on adaptivity
- call_async(f, *args, **kwargs)
  Run a blocking function in a thread as a coroutine.
- from_name(name)
  Create an instance of this class to represent an existing cluster by name.
- get_client()
  Return client for the cluster
- get_logs([cluster, scheduler, workers])
  Return logs for the cluster, scheduler and workers
- get_tags()
  Generate tags to be applied to all resources.
- new_worker_spec()
  Return name and spec for the next worker
- scale([n, memory, cores])
  Scale cluster to n workers
- scale_up([n, memory, cores])
  Scale cluster to n workers
- sync(func, *args[, asynchronous, ...])
  Call func with args synchronously or asynchronously depending on the calling context
- wait_for_workers(n_workers[, timeout])
  Blocking call to wait for n workers before continuing
- close
- get_cloud_init
- logs
- render_cloud_init
- render_process_cloud_init
- scale_down
Azure Spot Instance Plugin¶
- class dask_cloudprovider.azure.AzurePreemptibleWorkerPlugin(poll_interval_s=1, metadata_url=None, termination_events=None, termination_offset_minutes=0)[source]¶
A worker plugin for Azure spot instances.
This worker plugin will poll Azure's metadata service for preemption notifications. When a node is preempted, the plugin will attempt to gracefully shut down all workers on the node.
This plugin can be used on any worker running on Azure spot instances, not just the ones created by dask-cloudprovider.
For more details on Azure spot instances see: https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
- Parameters
- poll_interval_s: int (optional)
  The rate at which the plugin will poll the metadata service, in seconds. Defaults to 1.
- metadata_url: str (optional)
  The url of the metadata service to poll. Defaults to "http://169.254.169.254/metadata/scheduledevents?api-version=2019-08-01".
- termination_events: List[str] (optional)
  The types of events that will trigger the graceful shutdown. Defaults to ['Preempt', 'Terminate'].
- termination_offset_minutes: int (optional)
  Extra offset to apply to the preemption date. This may be negative, to start the graceful shutdown before the NotBefore date. It can also be positive, to start the shutdown after the NotBefore date, but this is at your own risk. Defaults to 0.
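The core decision the plugin polls for can be sketched as follows. This is an illustrative simplification, not the plugin's actual implementation: given a payload shaped like the Azure scheduled-events response, it checks whether any event matches the configured termination event types.

```python
DEFAULT_TERMINATION_EVENTS = ("Preempt", "Terminate")

def is_termination_scheduled(payload, termination_events=DEFAULT_TERMINATION_EVENTS):
    """Return True if any scheduled event is a termination-type event.

    `payload` is a dict shaped like the Azure scheduled events response,
    e.g. {"Events": [{"EventType": "Preempt", "NotBefore": "..."}]}.
    """
    return any(
        event.get("EventType") in termination_events
        for event in payload.get("Events", [])
    )

# A preemption notice should trigger a graceful shutdown...
print(is_termination_scheduled({"Events": [{"EventType": "Preempt"}]}))  # True
# ...while an unrelated event type (e.g. a host freeze) should not.
print(is_termination_scheduled({"Events": [{"EventType": "Freeze"}]}))   # False
```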
Examples
Let’s say you have a cluster and a client instance, for example using dask_kubernetes.KubeCluster
>>> from dask_kubernetes import KubeCluster
>>> from distributed import Client
>>> cluster = KubeCluster()
>>> client = Client(cluster)
You can add the worker plugin using the following:
>>> from dask_cloudprovider.azure import AzurePreemptibleWorkerPlugin
>>> client.register_worker_plugin(AzurePreemptibleWorkerPlugin())
Methods
- setup(worker)
  Run when the plugin is attached to a worker.
- teardown(worker)
  Run when the worker to which the plugin is attached is closed, or when the plugin is removed.
- transition(key, start, finish, **kwargs)
  Throughout the lifecycle of a task (see Worker State), Workers are instructed by the scheduler to compute certain tasks, resulting in transitions in the state of each task.
- poll_status