NVIDIA 集合通信库(NCCL)提供了一套面向低延迟和高带宽通信的集合操作 API,支持 AI 工作负载从单台主机上的少量 GPU 扩展至数据中心内数千个 GPU 的大规模部署。本文将介绍 NCCL 支持运行时动态调整规模以优化资源成本的特性,以及通过动态剔除故障节点来减少服务中断、提升系统可靠性的机制。
借助 NCCL 实现可扩展的 AI
NCCL 于 2015 年推出,旨在通过多个 GPU 协同训练模型来加速 AI 训练。在随后的十年中,训练任务已扩展至数千个 GPU,模型的规模和复杂性也持续增长。
当前,训练和推理工作负载均依赖于结合数据并行、张量并行和专家并行的多GPU集群,以满足延迟和吞吐量的要求。NCCL集群持续作为这些并行策略的通信基础,在一个通信器内跨多个工作进程(称为rank)实现计算同步。
通常,深度学习框架在启动时会执行一次初始化步骤,以确定数据分片并为多个并行维度中的每个 GPU 分配特定任务。然而,随着模型规模不断扩大以及推理引擎对并行性需求的提升,在运行时动态调整资源分配成为一种颇具吸引力的方法,有助于显著减少操作的占用空间。
动态可扩展的推理引擎能够根据用户流量的变化,灵活分配额外的 GPU 资源并将任务分散到多个 GPU 上,以应对流量增长;在流量较低时,则可释放多余的 GPU,从而优化成本。这类属于预设的扩展操作,系统各组件均按设计正常运行。我们还将展示,这种模式在提升容错能力方面同样具有重要作用。
NCCL通信器如何实现应用程序的动态扩展
NCCL 通信器在很大程度上借鉴了 MPI 通信器的设计理念,但引入了若干重要差异和新概念,以支持动态应用程序的扩展。
- 应用程序可在运行期间的任意时刻,通过将
uniqueId传递给ncclCommInit来从头创建 NCCL 通信器。相比之下,MPI 在初始化阶段会创建一个名为MPI_COMM_WORLD的特殊通信器,其余所有通信器均通过MPI_Comm_split从中派生而来。 - NCCL 通信器可被配置为非阻塞模式,使其能够在后台异步执行初始化操作。
- 在 NCCL 中,应用程序可自主为通信器成员分配 rank,从而灵活优化通信器的拓扑结构。
创建通信器后,成员集合(rank)将被视为不可变。因此,执行纵向扩展操作的 NCCL 应用程序所经历的流程,与第二次初始化过程非常相似。系统会生成一个新的 uniqueId,并在所有参与的秩之间共享,将其传递给 ncclCommInit。经过优化的应用程序可启用非阻塞模式,使初始化操作在后台持续进行,同时继续利用原有的通信器处理请求,直至新的通信器准备就绪。
同样,可以使用 ncclCommInit 实现纵向缩减,或者应用程序可调用经过优化的 ncclCommShrink 方法。该方法通过复用旧通信程序的 rank 信息,有效缩短初始化时间。这一优化不仅对超大规模通信设备具有显著优势,还能为各种规模的系统提供简化的 API 支持。
容错 NCCL 应用
故障检测、识别与缓解涉及一个复杂的课题,贯穿从物理层到应用层的整个技术栈。如需深入了解故障处理及检查点恢复机制,请参阅确保在 NVIDIA DGX 云上进行可靠的模型训练。若要掌握 Dynamo 0.4 在可观察性与容错性方面的改进,请查阅 Dynamo 0.4 实现 4 倍性能提升、支持基于 SLO 的自动扩缩容及实时可观察性。
除了传统的检查点和负载均衡等故障缓解技术外,NCCL 通信器还支持在发生故障后动态调整规模,使应用程序能够在无需完全重启工作负载的情况下实现恢复。
部署推理工作负载(例如在 Kubernetes 上)的常用方法已具备重启并替换故障工作负载的机制,但应用程序仍需针对 NCCL 通信器启动相应的容错处理步骤。从包含部分秩(rank)发生故障的情况中恢复,过程类似于将故障秩从通信器中移除的缩容操作。
区别在于,即使是健康的秩,也应预期 NCCL 可能返回错误或在任何集合操作上挂起。典型的恢复过程始于在现有通信器上执行 ncclCommAbort,随后执行 ncclCommInit,与仍在运行的秩共同构建一个新的通信器。
NCCL 2.27 引入了 ncclCommShrink,可优化并简化恢复过程。当传入 NCCL_SHRINK_ABORT 标志及需排除的秩列表时,ncclCommShrink 将取消所有挂起的操作,并自动创建一个新的通信器,无需调用 ncclGetUniqueId 或 ncclCommInit。
动态扩展与容错应用示例
利用这些概念,您可以构建一个简单的 NCCL 应用示例,以响应来自框架的扩展请求。
#include <stdio.h>
#include <unistd.h>
#include <string>
#include <chrono>
#include <cstdlib>
#include <stdexcept>
#include <vector>
#include "nccl.h"
/* the various kinds of scaling this example supports: */
enum scalingRequestType { NONE, SCALING_NORMAL, SCALING_ABORT, SHRINK_NORMAL, SHRINK_ABORT };
/* Framework Functions: The specific details are not important, so
implementation is not included.*/
void frameworkGetInferenceWork(void **queries, enum scalingRequestType *scaling);
void frameworkNotifyTimeout();
void frameworkNotifyError();
void frameworkDetermineNewRank(int *rank, int *count);
void frameworkGetUniqueId(ncclUniqueId *uid);
void frameworkPutUniqueId(ncclUniqueId uid);
void frameworkGetExcludedRanks(std::vector<int> *excluded);
void exitAbort();
void exitCleanly();
/* Example placeholder function for main job of this worker. Assumes the need
to use a communicator to coordinate work across workers. */
void executePrefillAndDecode(ncclComm_t comm, void *queries);
/* forward declarations of scaleCommunicator and shrinkCommunicator which are
implemented below. These replace the comm with a new, resized communicator. */
void scaleCommunicator(ncclComm_t *comm, enum scalingRequestType *scaling);
void shrinkCommunicator(ncclComm_t *comm, enum scalingRequestType *scaling);
/* In this example, use C++ exception handling to exit from
executePrefillAndDecode so that the framework may react to an error. Use
multiple kinds of exceptions to separate various classes of errors. */
struct AppException : public std::runtime_error {
AppException(const std::string& message): std::runtime_error(message) {}
};
struct AppNCCLTimeoutException : public AppException {
AppNCCLTimeoutException(const std::string& message): AppException(message) {}
};
struct AppNCCLErrorException : public AppException {
AppNCCLErrorException(const std::string& message): AppException(message) {}
};
/* We use a custom NCCL_CHECK macro which raises a C++ exception unless the
operation returns ncclSuccess or ncclInProgress */
#define NCCL_CHECK(call) do { \
ncclResult_t result = call; \
if (result != ncclSuccess && result != ncclInProgress) { \
printf("NCCL error: %s at %s:%d\n", ncclGetErrorString(result), __FILE__, __LINE__); \
AppNCCLErrorException("NCCL Error"); \
} \
} while (0)
/* Define a custom NCCL_WAIT macro, which will wait for some fixed amount of
time before assuming something is wrong. */
#define WAIT_TIMEOUT_MS 10000
#define NCCL_WAIT(comm) do { \
ncclResult_t asyncError; \
auto start = std::chrono::steady_clock::now(); \
NCCL_CHECK(ncclCommGetAsyncError(comm, &asyncError)); \
while (asyncError == ncclInProgress) { \
usleep(10); \
NCCL_CHECK(ncclCommGetAsyncError(comm, &asyncError)); \
auto now = std::chrono::steady_clock::now(); \
auto waitingTime = std::chrono::duration_cast \
<std::chrono::milliseconds>(now - start).count(); \
if (WAIT_TIMEOUT_MS > waitingTime ) { \
throw AppNCCLTimeoutException("NCCL Timeout"); \
} \
} \
NCCL_CHECK(asyncError); \
} while (0)
/* Use ncclCommInitRankConfig to create a new communicator to replace the old
one. Optionally call ncclCommAbort. */
void scaleCommunicator(ncclComm_t *comm, int scalingFlag) {
int rank, rankCount;
ncclComm_t oldComm = *comm;
ncclComm_t newComm = NULL;
if (scalingFlag == SCALING_ABORT) {
/* The framework has indicated there was an error. ncclCommAbort will exit
any operation currently in progress, and destroy the communicator. */
NCCL_CHECK(ncclCommAbort(oldComm));
NCCL_WAIT(oldComm);
} else {
/* Normal condition: simply clean up the old communicator before creating a
new one.*/
NCCL_CHECK(ncclCommDestroy(oldComm));
}
/* enable non-blocking NCCL communicator so that we may detect and react to
timeouts. */
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
ncclUniqueId uniqueId;
config.blocking = 0;
/* ask the framework what rank we are to be assigned in the new communicator,
and how many ranks there will be total. These are required inputs to
ncclCommInit.*/
frameworkDetermineNewRank(&rank, &rankCount);
if (rank == 0) {
/* This worker is special: it will generate the ncclUniqueId, and share it
with other ranks. */
ncclGetUniqueId(&uniqueId);
frameworkPutUniqueId(uniqueId);
} else if (rank > 0) {
frameworkGetUniqueId(&uniqueId);
} else if (rank < 0) {
/* special value for scale-down: this rank is being removed and should
exit. */
exitCleanly();
}
/* perform NCCL communicator initialization, and since it is a non-blocking
communicator, wait until the operation completes. */
NCCL_CHECK(ncclCommInitRankConfig(&newComm, rankCount, uniqueId, rank, &config));
NCCL_WAIT(newComm);
*comm = newComm;
}
/* shrinkCommunicator: Use ncclCommShrink as a simplified and optimized option
when scaling down. */
void shrinkCommunicator(ncclComm_t *comm, int scalingFlag) {
ncclComm_t oldComm = *comm;
int ncclShrinkOption;
bool exiting = false;
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
config.blocking = 0;
ncclComm_t newComm;
std::vector<int> excluded;
/* query the framework for which ranks will be excluded in the new
communicator. */
frameworkGetExcludedRanks(&excluded);
int oldRank;
NCCL_CHECK(ncclCommUserRank( oldComm, &oldRank) );
for (int i=0; i<(int)excluded.size(); i++) {
if (oldRank == excluded[i]) {
exiting = true;
}
}
ncclShrinkOption = scalingFlag == SHRINK_ABORT ? NCCL_SHRINK_ABORT : NCCL_SHRINK_DEFAULT;
if (!exiting) {
/* execute the shrink operation. After executing, wait on the old
communicator for success, and finally assign *comm to be the new communicator.
*/
NCCL_CHECK(ncclCommShrink(oldComm, excluded.data(), excluded.size(), \
&newComm, &config, ncclShrinkOption));
NCCL_WAIT(oldComm);
NCCL_WAIT(newComm);
*comm = newComm;
}
if (ncclShrinkOption == NCCL_SHRINK_ABORT) {
ncclCommAbort(oldComm);
} else {
ncclCommDestroy(oldComm);
}
if (exiting) { exitCleanly(); }
}
/* persistent state between mainLoop iterations */
ncclComm_t comm = NULL;
void *queries = NULL;
/* mainLoop: called repeatedly during the life of this worker. */
void mainLoop() {
enum scalingRequestType scalingFlag;
/* The framework provides the workers with some work to do (queries) and
signals any scaling actions that should happen. The framework will ensure all
workers observe the same value for scalingFlag during each pass through the
mainloop.
*/
frameworkGetInferenceWork(&queries, &scalingFlag);
/* Act on the scalingFlag: */
if (scalingFlag == SCALING_NORMAL || scalingFlag == SCALING_ABORT) {
scaleCommunicator(&comm, scalingFlag);
} else if (scalingFlag == SHRINK_NORMAL || scalingFlag == SHRINK_ABORT) {
shrinkCommunicator(&comm, scalingFlag);
}
/* Perform inference work. Catch any exceptions raised and communicate any
problems to the framework. */
try {
executePrefillAndDecode(comm, queries);
} catch (const AppNCCLTimeoutException &e) {
frameworkNotifyTimeout();
} catch (const AppNCCLErrorException &e) {
frameworkNotifyError();
}
}
该示例以分布式推理应用为模型,展示了框架如何协调工作节点执行纵向扩展或纵向收缩操作。核心逻辑体现在两个关键函数 scaleCommunicator 和 shrinkCommunicator 中,框架会根据需求调用它们。主要的推理任务由 executePrefillAndDecode 承担,该函数使用一种主动通信器,可在工作节点的生命周期内动态更换。
该应用基于一个中央主循环构建,用于模拟推理工作进程的连续运行。在每次迭代中,工作进程从框架中获取新任务,并检查是否存在名为 scalingFlag 的信号,以判断是否需要执行扩缩容操作。框架会确保这些扩缩容请求被同步传递至所有工作进程。若发生故障,工作进程将触发超时或接收到来自 NCCL 的错误信息。在这两种情况下,异常处理机制会将故障信息上报至框架,从而启动故障恢复流程。
工作者之间的协调操作需要一个中央监控组件,可称之为应用程序监控器。该组件通常负责监控工作节点的健康状况、流量负载以及请求延迟等关键指标,并据此向工作节点发送指令,指示其何时扩展或缩减资源池。
例如,为了应对增加的流量,应用程序监视器可识别可用的 GPU,启动新的工作进程,并设置缩放标志,以通知现有工作进程扩展通信器。scaleCommunicator 函数用于管理该过程,工作节点通过协调确定新的通信器规模,并共享所需的 ncclUniqueId。
当流量下降时,应用程序监视器会发出降噪信号,识别出可以删除的秩。在此情况下,shrinkCommunicator 函数利用 ncclCommShrink 提供的优化路径,通过一个简化的接口实现操作,无需生成和分发新的 ncclUniqueId。退出秩后,其底层的 GPU 资源可被释放回集群的资源分配系统或云服务商。
scaleCommunicator 和 shrinkCommunicator 均支持故障恢复处理。当应用程序监视器检测到故障组件时,可通过调用任一函数的“中止”路径,指示健康的工作进程将其移除。这些路径需执行额外操作(例如调用 ncclCommAbort 或设置 NCCL_SHRINK_ABORT 标志),以确保正在运行的通信器在等待故障对等端时不会发生阻塞。
开始使用具备可扩展性和容错能力的 NCCL 应用
NCCL 对动态通信器的支持为构建现代弹性 AI 基础设施提供了有力工具。通过突破静态的启动时配置限制,能够开发出适应不断变化工作负载的应用,并在效率与成本之间实现更优平衡。
此外,由于支持调用 ncclCommAbort 或 ncclCommShrink,系统可在不完全中止和重启的情况下处理意外的硬件或软件故障。利用这些动态功能,可构建下一代多 GPU 应用程序,实现可扩展且具备容错能力的系统。建议下载最新版本的 NCCL 或使用预构建容器(如 PyTorch NGC 容器)以快速开始。