Dask-MPI 批处理作业

Dask-MPI 批处理作业

Dask 及其分布式组件 Dask Distributed 是交互式会话背后极其强大的引擎(参见 Dask-MPI 交互式作业)。然而,在许多场景下,您的工作是预先定义好的,并且您不需要交互式会话来执行任务。在这种情况下,以*批处理模式*运行是最佳选择。

Dask-MPI 通过提供一个 API 来实现为 dask-mpi 命令行接口 (CLI) 创建的相同功能,使得在 MPI 环境中以批处理模式运行变得容易。然而,在批处理模式下,运行您的 Dask Client 的脚本需要在构建 Dask 集群的同一环境中运行,并且您希望在 Client 脚本执行完毕后 Dask 集群能够关闭。

为了实现此功能,Dask-MPI 在其应用程序接口 (API) 中提供了 initialize() 方法。initialize() 函数在 MPI 环境中(即使用 mpirunmpiexec 创建的环境)运行时,会在 MPI 秩 0 上启动 Dask Scheduler,在 MPI 秩 2 及以上启动 Dask Workers。在 MPI 秩 1 上,initialize() 函数会“透传”到 Client 脚本,运行用户希望执行的基于 Dask 的 Client 代码。

例如,如果您有一个名为 myscript.py 的基于 Dask 的脚本,您可以使用以下命令通过 Dask 并行运行该脚本。

mpirun -np 4 python myscript.py

这将会在 MPI 秩 0 上运行 Dask Scheduler,在 MPI 秩 1 上运行用户的 Client 代码,并在 MPI 秩 2 和 MPI 秩 3 上运行 2 个 Worker。为了使其工作,myscript.py 脚本中必须包含(通常在脚本顶部附近)以下代码。

from dask_mpi import initialize
initialize()

from distributed import Client
client = Client()

Dask Client 将自动检测在 MPI 秩 0 上运行的 Dask Scheduler 的位置并连接到它。

当 Client 代码执行完毕后,Dask Scheduler 和 Workers(以及可能还有 Nannies)将被终止。

提示

使用作业调度器运行批处理作业

在高性能计算 (HPC) 环境中,通常使用作业调度器(例如 LSF、PBS 或 SLURM)来请求所需的计算资源。在这种环境中,建议将 mpirun ... python myscript.py 命令放在作业提交脚本中,以便从作业调度器请求的资源与 mpirun 命令使用的资源匹配。

有关 initialize() 方法的更多详细信息,请参阅应用程序接口 (API)

连接到仪表盘

由于 Dask 可能在非登录节点上初始化,简单的端口转发可能不足以连接到仪表盘。

要找出哪个节点托管仪表盘,请在初始化代码中添加位置日志记录

from dask_mpi import initialize
initialize()

from dask.distributed import Client
from distributed.scheduler import logger
import socket

client = Client()

host = client.run_on_scheduler(socket.gethostname)
port = client.scheduler_info()['services']['dashboard']
login_node_address = "supercomputer.university.edu" # Provide address/domain of login node

logger.info(f"ssh -N -L {port}:{host}:{port} {login_node_address}")

然后,在批处理作业输出文件中查找日志记录的行并在您的终端中使用

ssh -N -L PORT_NUMBER:node03:PORT_NUMBER supercomputer.university.edu

Bokeh 仪表盘将在 localhost:PORT_NUMBER 可用。