VQNet Distributed Computing Module

Distributed computing refers to training or inference of neural networks carried out cooperatively by multiple devices (such as GPU/CPU nodes), using parallel processing to accelerate computation and scale up model size. At its core, a distributed interface (such as MPI, NCCL, or gRPC) coordinates communication and synchronization between the devices.

VQNet's distributed computing module uses MPI to launch multiple processes for parallel computation and NCCL for communication between GPUs. This functionality is only available on Linux.

Environment Setup

The following describes how to set up the Linux environment required by VQNet for CPU-based and GPU-based distributed computing. Both require MPI support, so MPI setup is described first.

MPI Installation

MPI is a widely used library for inter-CPU communication, and VQNet's CPU distributed computing functionality is built on it. The following describes how to install MPI on Linux (CPU-based distributed computing is currently only available on Linux).

The current build of this software depends on mpicxx==4.1.2, which can be installed via conda or by other means.

conda install conda-forge::mpich-mpicxx==4.1.2

In addition, the mpi4py library must be installed via pip install. If an error similar to the following appears:

[image: mpi_bug.png – mpi4py installation error]

this is a compatibility issue between mpi4py and the Python environment, and can be resolved as follows:

# Temporarily move aside the linker bundled with the current Python environment
pushd /root/anaconda3/envs/$CONDA_DEFAULT_ENV/compiler_compat && mv ld ld.bak && popd

# Install again
pip install mpi4py

# Restore
pushd /root/anaconda3/envs/$CONDA_DEFAULT_ENV/compiler_compat && mv ld.bak ld && popd

NCCL Installation

NCCL is a widely used library for inter-GPU communication, and VQNet's GPU distributed computing functionality is built on it. The NCCL shared libraries are installed together with this software by default, so a separate NCCL installation is usually not needed. If you do need to install NCCL, the following describes how to do so on Linux (GPU-based distributed computing is currently only available on Linux).

Clone the NCCL repository from GitHub:

git clone https://github.com/NVIDIA/nccl.git

Enter the nccl root directory and build:

cd nccl
make -j src.build

If CUDA is not installed in the default location (/usr/local/cuda), specify the CUDA path and build with:

make src.build CUDA_HOME=<path to cuda install>

You can also specify the installation directory via BUILDDIR:

make src.build CUDA_HOME=<path to cuda install> BUILDDIR=/usr/local/nccl

After installation, add the following configuration to your .bashrc file:

vim ~/.bashrc

# Append at the end of the file
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/nccl/lib
export PATH=$PATH:/usr/local/nccl/bin

After saving, run

source ~/.bashrc

You can verify the installation with nccl-tests:

git clone https://github.com/NVIDIA/nccl-tests.git
cd nccl-tests
make -j12 CUDA_HOME=/usr/local/cuda
./build/all_reduce_perf -b 8 -e 256M -f 2 -g 1

Inter-Node Communication Setup

To run distributed computation across multiple nodes, first make sure that the mpich and Python environments are identical on every node; second, set up passwordless SSH between the nodes.

Suppose passwordless SSH needs to be set up between three nodes: node0 (master), node1, and node2.

# Run on every node
ssh-keygen

# Keep pressing Enter; this generates a public key (id_rsa.pub) and a private key (id_rsa) under ~/.ssh
# Append the public keys of the other two nodes to the authorized_keys file on the first node,
# then copy that authorized_keys file from the first node to the other two nodes to enable passwordless SSH
# Run on child node node1
cat ~/.ssh/id_rsa.pub | ssh node0 'cat >> ~/.ssh/authorized_keys'

# Run on child node node2
cat ~/.ssh/id_rsa.pub | ssh node0 'cat >> ~/.ssh/authorized_keys'

# After removing any stale authorized_keys file on node1 and node2, copy authorized_keys from node0 to the other two nodes
scp ~/.ssh/authorized_keys  node1:~/.ssh/authorized_keys
scp ~/.ssh/authorized_keys  node2:~/.ssh/authorized_keys

# Once the public keys of all three nodes are in authorized_keys, passwordless SSH between the nodes works

Optionally, it is also recommended to set up a shared directory, so that changes to files in it are visible on every node; this prevents files from getting out of sync between nodes when running a model across multiple nodes. The shared directory can be implemented with nfs-utils and rpcbind.

# Install the packages
yum -y install nfs* rpcbind

# Edit the export configuration on the master node
vim /etc/exports
/data/mpi *(rw,sync,no_all_squash,no_subtree_check)

# Start the services on the master node
systemctl start rpcbind
systemctl start nfs

# On each child node (node1, node2), mount the directory exported by node0
mount node0:/data/mpi /data/mpi

Distributed Launch

Programs using the distributed computing interfaces are launched with the vqnetrun command. The parameters of vqnetrun are described below.

n, np

The -n and -np options of vqnetrun control the number of processes to launch. A usage example follows:

Example:

from pyvqnet.distributed import CommController
Comm_OP = CommController("mpi") # init mpi controller

rank = Comm_OP.getRank()
size = Comm_OP.getSize()
print(f"rank: {rank}, size {size}")

# vqnetrun -n 2 python test.py
# vqnetrun -np 2 python test.py

H, hosts

The -H and --hosts options of vqnetrun specify the nodes and the process allocation for cross-node execution (when running across nodes, the node environments must be configured correctly, and the job must be run under the same environment and the same path on every node). A usage example follows:

Example:

from pyvqnet.distributed import CommController, get_host_name
Comm_OP = CommController("mpi") # init mpi controller

rank = Comm_OP.getRank()
size = Comm_OP.getSize()
print(f"rank: {rank}, size {size}")
print(f"LocalRank {Comm_OP.getLocalRank()} hosts name {get_host_name()}")

# vqnetrun -np 4 -H node0:1,node2:1 python test.py
# vqnetrun -np 4 --hosts node0:1,node2:1 python test.py

hostfile, f

vqnetrun can also take a hosts file to specify the nodes and the process allocation for cross-node execution (when running across nodes, the node environments must be configured correctly, and the job must be run under the same environment and the same path on every node). The command-line options are -hostfile, -f, and --hostfile.

Each line in the file must have the format <hostname> slots=<slots>, for example:

node0 slots=1

node2 slots=1

A usage example follows:

Example:

from pyvqnet.distributed import CommController, get_host_name
Comm_OP = CommController("mpi") # init mpi controller

rank = Comm_OP.getRank()
size = Comm_OP.getSize()
print(f"rank: {rank}, size {size}")
print(f"LocalRank {Comm_OP.getLocalRank()} hosts name {get_host_name()}")

# vqnetrun -np 4 -f hosts python test.py
# vqnetrun -np 4 -hostfile hosts python test.py
# vqnetrun -np 4 --hostfile hosts python test.py

output-filename

The --output-filename option of vqnetrun saves the output to the specified file.

A usage example follows:

Example:

from pyvqnet.distributed import CommController, get_host_name
Comm_OP = CommController("mpi") # init mpi controller

rank = Comm_OP.getRank()
size = Comm_OP.getSize()
print(f"rank: {rank}, size {size}")
print(f"LocalRank {Comm_OP.getLocalRank()} hosts name {get_host_name()}")

# vqnetrun -np 4 --hostfile hosts --output-filename output  python test.py

verbose

The --verbose option of vqnetrun checks inter-node communication and prints additional diagnostic output.

A usage example follows:

Example:

from pyvqnet.distributed import CommController, get_host_name
Comm_OP = CommController("mpi") # init mpi controller

rank = Comm_OP.getRank()
size = Comm_OP.getSize()
print(f"rank: {rank}, size {size}")
print(f"LocalRank {Comm_OP.getLocalRank()} hosts name {get_host_name()}")

# vqnetrun -np 4 --hostfile hosts --verbose python test.py

start-timeout

The --start-timeout option of vqnetrun sets the time allowed for running all checks and launching the processes before timing out. The default is 30 seconds.

A usage example follows:

Example:

from pyvqnet.distributed import CommController, get_host_name
Comm_OP = CommController("mpi") # init mpi controller

rank = Comm_OP.getRank()
size = Comm_OP.getSize()
print(f"rank: {rank}, size {size}")
print(f"LocalRank {Comm_OP.getLocalRank()} hosts name {get_host_name()}")

# vqnetrun -np 4 --start-timeout 10 python test.py

h

The -h flag of vqnetrun prints all supported options together with a detailed description of each.

Run it as follows:

# vqnetrun -h

CommController

class pyvqnet.distributed.ControlComm.CommController(backend, rank=None, world_size=None)

CommController is the controller for data communication on CPU and GPU; the backend parameter selects the CPU (mpi) or GPU (nccl) controller. (Distributed computing is currently only supported on Linux.)

Parameters:
  • backend – selects the CPU or GPU data-communication controller.

  • rank – only used with non-pyvqnet backends; default: None.

  • world_size – only used with non-pyvqnet backends; default: None.

Returns:

A CommController instance.

Examples:

from pyvqnet.distributed import CommController
Comm_OP = CommController("nccl") # init nccl controller

# Comm_OP = CommController("mpi") # init mpi controller
getRank()

Gets the rank of the current process.

Returns:

The rank of the current process.

Examples:

from pyvqnet.distributed import CommController
Comm_OP = CommController("nccl") # init nccl controller

Comm_OP.getRank()
getSize()

Gets the total number of launched processes.

Returns:

The total number of processes.

Examples:

from pyvqnet.distributed import CommController
Comm_OP = CommController("nccl") # init nccl controller

Comm_OP.getSize()
# vqnetrun -n 2 python test.py
# 2
getLocalRank()

Gets the rank of the current process on the current machine.

Returns:

The local rank of the current process on the current machine.

Examples:

from pyvqnet.distributed import CommController
Comm_OP = CommController("nccl") # init nccl controller

Comm_OP.getLocalRank()
# vqnetrun -n 2 python test.py
barrier()

Synchronizes all processes.

Returns:

The synchronization operation.

Examples:

from pyvqnet.distributed import CommController
Comm_OP = CommController("nccl")

Comm_OP.barrier()
get_device_num()

Gets the number of GPUs on the current node (GPU backend only).

Returns:

The number of GPUs on the current node.

Examples:

from pyvqnet.distributed import CommController
Comm_OP = CommController("nccl")

Comm_OP.get_device_num()
# python test.py
allreduce(tensor, c_op='avg')

Performs an allreduce communication on the data.

Parameters:
  • tensor – input data.

  • c_op – reduction operation.

Examples:

from pyvqnet.distributed import CommController
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("mpi")

num = tensor.to_tensor(np.random.rand(1, 5))
print(f"rank {Comm_OP.getRank()}  {num}")

Comm_OP.allreduce(num, "sum")
print(f"rank {Comm_OP.getRank()}  {num}")
# vqnetrun -n 2 python test.py
reduce(tensor, root=0, c_op='avg')

Performs a reduce communication on the data.

Parameters:
  • tensor – input data.

  • root – the process that receives the result.

  • c_op – reduction operation.

Examples:

from pyvqnet.distributed import CommController
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("mpi")

num = tensor.to_tensor(np.random.rand(1, 5))
print(f"rank {Comm_OP.getRank()}  {num}")

Comm_OP.reduce(num, 1)
print(f"rank {Comm_OP.getRank()}  {num}")
# vqnetrun -n 2 python test.py
broadcast(tensor, root=0)

Broadcasts the data on the specified root process to all processes.

Parameters:
  • tensor – input data.

  • root – the source process.

Examples:

from pyvqnet.distributed import CommController
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("mpi")

num = tensor.to_tensor(np.random.rand(1, 5))
print(f"rank {Comm_OP.getRank()}  {num}")

Comm_OP.broadcast(num, 1)
print(f"rank {Comm_OP.getRank()}  {num}")
# vqnetrun -n 2 python test.py
allgather(tensor)

Gathers the data from all processes onto every process (allgather).

Parameters:

tensor – input data.

Examples:

from pyvqnet.distributed import CommController
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("mpi")

num = tensor.to_tensor(np.random.rand(1, 5))
print(f"rank {Comm_OP.getRank()}  {num}")

num = Comm_OP.allgather(num)
print(f"rank {Comm_OP.getRank()}  {num}")
# vqnetrun -n 2 python test.py
send(tensor, dest)

Point-to-point send interface.

Parameters:
  • tensor – input data.

  • dest – destination process.

Examples:

from pyvqnet.distributed import CommController,get_rank
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("mpi")

num = tensor.to_tensor(np.random.rand(1, 5))
recv = tensor.zeros_like(num)

if get_rank() == 0:
    Comm_OP.send(num, 1)
elif get_rank() == 1:
    Comm_OP.recv(recv, 0)
print(f"rank {Comm_OP.getRank()}  {num}")
print(f"rank {Comm_OP.getRank()}  {recv}")

# vqnetrun -n 2 python test.py
recv(tensor, source)

Point-to-point receive interface.

Parameters:
  • tensor – input data.

  • source – the process to receive from.

Examples:

from pyvqnet.distributed import CommController,get_rank
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("mpi")

num = tensor.to_tensor(np.random.rand(1, 5))
recv = tensor.zeros_like(num)

if get_rank() == 0:
    Comm_OP.send(num, 1)
elif get_rank() == 1:
    Comm_OP.recv(recv, 0)
print(f"rank {Comm_OP.getRank()}  {num}")
print(f"rank {Comm_OP.getRank()}  {recv}")

# vqnetrun -n 2 python test.py
split_group(rankL)

Splits the processes into multiple communication groups according to the given lists of ranks.

Parameters:

rankL – list of rank lists, one per communication group.

Returns:

With the nccl backend, a tuple of group indices is returned. With the mpi backend, a list is returned whose length equals the number of groups; each element is a pair (comm, rank), where comm is the MPI communicator of that group and rank gives the ranks within the group.

Examples:

from pyvqnet.distributed import CommController,get_rank,get_local_rank
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("mpi")

groups = Comm_OP.split_group([[0, 1],[2,3]])
print(groups)
#[[<mpi4py.MPI.Intracomm object at 0x7f53691f3230>, [0, 3]], [<mpi4py.MPI.Intracomm object at 0x7f53691f3010>, [2, 1]]]

# mpirun -n 4 python test.py
allreduce_group(tensor, c_op='avg', group=None)

In-group allreduce communication interface.

Parameters:
  • tensor – input data.

  • c_op – reduction operation.

  • group – with the mpi backend, pass the communication group produced by init_group or split_group; with the nccl backend, pass the group index produced by split_group.

Examples:

from pyvqnet.distributed import CommController,get_rank,get_local_rank
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("nccl")

groups = Comm_OP.split_group([[0, 1]])

complex_data = tensor.QTensor([3+1j, 2, 1 + get_rank()],dtype=8).reshape((3,1)).toGPU(1000+ get_local_rank())

print(f"allreduce_group before rank {get_rank()}: {complex_data}")

Comm_OP.allreduce_group(complex_data, c_op="sum",group = groups[0])
print(f"allreduce_group after rank {get_rank()}: {complex_data}")
# vqnetrun -n 2 python test.py
reduce_group(tensor, root=0, c_op='avg', group=None)

In-group reduce communication interface.

Parameters:
  • tensor – input data.

  • root – the designated process rank.

  • c_op – reduction operation.

  • group – with the mpi backend, pass the communication group produced by init_group or split_group; with the nccl backend, pass the group index produced by split_group.

Examples:

from pyvqnet.distributed import CommController,get_rank,get_local_rank
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("nccl")

groups = Comm_OP.split_group([[0, 1]])

complex_data = tensor.QTensor([3+1j, 2, 1 + get_rank()],dtype=8).reshape((3,1)).toGPU(1000+ get_local_rank())

print(f"reduce_group before rank {get_rank()}: {complex_data}")

Comm_OP.reduce_group(complex_data, c_op="sum",group = groups[0])
print(f"reduce_group after rank {get_rank()}: {complex_data}")
# vqnetrun -n 2 python test.py
broadcast_group(tensor, root=0, group=None)

In-group broadcast communication interface.

Parameters:
  • tensor – input data.

  • root – the process rank to broadcast from; defaults to 0.

  • group – with the mpi backend, pass the communication group produced by init_group or split_group; with the nccl backend, pass the group index produced by split_group.

Examples:

from pyvqnet.distributed import CommController,get_rank,get_local_rank
from pyvqnet.tensor import tensor
import numpy as np
Comm_OP = CommController("nccl")

groups = Comm_OP.split_group([[0, 1]])

complex_data = tensor.QTensor([3+1j, 2, 1 + get_rank()],dtype=8).reshape((3,1)).toGPU(1000+ get_local_rank())

print(f"broadcast_group before rank {get_rank()}: {complex_data}")

Comm_OP.broadcast_group(complex_data,group = groups[0])
Comm_OP.barrier()
print(f"broadcast_group after rank {get_rank()}: {complex_data}")
# vqnetrun -n 2 python test.py
allgather_group(tensor, group=None)

In-group allgather communication interface.

Parameters:
  • tensor – input data.

  • group – with the mpi backend, pass the communication group produced by init_group or split_group; with the nccl backend, pass the group index produced by split_group.

Examples:

from pyvqnet.distributed import CommController,get_rank,init_group
from pyvqnet.tensor import tensor

Comm_OP = CommController("mpi")
group = init_group([[0,1]])
#mpi init group internally
# A list of lists, where each sublist contains a communicator and the corresponding rank list.
complex_data = tensor.QTensor([3+1j, 2, 1 + get_rank()],dtype=8).reshape((3,1))
print(f" before rank {get_rank()}: {complex_data}")
for comm_ in group:
    if Comm_OP.getRank() in comm_[1]:
        complex_data = Comm_OP.all_gather_group(complex_data, comm_[0])
        print(f"after rank {get_rank()}: {complex_data}")
# mpirun -n 2 python test.py

from pyvqnet.distributed import CommController,get_rank,get_local_rank
from pyvqnet.tensor import tensor
Comm_OP = CommController("nccl")
groups = Comm_OP.split_group([[0, 1]])
complex_data = tensor.QTensor([3+1j, 2, 1 + get_rank()],dtype=8).reshape((3,1)).toGPU(1000+ get_local_rank())
print(f" before rank {get_rank()}: {complex_data}")
complex_data = Comm_OP.all_gather_group(complex_data, group = groups[0])
print(f"after rank {get_rank()}: {complex_data}")
# mpirun -n 2 python test.py

split_data

In a multi-process run, split_data partitions the data according to the number of processes and returns the shard that belongs to the current process.

pyvqnet.distributed.datasplit.split_data(x_train, y_train, shuffle=False)

Splits the training data according to the distributed-computation settings.

Parameters:
  • x_train (np.array) – training data.

  • y_train (np.array) – training labels.

  • shuffle (bool) – whether to shuffle before splitting; defaults to False.

Returns:

The split training data and labels.

Example:

from pyvqnet.distributed import split_data
import numpy as np

x_train = np.random.randint(255, size = (100, 5))
y_train = np.random.randint(2, size = (100, 1))

x_train, y_train= split_data(x_train, y_train)
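
As a rough illustration of the sharding behaviour described above (assuming the 100 samples are split roughly evenly across the launched processes; the exact remainder handling is not specified here), the per-rank shapes can be checked like this:

from pyvqnet.distributed import split_data, get_rank
import numpy as np

x_train = np.random.randint(255, size=(100, 5))
y_train = np.random.randint(2, size=(100, 1))

# each process keeps only its own shard
x_shard, y_shard = split_data(x_train, y_train)
print(f"rank {get_rank()}: x {x_shard.shape}, y {y_shard.shape}")
# with 2 processes, each rank is expected to hold roughly 50 samples
# vqnetrun -n 2 python test.py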

get_local_rank

pyvqnet.distributed.ControlComm.get_local_rank()

Gets the rank of the current process on the current node. For example, if you are the 3rd process on the 2nd node with 5 processes per node, the return value is 2.

Returns:

The local rank of the current process on the current machine.

Example:

from pyvqnet.distributed.ControlComm import get_local_rank

print(get_local_rank())
# vqnetrun -n 2 python test.py

get_rank

pyvqnet.distributed.ControlComm.get_rank()

Gets the global rank of the current process. For example, if you are the 3rd process on the 2nd node with 5 processes per node, the return value is 7.

Returns:

The rank of the current process.

Example:

from pyvqnet.distributed.ControlComm import get_rank

print(get_rank())
# vqnetrun -n 2 python test.py
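
The rank arithmetic used in the get_local_rank and get_rank examples above follows the usual zero-based convention; a minimal sketch of the relationship, with the hypothetical values from the text, assuming ranks are assigned node by node:

# hypothetical values: 2nd node (index 1), 3rd process on it (index 2), 5 processes per node
procs_per_node = 5
node_index = 1      # zero-based index of the node
local_rank = 2      # zero-based rank within the node -> get_local_rank()

global_rank = node_index * procs_per_node + local_rank
print(global_rank)  # 7 -> get_rank()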

init_group

pyvqnet.distributed.ControlComm.init_group(rank_lists)

Initializes process groups for the mpi backend according to the given lists of ranks.

Warning

This interface only supports the mpi distributed backend.

Parameters:

rank_lists – list of communication process groups.

Returns:

A list whose length equals the number of groups; each element is a pair (comm, rank), where comm is the MPI communicator of that group and rank gives the ranks within the group.

Example:

from pyvqnet.distributed import *

Comm_OP = CommController("mpi")
num = tensor.to_tensor(np.random.rand(1, 5))
print(f"rank {Comm_OP.getRank()}  {num}")

group_l = init_group([[0,2], [1]])

for comm_ in group_l:
    if Comm_OP.getRank() in comm_[1]:
        Comm_OP.allreduce_group(num, "sum", group = comm_[0])
        print(f"rank {Comm_OP.getRank()}  {num} after")

# vqnetrun -n 3 python test.py

PipelineParallelTrainingWrapper

class pyvqnet.distributed.pp.PipelineParallelTrainingWrapper(args, join_layers, trainset)

The Pipeline Parallel Training Wrapper implements 1F1B training. It is available only on Linux and only when GPUs are present. More algorithmic details can be found at https://www.deepspeed.ai/tutorials/pipeline/.

Parameters:
  • args – dictionary of parameters; see the example.

  • join_layers – a list of Sequential modules.

  • trainset – the dataset.

Returns:

A PipelineParallelTrainingWrapper instance.

The following trains a classification task with AlexNet on 2 GPUs, using the CIFAR10 dataset via CIFAR10_Dataset. The example uses two pipeline-parallel processes, pipeline_parallel_size = 2. The batch size is train_batch_size = 64, with train_micro_batch_size_per_gpu = 32 on a single GPU. The remaining configuration options are given in args. In addition, each process must set the LOCAL_RANK environment variable inside the __main__ function:

os.environ["LOCAL_RANK"] = str(dist.get_local_rank())

Call train_batch to run training.
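
As a rough sanity check on the batch configuration above (this mirrors the usual DeepSpeed pipeline convention and is an assumption, not something the wrapper documents explicitly), train_batch_size should be divisible by train_micro_batch_size_per_gpu, and, assuming no additional data parallelism, the quotient gives the number of micro-batches pushed through the pipeline per training step:

train_batch_size = 64                  # global batch per training step
train_micro_batch_size_per_gpu = 32    # micro-batch processed by one stage at a time

# hypothetical check, assuming the DeepSpeed-style relationship holds here
assert train_batch_size % train_micro_batch_size_per_gpu == 0
micro_batches_per_step = train_batch_size // train_micro_batch_size_per_gpu
print(micro_batches_per_step)  # 2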

Examples:

import os
import pyvqnet

from pyvqnet.nn import Module,Sequential,CrossEntropyLoss
from pyvqnet.nn import Linear
from pyvqnet.nn import Conv2D
from pyvqnet.nn import activation as F
from pyvqnet.nn import MaxPool2D
from pyvqnet.nn import CrossEntropyLoss

from pyvqnet.tensor import tensor
from pyvqnet.distributed.pp import PipelineParallelTrainingWrapper
from pyvqnet.distributed.configs import comm as dist
from pyvqnet.distributed import *


pipeline_parallel_size = 2

num_steps = 1000

def cifar_trainset_vqnet(local_rank, dl_path='./cifar10-data'):
    transform = pyvqnet.data.TransformCompose([
        pyvqnet.data.TransformResize(256),
        pyvqnet.data.TransformCenterCrop(224),
        pyvqnet.data.TransformToTensor(),
        pyvqnet.data.TransformNormalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    trainset = pyvqnet.data.CIFAR10_Dataset(root=dl_path,
                                            mode="train",
                                            transform=transform,layout="HWC")

    return trainset

class Model(Module):
    def __init__(self):
        super(Model, self).__init__()
        self.features = Sequential(
        Conv2D(input_channels=3, output_channels=8, kernel_size=(3, 3), stride=(1, 1), padding='same'),
        F.ReLu(),
        MaxPool2D([2, 2], [2, 2]),

        Conv2D(input_channels=8, output_channels=16, kernel_size=(3, 3), stride=(1, 1), padding='same'),
        F.ReLu(),
        MaxPool2D([2, 2], [2, 2]),

        Conv2D(input_channels=16, output_channels=32, kernel_size=(3, 3), stride=(1, 1), padding='same'),
        F.ReLu(),

        Conv2D(input_channels=32, output_channels=64, kernel_size=(3, 3), stride=(1, 1), padding='same'),
        F.ReLu(),

        Conv2D(input_channels=64, output_channels=64, kernel_size=(3, 3), stride=(1, 1), padding='same'),
        F.ReLu(),
        MaxPool2D([3, 3], [2, 2]),)

        self.cls = Sequential(
        Linear(64 * 27 * 27, 512),
        F.ReLu(),

        Linear(512, 256),
        F.ReLu(),
        Linear(256, 10) )

    def forward(self, x):
        x = self.features(x)
        x = tensor.flatten(x,1)
        x = self.cls(x)

        return x

def join_layers(vision_model):
    layers = [
        *vision_model.features,
        lambda x: tensor.flatten(x, 1),
        *vision_model.cls,
    ]
    return layers


if __name__ == "__main__":


    args = {
        "backend": 'nccl',
        "train_batch_size": 64,
        "train_micro_batch_size_per_gpu": 32,
        "epochs": 5,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.001
            }
        },
        "local_rank": dist.get_local_rank(),
        "pipeline_parallel_size": pipeline_parallel_size,
        "seed": 42,
        "steps": num_steps,
        "loss": CrossEntropyLoss(),
    }
    os.environ["LOCAL_RANK"] = str(dist.get_local_rank())
    trainset = cifar_trainset_vqnet(args["local_rank"])
    w = PipelineParallelTrainingWrapper(args,join_layers(Model()),trainset)

    all_loss = {}

    for i in range(args["epochs"]):
        w.train_batch()

    all_loss = w.loss_dict

ZeroModelInitial

class pyvqnet.distributed.ZeroModelInitial(args, model, optimizer)

Zero1 API interface; currently only for GPU-based parallel computation on Linux.

Parameters:
  • args – dictionary of parameters; see the example.

  • model – input model.

  • optimizer – optimizer.

Returns:

A Zero1 engine.

The following trains a classification task with an MLP model on 2 GPUs, using the MNIST dataset.

The batch size is train_batch_size = 64 and the zero_optimization stage is set to 1. If optimizer is None, the optimizer settings in args are used instead. The remaining configuration options are given in args. In addition, each process must set the LOCAL_RANK environment variable:

os.environ["LOCAL_RANK"] = str(dist.get_local_rank())

Examples:

from pyvqnet.distributed import *
from pyvqnet import *
from time import time
import pyvqnet.optim as optim
import pyvqnet.nn as nn
import pyvqnet
import sys
import pyvqnet
import numpy as np
import os
import struct

def load_mnist(dataset="training_data",
            digits=np.arange(2),
            path="./"):
    """
    load mnist data
    """
    from array import array as pyarray
    if dataset == "training_data":
        fname_image = os.path.join(path, "train-images.idx3-ubyte").replace(
            "\\", "/")
        fname_label = os.path.join(path, "train-labels.idx1-ubyte").replace(
            "\\", "/")
    elif dataset == "testing_data":
        fname_image = os.path.join(path, "t10k-images.idx3-ubyte").replace(
            "\\", "/")
        fname_label = os.path.join(path, "t10k-labels.idx1-ubyte").replace(
            "\\", "/")
    else:
        raise ValueError("dataset must be 'training_data' or 'testing_data'")

    flbl = open(fname_label, "rb")
    _, size = struct.unpack(">II", flbl.read(8))

    lbl = pyarray("b", flbl.read())
    flbl.close()

    fimg = open(fname_image, "rb")
    _, size, rows, cols = struct.unpack(">IIII", fimg.read(16))
    img = pyarray("B", fimg.read())
    fimg.close()

    ind = [k for k in range(size) if lbl[k] in digits]
    num = len(ind)
    images = np.zeros((num, rows, cols),dtype=np.float32)

    labels = np.zeros((num, 1), dtype=int)
    for i in range(len(ind)):
        images[i] = np.array(img[ind[i] * rows * cols:(ind[i] + 1) * rows *
                                cols]).reshape((rows, cols))
        labels[i] = lbl[ind[i]]

    return images, labels


train_images_np, train_labels_np = load_mnist(dataset="training_data", digits=np.arange(10),path="../data/MNIST_data/")
train_images_np = train_images_np / 255.

test_images_np, test_labels_np = load_mnist(dataset="testing_data", digits=np.arange(10),path="../data/MNIST_data/")
test_images_np = test_images_np / 255.

local_rank = pyvqnet.distributed.get_rank()

from pyvqnet.distributed import ZeroModelInitial

class MNISTClassifier(nn.Module):

    def __init__(self):
        super(MNISTClassifier, self).__init__()
        self.fc1 = nn.Linear(28*28, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, 64)
        self.fc5 = nn.Linear(64, 10)
        self.ac = nn.activation.ReLu()

    def forward(self, x:pyvqnet.QTensor):

        x = x.reshape([-1, 28*28])
        x = self.ac(self.fc1(x))
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        x = self.fc5(x)
        return x

model = MNISTClassifier()

model.to(local_rank + 1000)

Comm_op = CommController("nccl")
Comm_op.broadcast_model_params(model, 0)

batch_size = 64

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

args_ = {
    "train_batch_size": batch_size,  # effective global batch size
    "optimizer": {
        "type": "adam",
        "params": {
            "lr": 0.001,
        }
    },
    "zero_optimization": {
        "stage": 1,
    }
}

os.environ["LOCAL_RANK"] = str(get_local_rank())
model = ZeroModelInitial(args=args_, model=model, optimizer=optimizer)

def compute_acc(outputs, labels, correct, total):
    predicted = pyvqnet.tensor.argmax(outputs, dim=1, keepdims=True)
    total += labels.size
    correct += pyvqnet.tensor.sums(predicted == labels).item()
    return correct, total

train_acc = 0
test_acc = 0
epochs = 5
loss = 0
time1 = time()

for epoch in range(epochs):
    model.train()
    total = 0
    correct = 0
    step = 0

    num_batches = (train_images_np.shape[0] + batch_size - 1) // batch_size

    for i in range(num_batches):

        data_ = tensor.QTensor(train_images_np[i*batch_size: (i+1) * batch_size,:], dtype = kfloat32)
        labels = tensor.QTensor(train_labels_np[i*batch_size: (i+1) * batch_size,:], dtype = kint64)

        data_ = data_.to(local_rank + 1000)
        labels = labels.to(local_rank + 1000)

        outputs = model(data_)
        loss = criterion(labels, outputs)

        model.backward(loss)  # call backward and step on the model wrapper returned by ZeroModelInitial
        model.step()

        correct, total = compute_acc(outputs, labels, correct, total)
        step += 1
        if step % 50 == 0:
            print(f"Train : rank {get_rank()} Epoch [{epoch+1}/{epochs}], step {step} Loss: {loss.item():.4f} acc {100 * correct / total}")
            sys.stdout.flush()

    train_acc = 100 * correct / total

time2 = time()
print(f'Accuracy of the model on the 10000 Train images: {train_acc}% time cost {time2 - time1}')

ColumnParallelLinear

class pyvqnet.distributed.ColumnParallelLinear(input_size, output_size, weight_initializer, bias_initializer, use_bias, dtype, name, tp_comm)

Tensor-parallel, column-parallel linear layer.

The linear layer is defined as Y = XA + b. A is partitioned along its second dimension as A = [A_1, ..., A_p].
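
A minimal sketch of the column-partitioned forward pass implied by the definition above (the per-partition bias handling is an assumption):

\[
Y = XA + b,\qquad A = [A_1,\ \dots,\ A_p],\qquad b = [b_1,\ \dots,\ b_p]
\]
\[
Y_i = X A_i + b_i \quad\Rightarrow\quad Y = [Y_1,\ \dots,\ Y_p]
\]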

Parameters:
  • input_size – the first dimension of matrix A.

  • output_size – the second dimension of matrix A.

  • weight_initializer (callable) – defaults to normal.

  • bias_initializer (callable) – defaults to zeros.

  • use_bias (bool) – defaults to True.

  • dtype – defaults to None, i.e. the default data type.

  • name – module name, defaults to "".

  • tp_comm – communication controller.

The following trains a classification task with an MLP model on 2 GPUs, using the MNIST dataset.

Usage is similar to the classical Linear layer.

For multi-process runs, launch with vqnetrun -n 2 python test.py.

Examples:

import pyvqnet.distributed
import pyvqnet.optim as optim
import pyvqnet.nn as nn
import pyvqnet
import sys
from pyvqnet.distributed.tensor_parallel import ColumnParallelLinear, RowParallelLinear
from pyvqnet.distributed import *
from time import time

import pyvqnet
import numpy as np
import os
from pyvqnet import *
import pytest

Comm_OP = CommController("nccl")

import struct
def load_mnist(dataset="training_data",
            digits=np.arange(2),
            path="./"):
    """
    load mnist data
    """
    from array import array as pyarray
    # download_mnist(path)
    if dataset == "training_data":
        fname_image = os.path.join(path, "train-images-idx3-ubyte").replace(
            "\\", "/")
        fname_label = os.path.join(path, "train-labels-idx1-ubyte").replace(
            "\\", "/")
    elif dataset == "testing_data":
        fname_image = os.path.join(path, "t10k-images-idx3-ubyte").replace(
            "\\", "/")
        fname_label = os.path.join(path, "t10k-labels-idx1-ubyte").replace(
            "\\", "/")
    else:
        raise ValueError("dataset must be 'training_data' or 'testing_data'")

    flbl = open(fname_label, "rb")
    _, size = struct.unpack(">II", flbl.read(8))

    lbl = pyarray("b", flbl.read())
    flbl.close()

    fimg = open(fname_image, "rb")
    _, size, rows, cols = struct.unpack(">IIII", fimg.read(16))
    img = pyarray("B", fimg.read())
    fimg.close()

    ind = [k for k in range(size) if lbl[k] in digits]
    num = len(ind)
    images = np.zeros((num, rows, cols),dtype=np.float32)

    labels = np.zeros((num, 1), dtype=int)
    for i in range(len(ind)):
        images[i] = np.array(img[ind[i] * rows * cols:(ind[i] + 1) * rows *
                                cols]).reshape((rows, cols))
        labels[i] = lbl[ind[i]]

    return images, labels

train_images_np, train_labels_np = load_mnist(dataset="training_data", digits=np.arange(10),path="./data/MNIST/raw/")
train_images_np = train_images_np / 255.

test_images_np, test_labels_np = load_mnist(dataset="testing_data", digits=np.arange(10),path="./data/MNIST/raw/")
test_images_np = test_images_np / 255.

local_rank = pyvqnet.distributed.get_rank()

class MNISTClassifier(nn.Module):
    def __init__(self):
        super(MNISTClassifier, self).__init__()
        self.fc1 = RowParallelLinear(28*28, 512, tp_comm = Comm_OP)
        self.fc2 = ColumnParallelLinear(512, 256, tp_comm = Comm_OP)
        self.fc3 = RowParallelLinear(256, 128, tp_comm = Comm_OP)
        self.fc4 = ColumnParallelLinear(128, 64, tp_comm = Comm_OP)
        self.fc5 = RowParallelLinear(64, 10, tp_comm = Comm_OP)
        self.ac = nn.activation.ReLu()

    def forward(self, x:pyvqnet.QTensor):

        x = x.reshape([-1, 28*28])
        x = self.ac(self.fc1(x))
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        x = self.fc5(x)
        return x


model = MNISTClassifier()
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

model.to(local_rank + 1000)

Comm_OP.broadcast_model_params(model, 0)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def compute_acc(outputs, labels, correct, total):
    predicted = pyvqnet.tensor.argmax(outputs, dim=1, keepdims=True)
    total += labels.size
    correct += pyvqnet.tensor.sums(predicted == labels).item()
    return correct, total

train_acc = 0
test_acc = 0
epochs = 5
loss = 0

time1 = time()
for epoch in range(epochs):
    model.train()
    total = 0
    correct = 0
    step = 0

    batch_size = 64
    num_batches = (train_images_np.shape[0] + batch_size - 1) // batch_size

    for i in range(num_batches):
        data_ = tensor.QTensor(train_images_np[i*batch_size: (i+1) * batch_size,:], dtype = kfloat32)
        labels = tensor.QTensor(train_labels_np[i*batch_size: (i+1) * batch_size,:], dtype = kint64)

        data_ = data_.to(local_rank + 1000)
        labels = labels.to(local_rank + 1000)

        optimizer.zero_grad()

        outputs = model(data_)
        loss = criterion(labels, outputs)

        loss.backward()
        optimizer.step()

        correct, total = compute_acc(outputs, labels, correct, total)
        step += 1
        if step % 50 == 0:
            print(f"Train : rank {get_rank()} Epoch [{epoch+1}/{epochs}], step {step} Loss: {loss.item():.4f} acc {100 * correct / total}")
            sys.stdout.flush()

    train_acc = 100 * correct / total
time2 = time()

print(f'Accuracy of the model on the 10000 Train images: {train_acc}% time cost {time2 - time1}')

RowParallelLinear

class pyvqnet.distributed.RowParallelLinear(input_size, output_size, weight_initializer, bias_initializer, use_bias, dtype, name, tp_comm)

Tensor-parallel, row-parallel linear layer.

The linear layer is defined as Y = XA + b. A is partitioned along its first dimension and X along its second dimension: A = transpose([A_1, ..., A_p]), X = [X_1, ..., X_p].
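
A minimal sketch of the row-partitioned forward pass implied by the definition above (the single bias add after summing the partial products is an assumption):

\[
A = \begin{bmatrix} A_1 \\ \vdots \\ A_p \end{bmatrix},\qquad X = [X_1,\ \dots,\ X_p]
\]
\[
Y = XA + b = \sum_{i=1}^{p} X_i A_i + b
\]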

Parameters:
  • input_size – the first dimension of matrix A.

  • output_size – the second dimension of matrix A.

  • weight_initializer (callable) – defaults to normal.

  • bias_initializer (callable) – defaults to zeros.

  • use_bias (bool) – defaults to True.

  • dtype – defaults to None, i.e. the default data type.

  • name – module name.

  • tp_comm – communication controller.

The following trains a classification task with an MLP model on 2 GPUs, using the MNIST dataset. Usage is similar to the classical Linear layer.

For multi-process runs, launch with vqnetrun -n 2 python test.py.

Examples:

import pyvqnet.distributed
import pyvqnet.optim as optim
import pyvqnet.nn as nn
import pyvqnet
import sys
from pyvqnet.distributed.tensor_parallel import ColumnParallelLinear, RowParallelLinear
from pyvqnet.distributed import *
from time import time

import pyvqnet
import numpy as np
import os
from pyvqnet import *
import pytest

Comm_OP = CommController("nccl")

import struct
def load_mnist(dataset="training_data",
            digits=np.arange(2),
            path="./"):
    """
    load mnist data
    """
    from array import array as pyarray
    # download_mnist(path)
    if dataset == "training_data":
        fname_image = os.path.join(path, "train-images-idx3-ubyte").replace(
            "\\", "/")
        fname_label = os.path.join(path, "train-labels-idx1-ubyte").replace(
            "\\", "/")
    elif dataset == "testing_data":
        fname_image = os.path.join(path, "t10k-images-idx3-ubyte").replace(
            "\\", "/")
        fname_label = os.path.join(path, "t10k-labels-idx1-ubyte").replace(
            "\\", "/")
    else:
        raise ValueError("dataset must be 'training_data' or 'testing_data'")

    flbl = open(fname_label, "rb")
    _, size = struct.unpack(">II", flbl.read(8))

    lbl = pyarray("b", flbl.read())
    flbl.close()

    fimg = open(fname_image, "rb")
    _, size, rows, cols = struct.unpack(">IIII", fimg.read(16))
    img = pyarray("B", fimg.read())
    fimg.close()

    ind = [k for k in range(size) if lbl[k] in digits]
    num = len(ind)
    images = np.zeros((num, rows, cols),dtype=np.float32)

    labels = np.zeros((num, 1), dtype=int)
    for i in range(len(ind)):
        images[i] = np.array(img[ind[i] * rows * cols:(ind[i] + 1) * rows *
                                cols]).reshape((rows, cols))
        labels[i] = lbl[ind[i]]

    return images, labels

train_images_np, train_labels_np = load_mnist(dataset="training_data", digits=np.arange(10),path="./data/MNIST/raw/")
train_images_np = train_images_np / 255.

test_images_np, test_labels_np = load_mnist(dataset="testing_data", digits=np.arange(10),path="./data/MNIST/raw/")
test_images_np = test_images_np / 255.

local_rank = pyvqnet.distributed.get_rank()

class MNISTClassifier(nn.Module):
    def __init__(self):
        super(MNISTClassifier, self).__init__()
        self.fc1 = RowParallelLinear(28*28, 512, tp_comm = Comm_OP)
        self.fc2 = ColumnParallelLinear(512, 256, tp_comm = Comm_OP)
        self.fc3 = RowParallelLinear(256, 128, tp_comm = Comm_OP)
        self.fc4 = ColumnParallelLinear(128, 64, tp_comm = Comm_OP)
        self.fc5 = RowParallelLinear(64, 10, tp_comm = Comm_OP)
        self.ac = nn.activation.ReLu()


    def forward(self, x:pyvqnet.QTensor):

        x = x.reshape([-1, 28*28])
        x = self.ac(self.fc1(x))
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        x = self.fc5(x)
        return x

model = MNISTClassifier()
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

model.to(local_rank + 1000)
Comm_OP.broadcast_model_params(model, 0)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def compute_acc(outputs, labels, correct, total):
    predicted = pyvqnet.tensor.argmax(outputs, dim=1, keepdims=True)
    total += labels.size
    correct += pyvqnet.tensor.sums(predicted == labels).item()
    return correct, total

train_acc = 0
test_acc = 0
epochs = 5
loss = 0

time1 = time()
for epoch in range(epochs):
    model.train()
    total = 0
    correct = 0
    step = 0

    batch_size = 64
    num_batches = (train_images_np.shape[0] + batch_size - 1) // batch_size

    for i in range(num_batches):
        data_ = tensor.QTensor(train_images_np[i*batch_size: (i+1) * batch_size,:], dtype = kfloat32)
        labels = tensor.QTensor(train_labels_np[i*batch_size: (i+1) * batch_size,:], dtype = kint64)

        data_ = data_.to(local_rank + 1000)
        labels = labels.to(local_rank + 1000)

        optimizer.zero_grad()

        outputs = model(data_)
        loss = criterion(labels, outputs)

        loss.backward()
        optimizer.step()

        correct, total = compute_acc(outputs, labels, correct, total)
        step += 1
        if step % 50 == 0:
            print(f"Train : rank {get_rank()} Epoch [{epoch+1}/{epochs}], step {step} Loss: {loss.item():.4f} acc {100 * correct / total}")
            sys.stdout.flush()

    train_acc = 100 * correct / total
time2 = time()

print(f'Accuracy of the model on the 10000 Train images: {train_acc}% time cost {time2 - time1}')

Qubit Reordering

Qubit reordering is a technique used in qubit-parallel simulation. Its core idea is to change the order in which quantum gates are applied during qubit-parallel execution, reducing the number of qubit permutations that have to be performed. The modules below are needed when building large-qubit quantum circuits with qubit parallelism. See the paper Lazy Qubit Reordering for Accelerating Parallel State-Vector-based Quantum Circuit Simulation. The following interfaces must be launched with multiple processes via mpi.
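
As background for the interfaces below: in a distributed state-vector simulation with n qubits in total and g global (distributed) qubits, the usual partitioning (an assumption about the general scheme, not a statement about VQNet internals) is

\[
2^{g} \text{ processes, each storing } 2^{\,n-g} \text{ amplitudes of the } 2^{n}\text{-dimensional state vector,}
\]
\[
\text{i.e. each local simulator handles } n - g \text{ qubits.}
\]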

DistributeQMachine

class pyvqnet.distributed.qubits_reorder.DistributeQMachine(num_wires, dtype, grad_mode)

A simulator class for variational quantum computation under qubit parallelism; it holds the part of the quantum state corresponding to the qubits kept on each node. Via MPI, every node creates one instance of this class to perform distributed simulation of the variational quantum circuit. The number of launched processes must equal 2 raised to the power of the number of distributed (global) qubits, global_qubit, which can be configured via set_qr_config.

Parameters:
  • num_wires – the number of qubits in the full quantum circuit.

  • dtype – data type of the computation. Defaults to pyvqnet.kcomplex64, whose corresponding parameter precision is pyvqnet.kfloat32.

  • grad_mode – set to adjoint when backpropagating through DistQuantumLayerAdjoint.

Note

The number of qubits passed in is the number required by the entire quantum circuit. DistributeQMachine builds the local quantum simulator based on the number of global qubits, so the local simulator has num_wires - global_qubit qubits. Backpropagation must be done through DistQuantumLayerAdjoint.

Warning

This interface only runs on Linux.

The qubit-parallel parameters of DistributeQMachine must be configured as shown in the examples, including:

qm.set_just_defined(True)
qm.set_save_op_history_flag(True) # open save op
qm.set_qr_config({'qubit': <total number of qubits>, 'global_qubit': <number of distributed qubits>})
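
For the 10-qubit example below run with 2 processes, the configuration values work out as follows (a hypothetical check based on the global_qubit == log2(nproc) convention used in the examples):

import math

num_wires = 10                            # total qubits in the circuit
nproc = 2                                 # processes launched via mpi
global_qubit = int(math.log2(nproc))      # 1
local_qubits = num_wires - global_qubit   # 9 qubits simulated on each process

# qm.set_qr_config({"qubit": num_wires, "global_qubit": global_qubit})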

Examples:

from pyvqnet.distributed import get_rank
from pyvqnet import tensor
from pyvqnet.qnn.vqc import rx, ry, cnot, MeasureAll,rz
import pyvqnet
from pyvqnet.distributed.qubits_reorder import DistributeQMachine,DistQuantumLayerAdjoint
pyvqnet.utils.set_random_seed(123)


qubit = 10
batch_size = 5

class QModel(pyvqnet.nn.Module):
    def __init__(self, num_wires, dtype, grad_mode=""):
        super(QModel, self).__init__()

        self._num_wires = num_wires
        self._dtype = dtype
        self.qm = DistributeQMachine(num_wires, dtype=dtype, grad_mode=grad_mode)

        self.qm.set_just_defined(True)
        self.qm.set_save_op_history_flag(True) # open save op
        self.qm.set_qr_config({"qubit": num_wires, # open qubit reordered, set config
                                "global_qubit": 1}) # global_qubit == log2(nproc)

        self.params = pyvqnet.nn.Parameter([qubit])

        self.measure = MeasureAll(obs={
            "X5":1.0
        })

    def forward(self, x, *args, **kwargs):
        self.qm.reset_states(x.shape[0])

        for i in range(qubit):
            rx(q_machine=self.qm, params=self.params[i], wires=i)
            ry(q_machine=self.qm, params=self.params[3], wires=i)
            rz(q_machine=self.qm, params=self.params[4], wires=i)
        cnot(q_machine=self.qm,  wires=[0, 1])
        rlt = self.measure(q_machine=self.qm)
        return rlt


input_x = tensor.QTensor([[0.1, 0.2, 0.3]], requires_grad=True).toGPU(pyvqnet.DEV_GPU_0+get_rank())

input_x = tensor.broadcast_to(input_x, [2, 3])

input_x.requires_grad = True

quantum_model = QModel(num_wires=qubit,
                    dtype=pyvqnet.kcomplex64,
                    grad_mode="adjoint").toGPU(pyvqnet.DEV_GPU_0+get_rank())

adjoint_model = DistQuantumLayerAdjoint(quantum_model)
adjoint_model.train()

batch_y = adjoint_model(input_x)
batch_y.backward()

print(batch_y)
# mpirun -n 2 python test.py

DistQuantumLayerAdjoint

class pyvqnet.distributed.qubits_reorder.DistQuantumLayerAdjoint(vqc_module, name)

A DistQuantumLayer layer that computes parameter gradients in qubit-parallel computation using the adjoint method.

Parameters:
  • vqc_module – the input module containing a DistributeQMachine.

  • name – module name.

Note

The input vqc_module must contain a DistributeQMachine; adjoint gradient backpropagation under qubit parallelism is performed based on that DistributeQMachine.

Warning

This interface only runs on Linux.

Examples:

from pyvqnet.distributed import get_rank
from pyvqnet import tensor
from pyvqnet.qnn.vqc import rx, ry, cnot, MeasureAll,rz
import pyvqnet
from pyvqnet.distributed.qubits_reorder import DistributeQMachine,DistQuantumLayerAdjoint
pyvqnet.utils.set_random_seed(123)


qubit = 10
batch_size = 5

class QModel(pyvqnet.nn.Module):
    def __init__(self, num_wires, dtype, grad_mode=""):
        super(QModel, self).__init__()

        self._num_wires = num_wires
        self._dtype = dtype
        self.qm = DistributeQMachine(num_wires, dtype=dtype, grad_mode=grad_mode)

        self.qm.set_just_defined(True)
        self.qm.set_save_op_history_flag(True) # open save op
        self.qm.set_qr_config({"qubit": num_wires, # open qubit reordered, set config
                                    "global_qubit": 1}) # global_qubit == log2(nproc)

        self.params = pyvqnet.nn.Parameter([qubit])

        self.measure = MeasureAll(obs={
            "X5":1.0
        })

    def forward(self, x, *args, **kwargs):
        self.qm.reset_states(x.shape[0])

        for i in range(qubit):
            rx(q_machine=self.qm, params=self.params[i], wires=i)
            ry(q_machine=self.qm, params=self.params[3], wires=i)
            rz(q_machine=self.qm, params=self.params[4], wires=i)
        cnot(q_machine=self.qm,  wires=[0, 1])
        rlt = self.measure(q_machine=self.qm)
        return rlt


input_x = tensor.QTensor([[0.1, 0.2, 0.3]], requires_grad=True).toGPU(pyvqnet.DEV_GPU_0+get_rank())

input_x = tensor.broadcast_to(input_x, [2, 3])

input_x.requires_grad = True

quantum_model = QModel(num_wires=qubit,
                    dtype=pyvqnet.kcomplex64,
                    grad_mode="adjoint").toGPU(pyvqnet.DEV_GPU_0+get_rank())

adjoint_model = DistQuantumLayerAdjoint(quantum_model)
adjoint_model.train()

batch_y = adjoint_model(input_x)
batch_y.backward()

print(batch_y)
# mpirun -n 2 python test.py