
Intercloud Broker

We now present an intercloud broker that targets batch applications. We first review the requirements of such a broker, and then propose an architecture. Finally, we describe our implementation of the resulting design, called SkyPilot.

Requirements


Cataloging cloud services and instances. There is a huge and growing number of services, instances, and locations across clouds. As shown in Table 1, the top three public clouds alone provide hundreds of compute VM types in dozens of regions across the globe. Even for a simple request of a 4vCPU VM in the “compute-optimized” family—advertised by all three clouds—there are at least 90 choices within the US in terms of region and VM type. Furthermore, each cloud has hundreds of software services (e.g., hosted Kubernetes/Spark, blob storage, SQL databases) to choose from. This is clearly beyond what can be navigated manually by ordinary users.


To provide the automatic placement of jobs, the broker must catalog the variety of instances and services, the APIs to invoke these services, and the subset of clouds and regions where these offerings are available.


Even after they have been cataloged, these many options are hard to navigate. Thus, the broker should expose filters on common attributes to applications so that they can easily narrow down the many options across clouds. For compute instances, filters may include the number of vCPUs, RAM, and accelerator types. For managed services (e.g., hosted analytics), filters may include the service or the package version (e.g., AWS EMR 6.5, or Apache Spark 3.1.2). Moreover, the broker should allow an application to choose specific services or instances supported only by one cloud.


Tracking pricing and dynamic availability. The price and availability of resources can vary dramatically across clouds and even regions or zones in the same cloud, often, but not always, following a diurnal pattern [73]. The variations are especially acute for scarce resources (§5.4), such as GPUs or preemptible spot instances that many applications use due to their lower costs, and change over time.


To illustrate the potential changes in resource availability, consider a real user’s application: a bioinformatics task running for 8 days on 24 spot VMs on GCP (see §5.2 for more detail). When a VM is preempted, it waits for another spot VM to become available. Figure 2 shows the cumulative number of preemptions over time. Note that preemptions happened every day and at unpredictably different rates (e.g., compare day 3–4 vs. day 4–5). The application experienced 319 preemptions, a preemption every 36 minutes on average.


Thus, the broker should track the availability and pricing to provide applications with the best choices at run time. One challenge is that clouds do not publish availability information explicitly. The broker may have to learn about availability implicitly by observing preemptions or allocation failures of both on-demand and spot resources in different locations.


[Figure 2: cumulative number of spot VM preemptions over time for the 8-day bioinformatics job]

Note
  • The broker should track resource availability and pricing so that it can offer applications the best choices at run time.
  • The challenge is that clouds do not publish availability information explicitly; the broker may need to learn it implicitly by observing preemptions or allocation failures of on-demand and spot resources in different locations (a minimal sketch of such a tracker follows).
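
To make this implicit tracking concrete, here is a minimal sketch (our own illustration, not part of the paper's design; all names are hypothetical) of a tracker that records preemptions and allocation failures per location so the optimizer can penalize locations that have failed recently:

```python
import time
from collections import defaultdict

class AvailabilityTracker:
    """Hypothetical sketch: learn availability implicitly from observed failures."""

    def __init__(self, window_seconds: float = 3600.0):
        self.window = window_seconds
        # (cloud, zone, instance_type) -> timestamps of observed failures
        self.failures = defaultdict(list)

    def record_failure(self, cloud: str, zone: str, instance_type: str) -> None:
        # Called when a spot VM is preempted or an allocation request fails.
        self.failures[(cloud, zone, instance_type)].append(time.time())

    def recent_failures(self, cloud: str, zone: str, instance_type: str) -> int:
        # Failures seen within the sliding window; the optimizer can use this
        # to penalize or skip locations that look unavailable right now.
        cutoff = time.time() - self.window
        events = self.failures[(cloud, zone, instance_type)]
        events[:] = [t for t in events if t >= cutoff]
        return len(events)

tracker = AvailabilityTracker()
tracker.record_failure("aws", "us-west-2a", "p3.2xlarge")
print(tracker.recent_failures("aws", "us-west-2a", "p3.2xlarge"))  # -> 1
```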

Dynamic optimization. Recall that the goal of the broker is to meet the application’s cost and performance requirements under various constraints, such as data residency. This means the broker should choose the types of instances or services, clouds, and locations to run the tasks in the application DAG. This is a challenging optimization problem because of (1) the sheer number of choices (Table 1), (2) DAG topologies becoming complex (Figure 10), and (3) the unpredictable resource availability and price changes during the application’s provisioning or run time (Figure 2).


As a result, the broker should implement a dynamic optimizer that can reflect the current resource availability and prices, and quickly find an optimal execution plan out of the large search space. To use up-to-date prices, the broker needs to compute the execution plan whenever an application starts. In addition, when a task in an application DAG cannot run as the broker originally planned due to availability changes, the broker needs to generate a new execution plan by re-optimization during the application’s run time.


Dynamic optimizer
  1. The broker should implement a dynamic optimizer that reflects current resource availability and prices.
  2. The optimizer must be able to quickly find an optimal execution plan in a very large search space.
  3. If, during execution, a task cannot run as originally planned because availability changed, the broker must automatically re-optimize at run time and generate a new execution plan (see the sketch below).
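
The following pseudocode-level sketch (our own illustration; the optimizer/provisioner interfaces are hypothetical) shows where the two planning points sit: a fresh plan when the application starts, and re-optimization of the remaining DAG whenever a placement can no longer be provisioned.

```python
def run_application(dag, optimizer, provisioner, executor):
    # Compute an execution plan with up-to-date prices when the application starts.
    plan = optimizer.optimize(dag)
    for task in dag.topological_order():
        cluster = provisioner.provision(plan[task])
        while cluster is None:
            # Availability changed: re-optimize the not-yet-executed part of the
            # DAG with the failed placement excluded, then try again.
            plan = optimizer.optimize(dag.remaining_tasks(), exclude=[plan[task]])
            cluster = provisioner.provision(plan[task])
        executor.run(task, cluster)
        provisioner.release(cluster)
```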

Managing resources and applications. Once the optimizer decides the placement of an application, the broker must provision the resources and free them when the application terminates. This involves starting and reliably shutting down instances on various clouds, or creating and terminating services (e.g., sending requests to a hosted service like AWS EMR). While these lifecycle operations may seem straightforward, bugs or failures can easily lead to inconsistencies between the broker state and the cloud provider state (e.g., leaking instances or intermediate data), which can be costly.


In addition, the broker must manage the execution of the application, i.e., start an application’s task when its inputs are available, possibly restart it in case of failures or preemptions, and move the task’s inputs across clouds/regions, if remote.


Note

Optimizer: decides the placement of the application.

Broker:

  1. Provisions resources when the application starts and frees them when it terminates.
  2. Manages the application's execution:
    • starts a task when its inputs are available,
    • restarts it on failures or preemptions,
    • moves the task's inputs when they are in another cloud or another region of the same cloud.

Architecture

Given these requirements, we propose an intercloud broker architecture consisting of the following components (Figure 3).

[Figure 3: intercloud broker architecture]

Catalog. The catalog records the instances and services available in each cloud, detailed locations that offer them, and the APIs to allocate, shut down, and access them. It also stores the long-term prices for on-demand VMs, data storage, egress, and services (typically these prices do not change for months). The catalog can provide filtering and searching functionalities. The catalog can be based on information published by the clouds, listed by a third party, or collected by the broker.

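To make the catalog's role concrete, here is a small, self-contained sketch (our own illustration, not SkyPilot's actual data model) of what a catalog entry and the attribute filtering it exposes might look like; the entries and prices are purely illustrative:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class CatalogEntry:
    cloud: str                 # e.g., "aws", "gcp", "azure"
    region: str                # e.g., "us-west-2"
    zone: str                  # e.g., "us-west-2a"
    instance_type: str         # e.g., "p3.2xlarge"
    vcpus: int
    memory_gb: float
    accelerators: List[str] = field(default_factory=list)
    hourly_price: float = 0.0  # long-term on-demand price, refreshed periodically

def filter_catalog(entries: List[CatalogEntry],
                   min_vcpus: int = 0,
                   accelerator: Optional[str] = None) -> List[CatalogEntry]:
    """Narrow the many cross-cloud offerings down by common attributes."""
    matches = []
    for e in entries:
        if e.vcpus < min_vcpus:
            continue
        if accelerator is not None and accelerator not in e.accelerators:
            continue
        matches.append(e)
    return matches

# Illustrative entries and prices only.
catalog = [
    CatalogEntry("aws", "us-west-2", "us-west-2a", "p3.2xlarge", 8, 61.0,
                 ["nvidia-v100"], 3.06),
    CatalogEntry("azure", "westus2", "westus2-1", "NC6s_v3", 6, 112.0,
                 ["nvidia-v100"], 3.06),
]
print([e.instance_type for e in filter_catalog(catalog, accelerator="nvidia-v100")])
```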

Tracker. This component tracks spot prices (which can change more frequently, e.g., hourly or daily) as well as resource availability across clouds and their locations.


Optimizer. The optimizer takes as inputs (1) the application’s DAG and its requirements, and (2) the instance and service availability as well as their prices provided by the catalog and tracker, and then computes an optimal placement of the tasks. Upon resource availability and price changes, the optimizer may perform re-optimization.


Provisioner. This component manages resources (§4.1) by allocating the resources required to run the execution plan provided by the optimizer, and freeing them when each task exits. To handle unpredictable capacity and user quota errors, the provisioner implements automatic failover, where it asks the optimizer for a new placement plan if the provision fails. Failures are also reported to the tracker.


Executor. The executor manages the application (§4.1) by packaging each application’s tasks and running them on the resources allocated by the provisioner.



Architecture

The SkyPilot Architecture is of great significance here !!!

We need to spare some time to understand it :)

In the future, we imagine intercloud brokers will offer more sophisticated services such as troubleshooting across clouds, providing more detailed performance measurements for specific applications on each cloud, the equivalent of spot-pricing but across clouds, reselling services at lower than listed prices (similar to the travel industry), and advanced configuration features for security and/or networking.


Furthermore, we expect a commercial broker to provide billing support to enable a user to have a single account with the provider of the intercloud broker, which then pays for the services rendered by each cloud on behalf of the user, and charges the user back. In our current deployment, our users have direct accounts with the three major clouds, so this functionality is not needed.


Outlook

This vision sketches some advanced features and services that future intercloud brokers might offer. To spell out these ideas:

  1. Troubleshooting across clouds:
    • The broker could diagnose and resolve problems that span multiple clouds, making it easier to locate and fix failures in a complex multi-cloud environment.
  2. More detailed performance measurements for specific applications:
    • The broker could provide finer-grained, deeper performance analysis per application, helping users optimize how their applications run on each cloud.
  3. The equivalent of spot pricing, but across clouds:
    • Much like the spot instances offered within a single cloud, the broker could hunt for the best prices across clouds and pass similar savings on to users.
  4. Reselling services below list price:
    • The broker could buy cloud resources in bulk and resell them slightly below list price, similar to packaged deals in the travel industry.
  5. Advanced configuration features for security and/or networking:
    • The broker could offer unified security policies and network configuration across clouds, reducing the complexity of managing a multi-cloud environment.

Overall, this envisions a smarter, more capable intercloud broker: one that does more than forward requests, providing value-added services that help users manage and optimize their multi-cloud environments. Such a service could greatly simplify multi-cloud strategies, improve resource utilization, and potentially save cost.

SkyPilot: An Implementation

We have implemented SkyPilot, which follows the architecture described in §4.2 with one difference: instead of implementing the tracker as a centralized component, SkyPilot distributes it between the catalog that refreshes prices daily, and the provisioner that tracks and caches provisioning failures.

SkyPilot is written in ≈ 21,000 lines of Python code, and has involved several person-years so far. It currently supports AWS, Azure, and GCP. It is being used by users from 3 universities and 4 other organizations; we report our deployment experience in §6. Next, we first describe SkyPilot in detail, then discuss the services in the compatibility set it uses.


Application API. As mentioned earlier, an application is specified as a DAG of coarse-grained tasks. Example tasks include a Spark job to process data, a Horovod [79] job to train a model, or an MPI job for HPC computations. A task starts when all of the tasks that provide its inputs have finished. Each task is self-contained and includes its executable and all library dependencies (e.g., packaged as a Docker image).


Task
  1. An application is specified as a DAG (directed acyclic graph) of coarse-grained tasks.
  2. Each task is self-contained, including its executable and all library dependencies (e.g., packaged as a Docker image).
  3. A task starts only after all of the tasks that provide its inputs have finished.

A task specifies its input and output locations in the form of cloud object store URIs. Optionally, a task can provide the size estimates of its inputs and outputs to help the optimizer estimate the cost of data transfers across clouds.


Each task specifies the resources it requires. For flexibility, resources are encoded as labels, such as “cpu: 4” or “accelerator: nvidia-v100”, an idea we borrow from cluster managers such as Borg [85], Mesos [63], and Condor [82]. The optimizer uses these resource labels to search the service catalog for a set of feasible candidates for each task. If desired, the user can short-circuit the optimizer’s selection by explicitly specifying a cloud and an instance type.


The user optionally specifies the number of instances for each task by a “num_nodes: n” label, which defaults to 1. Since we target coarse-grained batch jobs, our users have not found this a burden. In the future, we plan to support autoscaling or intelligently picking the number of instances [54,84].


Resources for each task
  1. Each task specifies the resources it requires; for flexibility they are encoded as labels such as "cpu: 4" or "accelerator: nvidia-v100".
  2. The user can optionally set the number of instances per task with a "num_nodes: n" label (default 1), or short-circuit the optimizer by explicitly specifying a cloud and instance type.
  3. The optimizer uses these resource labels to search the service catalog for a set of feasible candidates for each task.

Finally, the user supplies an optional time estimator for each task, which estimates how long it will run on each specified resource. These estimates are used by the optimizer for planning the DAG. The user could determine these estimates by benchmarking the task on different configurations. If a time estimator is unspecified for a task, currently the optimizer defaults to the heuristic of choosing the resource with the lowest hourly price.


Optional time estimator for each task
  1. The user may supply an optional time estimator per task, estimating how long it will run on each specified resource; the optimizer uses these estimates to plan the DAG.
  2. If a task has no time estimator, the optimizer currently defaults to the heuristic of choosing the resource with the lowest hourly price.

[Listing 1: an example application with train and infer tasks]

Example. Listing 1 shows an application consisting of two tasks. The train task trains a model. It reads the input data from S3 and writes the output (the trained model) to the object store of the cloud it is assigned to run on, which is determined by the optimizer. By using Resources, a dictionary of resource labels, the user specifies that this training task requires either an nvidia-v100 accelerator or a google-tpuv3-8 accelerator with 4 host vCPUs. The user also provides a train_time_estimator_fn lambda that estimates the task’s run time on these two accelerators. For example, one can compute a rough estimate by dividing the total number of floating-point operations required for training the model by the accelerator’s performance in FLOPS (floating-point operations per second), or use a more accurate benchmarking-based predictor.


The infer task performs model serving. It takes the trained model as input (set_input(train.output(0))). The Airflow-like statement, train >> infer, enforces this dependency. These two tasks are encapsulated in a Dag object. The DAG is passed to the optimizer to output an execution plan, which is then passed to the provisioner and the executor.


Figure 4a visualizes the DAG. (I/O data are task attributes and not nodes in the DAG; we show them for clarity.) While simple, this basic API already exposes many degrees of freedom. For example, while train’s input is on S3, the optimizer may choose to assign the task to a different cloud. In doing so, the optimizer must take into account the possible transfer costs, while satisfying the task’s requirements.


For convenience, SkyPilot also offers a YAML interface to specify an application in addition to the programmatic API.

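Since the listing itself is not reproduced here, the following is a rough Python sketch of what Listing 1 might look like, reconstructed from the description above. The module name and several method names are assumptions, and the FLOP figures in the time estimator are purely illustrative; only Resources, train_time_estimator_fn, set_input, train >> infer, and Dag are taken from the text.

```python
# Hypothetical module and constructor names; only the concepts follow the paper.
from skypilot_sketch import Task, Resources, Dag

def train_time_estimator_fn(resources):
    # Rough estimate: total training FLOPs divided by the accelerator's sustained
    # FLOPS; both figures below are made up for illustration.
    total_flops = 4e18
    flops_per_second = {"nvidia-v100": 14e12, "google-tpuv3-8": 90e12}
    return total_flops / flops_per_second[resources["accelerator"]]

train = Task(name="train", command="python train.py",
             inputs=["s3://my-bucket/dataset"])   # input data lives on S3
# Either a V100 or a TPU v3-8, each with 4 host vCPUs, as dictionaries of resource labels.
train.set_resources([Resources({"accelerator": "nvidia-v100", "cpu": 4}),
                     Resources({"accelerator": "google-tpuv3-8", "cpu": 4})])
train.set_time_estimator(train_time_estimator_fn)
# The output object store is chosen by the optimizer, so it is not pinned here.

infer = Task(name="infer", command="python serve.py")
infer.set_input(train.output(0))   # the trained model is the input to serving

train >> infer                     # Airflow-like dependency statement
dag = Dag([train, infer])          # handed to the optimizer, then provisioner and executor
```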

Catalog. SkyPilot implements a simple catalog to support three services (IaaS, object stores, managed analytics) on AWS, Azure, and GCP. These offerings are sufficient for our target workloads. We use the clouds’ public APIs to obtain details about these offerings. Pricing is refreshed periodically.


Optimizer. The optimizer assigns each task to a cloud, location, and hardware configuration to best satisfy the user’s requirements, e.g., minimize the total cost or time. It achieves this by filtering the offerings in the service catalog and solving an integer linear program (ILP) to pick an optimal assignment.

Before the actual optimization, the optimizer first converts the high-level resource requirements into a set of feasible configurations, i.e., ⟨cloud, zone, instance type⟩ tuples that can run each task. We call such a configuration a cluster.

For example, Resources(accelerator='nvidia-v100') can map to the cluster ⟨AWS, us-west-2a, p3.2x⟩ of AWS instances or the cluster ⟨Azure, westus2-1, NC6s_v3⟩ of Azure instances. To perform this conversion, the optimizer filters the offerings in the service catalog, checking whether they satisfy the resources each task requires. Each task is then annotated with its list of feasible clusters.

The optimizer computes execution plans at the zone level rather than the region level, because even within the same region different zones may offer different instance types and prices, and data transfers between zones are not free.

ILP-based optimization. Consider a DAG with N tasks, each of which has C feasible clusters. Since C is typically around ten and can reach several hundred, naively enumerating all C^N possible assignments is infeasible even for modest values of N. To address this, we formulate the assignment problem as a 0-1 ILP (integer linear program).

SkyPilot supports two types of optimization objectives: either total running cost or end-to-end run time. Our ILP formulation is inspired by Alpa [94], but we additionally consider the parallelism between tasks that do not depend on each other. This is critical for minimizing the DAG run time.


Given a DAG \((V,E)\) where \(V\) is the set of the tasks and \(E\) is the set of the edges representing the data dependencies between the tasks, our goal is to find an optimal mapping from each task in \(V\) to one of its annotated feasible clusters. For each task \(v \in V\), we denote the set of the feasible clusters by \(C_v\). Then we use a task time estimator to obtain a time vector \(t_v \in \mathbb{R}^{|C_v|}\), where each element is the time estimate for running task \(v\) on a cluster in \(C_v\). The time estimator can be either provided by the user or set to a default value of 1 hour. In addition, we get a cost vector \(c_v \in \mathbb{R}^{|C_v|}\) by multiplying \(t_v\) by the hourly price of each cluster. To account for the data transfer overhead between two tasks \((u,v) \in E\), we define a matrix \(P_{uv} \in \mathbb{R}^{|C_u| \times |C_v|}\) whose \((i,j)\) element is the data transfer time when the parent task \(u\) is mapped to the \(i\)-th cluster of \(C_u\) and the child task \(v\) is mapped to the \(j\)-th cluster of \(C_v\). Similarly, we define \(Q_{uv} \in \mathbb{R}^{|C_u| \times |C_v|}\) for the data transfer cost between \(u\) and \(v\).


When minimizing the total cost, we have:

\[ \min_s \; \underbrace{\sum_{v \in V} s_v^T c_v}_{\text{computation cost}} \;+\; \underbrace{\sum_{(u,v) \in E} s_u^T Q_{uv} s_v}_{\text{data transfer cost}} \quad (1) \]

where \(s_v \in \{0,1\}^{|C_v|}\) is a one-hot vector that selects a cluster from \(C_v\). The objective explicitly considers the two types of cost: the first term represents the total cost spent in executing all tasks on the selected clusters, while the second term represents the total data transfer cost. After we linearize [61] the second term, we get a 0-1 ILP, which SkyPilot solves using an off-the-shelf solver, CBC [60].

Similarly, when minimizing the end-to-end time, we have:

\[ \min_s f_{\text{sink}} \quad (2) \]
\[ \text{s.t.} \quad f_v \geq \underbrace{f_u}_{\text{parent finish time}} + \underbrace{s_u^T P_{uv} s_v}_{\text{data transfer time}} + \underbrace{s_v^T t_v}_{\text{computation time}} \quad \forall (u,v) \in E \quad (3) \]

where \(s_v \in \{0,1\}^{|C_v|}\) is the one-hot decision vector and \(f_v \in \mathbb{R}\) is the finish time of the task \(v\). The optimization constraint ensures that a task finishes only after its parents finish, its input data arrive, and its own computation completes. Under these constraints, the running time of the DAG becomes the finish time of its sink. Again, as we can linearize the second term, this problem can be efficiently solved by 0-1 ILP solvers.

While we cover the two representative objectives above, our ILP formulation allows any combination of cost and time to be used for the optimization. For example, we can minimize the cost under a time budget (or vice versa), by augmenting Equation 1 with the constraint in Equation 3 and bounding \(f_{\text{sink}}\) by the time budget. Future work can incorporate carbon footprint of cloud regions [21] into placement decisions.
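
As a concrete toy illustration of the cost objective in Equation 1, the sketch below builds the 0-1 ILP for a two-task chain with PuLP and its bundled CBC solver; the product terms \(s_u^T Q_{uv} s_v\) are linearized with auxiliary binary variables, as the text describes. All costs are made up.

```python
import pulp

# Toy data: "train" and "infer" each have two feasible clusters.
c_train = [3.0, 2.5]     # execution cost of train on each cluster ($)
c_infer = [1.0, 1.2]     # execution cost of infer on each cluster ($)
Q = [[0.0, 0.9],         # Q[i][j]: transfer cost if train runs on i and infer on j
     [0.9, 0.0]]

prob = pulp.LpProblem("sky_cost_min", pulp.LpMinimize)
s_train = [pulp.LpVariable(f"s_train_{i}", cat="Binary") for i in range(2)]
s_infer = [pulp.LpVariable(f"s_infer_{j}", cat="Binary") for j in range(2)]
# z[i][j] linearizes the product s_train[i] * s_infer[j].
z = [[pulp.LpVariable(f"z_{i}_{j}", cat="Binary") for j in range(2)] for i in range(2)]

# One-hot decision vectors: each task picks exactly one cluster.
prob += pulp.lpSum(s_train) == 1
prob += pulp.lpSum(s_infer) == 1
# Standard linearization constraints enforcing z[i][j] = s_train[i] * s_infer[j].
for i in range(2):
    for j in range(2):
        prob += z[i][j] <= s_train[i]
        prob += z[i][j] <= s_infer[j]
        prob += z[i][j] >= s_train[i] + s_infer[j] - 1

# Objective: computation cost + data transfer cost (Equation 1).
prob += (pulp.lpSum(c_train[i] * s_train[i] for i in range(2))
         + pulp.lpSum(c_infer[j] * s_infer[j] for j in range(2))
         + pulp.lpSum(Q[i][j] * z[i][j] for i in range(2) for j in range(2)))

prob.solve(pulp.PULP_CBC_CMD(msg=False))
# -> [0.0, 1.0] [0.0, 1.0]: the cheapest pair that also avoids any transfer cost.
print([v.value() for v in s_train], [v.value() for v in s_infer])
```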


Provisioner. SkyPilot implements a provisioner that reads the optimized plan and allocates a cluster for the next task ready to execute. As discussed, allocations may fail due to either insufficient capacity in a cloud’s location or insufficient quota of the user’s account. On such failures, the provisioner kicks off failover as follows. First, the failed location is temporarily blocked for the current allocation request with a time-to-live. Then, the optimizer is asked to re-optimize the DAG with this new constraint added. The provisioner then retries in the newly optimized location (another location of the same cloud or a different cloud). If all available locations fail to provide the resource, either an error is returned to the user or the provisioner can be configured to wait and retry in a loop.


We found failover to be especially valuable for scarce resources (e.g., large CPU or GPU VMs). For example, depending on request timing, it took 3–5 and 2–7 location attempts to allocate 8 V100 and 8 T4 GPUs on AWS, respectively.


This part is a bit dense, so let me unpack it.

This passage describes how SkyPilot's provisioner works, and in particular how it handles failed resource allocations. The main points:

  1. Basic role of the provisioner:

    • read the optimized plan
    • allocate cluster resources for the next task that is ready to execute
  2. Why an allocation can fail:

    • the cloud provider lacks capacity in a particular location
    • the user's account lacks quota
  3. Failover handling after a failure:

    • temporarily block allocations in the failed location, with a time-to-live (TTL)
      • the location is treated as "under repair" for a while and is not considered for this allocation request
      • the TTL can be thought of as the "repair time"
    • ask the optimizer to re-optimize the DAG (the task graph) with this new constraint added
    • retry the allocation in the newly optimized location (another location of the same cloud, or a different cloud)
    • if no available location can provide the resources, either return an error to the user, or configure the provisioner to wait and retry in a loop
  4. Why failover matters:

    • it is especially valuable for scarce resources (e.g., large CPU or GPU VMs)
    • example: on AWS, allocating 8 V100 GPUs took 3-5 location attempts and 8 T4 GPUs took 2-7 attempts, depending on request timing

This mechanism raises the success rate of resource allocation, especially for scarce or high-demand resources: by intelligently switching between locations and cloud providers, it maximizes the chance of obtaining the resources needed. (A simplified sketch of the loop follows.)
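
A simplified sketch of this failover loop (our own illustration; the optimizer and cloud interfaces are hypothetical) might look like this:

```python
import time

def provision_with_failover(task, optimizer, clouds, block_ttl=600, max_wait=None):
    """Block a failed location for a time-to-live, re-optimize, and retry elsewhere."""
    blocked = {}                                   # location -> block expiry timestamp
    start = time.time()
    while True:
        now = time.time()
        blocked = {loc: exp for loc, exp in blocked.items() if exp > now}  # drop expired blocks
        placement = optimizer.place(task, exclude=set(blocked))            # re-optimize
        if placement is None:                      # every candidate location is blocked
            if max_wait is not None and now - start > max_wait:
                raise RuntimeError("no cloud location could provide the resources")
            time.sleep(60)                         # optionally wait and retry in a loop
            continue
        try:
            return clouds.provision(placement)     # success: hand the cluster to the executor
        except Exception:                          # capacity or quota failure
            blocked[placement.location] = now + block_ttl
            # the failure would also be reported to the tracker (omitted here)
```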

Executor. After a cluster is provisioned, the executor orchestrates a task’s execution, e.g., setting up the task’s dependencies on the cluster, performing cross-cloud data transfers for the task’s inputs, and running the task (which can be a distributed program utilizing a multi-node cluster). We built an executor on top of Ray [71], a distributed framework that we use for intra-cluster task execution with fault tolerance support. Using Ray, rather than building a new execution engine, allowed us to focus on building the higher-level components new to the broker. For example, our executor implements a storage module that abstracts the object stores of AWS, Azure, and GCP and performs transfers. The executor also implements status tracking of task executions for resource management. On execution failures, the executor optionally exposes cluster handles to allow login and debugging.


The executor interface is modular. We envision other executors will be added in the future, e.g., for Kubernetes [36]. In addition, while our system formulation is generic enough to support arbitrary DAGs, our implementation of the executor has focused on supporting pipelines (sequential DAGs).


This part is also a bit dense, so let me unpack it.

This passage about the executor describes how tasks are managed and executed on a distributed cluster, in particular how a distributed framework (Ray) is used to support task execution and fault tolerance.

  1. Cluster allocation and orchestration of task execution

    • Once a cluster has been provisioned, the executor coordinates the task's execution. Concretely, this includes:
      • setting up the task's dependencies on the cluster (e.g., libraries and tools),
      • performing cross-cloud data transfers (if the task needs data from another cloud's storage),
      • running the task (which may be distributed across multiple nodes).

    Example: suppose a model-training task needs data stored in AWS S3 but runs on Azure. The executor transfers that data from S3 to the Azure compute nodes and makes sure all required dependencies (e.g., specific libraries or frameworks) are set up on those nodes.

  2. A Ray-based executor

    • The executor is built on Ray, a distributed computing framework that schedules tasks across cluster nodes and provides fault tolerance.
    • Using Ray instead of building a new execution engine let the SkyPilot team focus on the higher-level components the broker needs.

    Example: Ray can run tasks across several machines at once; if one machine suddenly fails, Ray handles the failure and continues the work on other available machines.

  3. Object store abstraction and data transfer

    • The executor implements a storage module that abstracts the object stores of the different clouds (AWS S3, Azure Blob Storage, and Google Cloud Storage), so users do not have to deal with each platform's quirks.
    • This module handles cross-cloud data transfer automatically.

    Example: if one task runs on AWS and another on GCP, the executor manages the transfer from AWS S3 to GCP; you only specify the source and destination of the data, not how it is moved.

  4. Task status tracking and failure handling

    • The executor tracks the status of task executions, which helps with resource management.
    • If a task fails, the executor can let the user log in to the relevant cluster nodes to debug.

    Example: if your task fails mid-run, the executor can surface failure logs and let you connect to the node in question to investigate, much like remote debugging.

  5. A modular interface

    • The executor's design is modular, so other execution backends (e.g., Kubernetes) can be added later.
    • Although the system design supports arbitrary DAGs, the current executor implementation focuses on pipelines (sequential DAGs).

    Example: suppose you have a series of data-processing tasks that must run in order: the first extracts data from a database, the second processes it, and the third stores the result in cloud storage. The executor runs these stages in order; you only describe the dependencies between them.

In short, the executor is the component that manages task execution. Built on Ray, it supports cross-cloud data transfer and fault tolerance, helping users run complex distributed tasks without worrying about low-level details such as data movement and cluster management.
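
To ground the Ray point, here is a minimal, self-contained sketch of running a sequential pipeline of stages as Ray tasks with automatic retries; it only illustrates the idea of reusing Ray for intra-cluster execution and is not SkyPilot's actual executor code.

```python
import ray

ray.init()  # in SkyPilot's setting this would connect to the provisioned cluster

@ray.remote(max_retries=3)          # Ray re-runs the task if its worker process dies
def run_stage(name: str, upstream: str) -> str:
    print(f"running {name} (input: {upstream})")
    return f"{name}-output"

# A pipeline (sequential DAG): each stage consumes the previous stage's output.
extract = run_stage.remote("extract", "object-store://input")
transform = run_stage.remote("transform", extract)   # Ray resolves the upstream result
load = run_stage.remote("load", transform)
print(ray.get(load))                                 # -> "load-output"
```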

Compatibility set. One of the distinguishing features of Sky is leveraging the already existing services and APIs across clouds (i.e., compatibility set; §2.3), rather than building uniform services and APIs across all clouds. However, a broker still needs to develop some glue-code to handle similar but not identical services supported by different clouds. The natural question is what is the effort to implement such glue-code? The answer for our applications so far is “minimal”.


To manage clusters, SkyPilot uses Ray’s cluster launcher, which already supports AWS, GCP, and Azure. (Other frameworks could also be used, e.g., Terraform [51].) The main functionality we added is the control for automatic failover.


One of the most important components of any Sky application is storage. While the APIs provided by the object stores of the three major clouds are similar, they are not identical. Fortunately, all have libraries [20,30,46] exposing the POSIX interface, which allows us to mount different object stores as directories. Providing this functionality required only 400-500 lines of code (LoC) per object store.

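The text does not name the specific libraries, but FUSE adapters such as goofys (S3), gcsfuse (GCS), and blobfuse2 (Azure Blob) expose exactly this kind of POSIX mount. A thin wrapper in that spirit might look like the sketch below; the commands and flags are indicative only and should be checked against each tool's documentation.

```python
import subprocess

def mount_bucket(cloud: str, bucket: str, mountpoint: str) -> None:
    """Mount a cloud object store as a local directory via a FUSE adapter.

    The adapters and flags below are illustrative, not SkyPilot's actual storage code.
    """
    commands = {
        "aws":   ["goofys", bucket, mountpoint],
        "gcp":   ["gcsfuse", bucket, mountpoint],
        "azure": ["blobfuse2", "mount", mountpoint, "--config-file=blobfuse.yaml"],
    }
    subprocess.run(commands[cloud], check=True)

# Tasks then read and write the bucket through ordinary file paths, e.g.:
# mount_bucket("aws", "my-training-data", "/mnt/data")
```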

Finally, for analytics applications we use high-level APIs, e.g., hosted analytics services provided by AWS (EMR) and GCP (Dataproc). Abstracting these services required us to implement just two methods: provisioning and termination. This involved only 200 LoC for EMR and Dataproc together.

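To give a sense of what that glue code looks like, here is a sketch of the two-method abstraction with an EMR backend; the boto3 calls shown (run_job_flow, terminate_job_flows) are real EMR API names, while the class structure and the specific parameters are illustrative rather than SkyPilot's actual code.

```python
from abc import ABC, abstractmethod
import boto3

class ManagedAnalytics(ABC):
    """Hosted analytics services are abstracted behind just two methods."""

    @abstractmethod
    def provision(self, name: str, num_nodes: int) -> str: ...

    @abstractmethod
    def terminate(self, cluster_id: str) -> None: ...

class EMR(ManagedAnalytics):
    def __init__(self, region: str = "us-west-2"):
        self.client = boto3.client("emr", region_name=region)

    def provision(self, name: str, num_nodes: int) -> str:
        resp = self.client.run_job_flow(
            Name=name,
            ReleaseLabel="emr-6.5.0",               # e.g., AWS EMR 6.5
            Applications=[{"Name": "Spark"}],
            Instances={
                "MasterInstanceType": "m5.xlarge",  # illustrative instance choices
                "SlaveInstanceType": "m5.xlarge",
                "InstanceCount": num_nodes,
                "KeepJobFlowAliveWhenNoSteps": True,
            },
            JobFlowRole="EMR_EC2_DefaultRole",
            ServiceRole="EMR_DefaultRole",
        )
        return resp["JobFlowId"]

    def terminate(self, cluster_id: str) -> None:
        self.client.terminate_job_flows(JobFlowIds=[cluster_id])

# A Dataproc backend would mirror this with the google-cloud-dataproc client.
```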