Skip to content

量子云计算

本教程介绍如何连接到本源量子的量子云平台,使用 pyqpanda3.qcloud 模块将量子程序提交到真实量子处理器和云模拟器,跟踪任务状态,以及获取结果。

前置知识: 模拟 -- 在进行云端执行之前,你应该已经熟悉构建量子程序并在本地模拟器上运行。


目录


1. 量子云计算简介

1.1 为什么要使用量子云计算

本地模拟器非常适合学习、调试和开发算法。然而,它们面临着根本性的限制:

  • 模拟 n 个量子比特所需的内存以 O(2n) 的速度增长,在典型工作站上模拟超过大约 30 个量子比特是不切实际的。
  • 模拟器模拟的是理想的量子行为,这与受噪声、退相干和门错误影响的真实量子处理器存在显著差异。

量子云计算解决了这两个问题:

方面本地模拟器量子云计算
量子比特数量受 RAM 限制(通常约 25-30 个量子比特)在真实硬件上可达 72+ 个量子比特
噪声模型人为设定的,用户指定的真实的,物理设备固有的
速度小型线路速度快,指数级扩展在真实 QPU 上任何线路深度均为常数
成本免费消耗云端积分或订阅
可用性始终可用受排队时间和维护影响
保真度默认完美(无噪声)受硬件质量限制

云平台还提供云托管模拟器,能够处理比本地机器更大的线路,以及用于全振幅模拟、部分振幅计算和期望值估计的专用后端。

1.2 本源量子云平台概览

本源量子云平台提供对以下几类量子后端的访问:

  • 真实量子处理器(QPU) -- 具有 6 到 72+ 个量子比特的超导量子芯片。这些是在实际量子硬件上执行你的线路的物理设备。
  • 全振幅模拟器 -- 云托管的态矢量模拟器,计算所有 2n 个振幅。适用于验证和算法开发。
  • 部分振幅模拟器 -- 仅计算态矢量的特定振幅,能够模拟更大的线路。
  • 专用后端 -- 用于态层析、期望值计算和其他分析任务的后端。

通过 pyqpanda3.qcloud 模块访问云平台,该模块提供了用于身份验证、后端发现、任务提交和结果获取的 Python 接口。


2. QCloudService — 连接云平台

QCloudService 类是与本源量子云平台交互的主要入口。它处理身份验证、后端发现和日志配置。

2.1 创建服务实例

要创建 QCloudService,你需要一个 API 密钥。可选的 url 参数默认为本源量子云服务器。

python
from pyqpanda3.qcloud import QCloudService

# Connect using the default OriginQ cloud URL
service = QCloudService("your_api_key_here")

# Or specify a custom URL (e.g., for a private cloud deployment)
# service = QCloudService("your_api_key_here", url="https://custom-cloud.example.com")

API 密钥是一个用于验证你账户的字符串。你可以在本源量子云平台上注册来获取。请像对待密码一样保护你的 API 密钥 -- 不要分享它或将它提交到版本控制中。

推荐的做法是将 API 密钥存储在环境变量中:

python
import os
from pyqpanda3.qcloud import QCloudService

# Read the API key from an environment variable
api_key = os.environ["QPANDA3_API_KEY"]
service = QCloudService(api_key)

然后在运行脚本之前设置环境变量:

bash
export QPANDA3_API_KEY="your_api_key_here"
python your_script.py

2.2 配置日志

云服务支持可配置的日志功能,用于调试和监控。使用 setup_logging 来启用它:

python
from pyqpanda3.qcloud import QCloudService, LogOutput, LogLevel

service = QCloudService("your_api_key_here")

# Log to the console (default)
service.setup_logging(output=LogOutput.CONSOLE)

# Log to a file instead
service.setup_logging(output=LogOutput.FILE, file_path="cloud_debug.log")

setup_logging 方法接受两个参数:

参数类型默认值描述
outputLogOutputLogOutput.CONSOLE日志输出目标
file_pathstr""output=LogOutput.FILE 时的文件路径

日志功能是可选的。如果你不调用 setup_logging,服务将静默运行。

2.3 列出可用后端

backends() 方法返回一个字典,键为后端名称,值为可用性标志:

python
from pyqpanda3.qcloud import QCloudService

service = QCloudService("your_api_key_here")
backends = service.backends()

print("可用后端:")
for name, available in backends.items():
    status = "可用" if available else "不可用"
    print(f"  - {name}: {status}")

示例输出:

可用后端:
  - origin_wukong: 可用
  - origin_72: 可用
  - full_amplitude: 可用
  - partial_amplitude: 不可用

可用后端取决于你的账户权限和云平台的当前状态。真实量子处理器在校准或维护期间可能暂时不可用。

2.4 获取特定后端

一旦你知道了后端的名称,可以使用 backend() 来获取它:

python
from pyqpanda3.qcloud import QCloudService

service = QCloudService("your_api_key_here")

# Get a real quantum processor backend
qpu_backend = service.backend("origin_wukong")

# Get a cloud simulator backend
sim_backend = service.backend("full_amplitude")

返回的 QCloudBackend 对象提供了提交任务、查询芯片信息和计算期望值的方法。我们将在下一节详细探讨。


3. QCloudBackend — 执行目标

QCloudBackend 代表一个特定的量子计算资源 -- 可以是真实量子处理器或云托管的模拟器。你通过 QCloudService.backend() 获取它。

3.1 后端概览

python
from pyqpanda3.qcloud import QCloudService

service = QCloudService("your_api_key_here")
backend = service.backend("origin_wukong")

# Get the backend name
print(f"Backend name: {backend.name()}")

3.2 运行量子程序

最简单的运行方式是使用 run() 方法,它接受一个 QProg 和一个 shot 数:

python
from pyqpanda3 import core
from pyqpanda3.qcloud import QCloudService

# Build a Bell state circuit
prog = core.QProg()
prog << core.H(0)
prog << core.CNOT(0, 1)
prog << core.measure([0, 1], [0, 1])

# Connect and submit
service = QCloudService("your_api_key_here")
backend = service.backend("full_amplitude")

# Submit the job with 3000 shots
job = backend.run(prog, shots=3000)
print(f"Submitted job: {job.job_id()}")

run() 方法立即返回一个 QCloudJob 对象。实际执行在云服务器上异步进行。

3.3 带选项运行

为了更好地控制编译和执行,可以使用 QCloudOptions

python
from pyqpanda3 import core
from pyqpanda3.qcloud import QCloudService, QCloudOptions

prog = core.QProg()
prog << core.H(0)
prog << core.CNOT(0, 1)
prog << core.measure([0, 1], [0, 1])

service = QCloudService("your_api_key_here")
backend = service.backend("origin_wukong")

# Configure execution options
options = QCloudOptions()
options.set_amend(True)       # Enable amendment (error mitigation)
options.set_mapping(True)     # Enable qubit mapping
options.set_optimization(True) # Enable circuit optimization

# Submit with options
job = backend.run(prog, shots=3000, options=options)

QCloudOptions 类(在第 6 节详细介绍)让你能够控制服务器端的编译行为。

3.4 带噪声模型运行

当提交到云模拟器(而非真实 QPU)时,你可以指定噪声模型来模拟真实的硬件行为:

python
from pyqpanda3 import core
from pyqpanda3.qcloud import (
    QCloudService, QCloudNoiseModel, NOISE_MODEL
)

prog = core.QProg()
prog << core.H(0)
prog << core.CNOT(0, 1)
prog << core.measure([0, 1], [0, 1])

service = QCloudService("your_api_key_here")
backend = service.backend("full_amplitude")

# Create a depolarizing noise model
noise = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001],   # single-qubit gate noise parameter
    [0.01]     # two-qubit gate noise parameter
)

# Submit with noise model
job = backend.run(prog, shots=3000, model=noise)

请注意,噪声模型仅对云模拟器有意义。真实量子处理器本身就有固有噪声 -- 你无法在之上再添加软件噪声模型。

3.5 芯片信息

对于真实 QPU 后端,你可以查询芯片物理特性的详细信息:

python
from pyqpanda3.qcloud import QCloudService

service = QCloudService("your_api_key_here")
backend = service.backend("origin_wukong")

# Get chip information
chip_info = backend.chip_info()

# Number of qubits on the chip
print(f"Qubits: {chip_info.qubits_num()}")

# Chip identifier
print(f"Chip ID: {chip_info.chip_id()}")

# Available qubits (not all physical qubits may be usable)
available = chip_info.available_qubits()
print(f"Available qubits: {available}")

# High-frequency qubits
hf_qubits = chip_info.high_frequency_qubits()
print(f"High-frequency qubits: {hf_qubits}")

# Basic gate set supported by this chip
gates = chip_info.get_basic_gates()
print(f"Basic gates: {gates}")

# Chip topology (connectivity between qubits)
topology = chip_info.get_chip_topology()
print(f"Topology edges ({len(topology)} connections):")
for edge in topology[:5]:  # Show first 5 edges
    print(f"  {edge[0]} <-> {edge[1]}")

芯片上的每个量子比特都有各自的特性:

python
# Get per-qubit information
single_info_list = chip_info.single_qubit_info()
for info in single_info_list[:3]:  # Show first 3 qubits
    print(f"Qubit {info.get_qubit_id()}:")
    print(f"  T1 (relaxation): {info.get_t1():.2f} us")
    print(f"  T2 (coherence):  {info.get_t2():.2f} us")
    print(f"  Gate fidelity:   {info.get_single_gate_fidelity():.4f}")
    print(f"  Readout fidelity: {info.get_readout_fidelity():.4f}")
    print(f"  Frequency:       {info.get_frequency():.2f} GHz")

双量子比特门信息同样可以获取:

python
# Get two-qubit gate information
double_info_list = chip_info.double_qubits_info()
for info in double_info_list[:3]:
    qubits = info.get_qubits()
    print(f"Qubits ({qubits[0]}, {qubits[1]}): "
          f"fidelity = {info.get_fidelity():.4f}")

这些信息对于以下方面很有价值:

  • 选择线路中使用哪些量子比特
  • 了解连通性约束(哪些量子比特对支持双量子比特门)
  • 估算线路在真实硬件上的预期保真度

3.6 期望值

云后端可以直接计算哈密顿量或泡利算符的期望值,无需手动解析测量结果:

python
from pyqpanda3 import core
from pyqpanda3.qcloud import QCloudService, QCloudOptions
from pyqpanda3.hamiltonian import Hamiltonian, PauliOperator

# Build a circuit
prog = core.QProg()
prog << core.X(0)
prog << core.CNOT(0, 1)
prog << core.SWAP(1, 2)

# Define a Hamiltonian
hamiltonian = Hamiltonian({"X0 Y1": 1.1, "X0 Z1 I2": 2.1, "I1 Z2": 3.1})

# Or equivalently, a Pauli operator
pauli_op = PauliOperator({"X0 Y1": 1.1, "X0 Z1 I2": 2.1, "I1 Z2": 3.1})

service = QCloudService("your_api_key_here")

# Compute expectation on a real QPU
backend = service.backend("origin_wukong")
expval = backend.expval_hamiltonian(prog, hamiltonian, QCloudOptions())
print(f"Hamiltonian expectation (QPU): {expval}")

# Compute on cloud simulator
sim_backend = service.backend("full_amplitude")
expval_sim = sim_backend.expval_hamiltonian(prog, hamiltonian)
print(f"Hamiltonian expectation (sim): {expval_sim}")

# Pauli operator expectation
expval_pauli = backend.expval_pauli_operator(prog, pauli_op, QCloudOptions())
print(f"Pauli operator expectation: {expval_pauli}")

expval_hamiltonianexpval_pauli_operator 方法也接受 shots 参数和可选的噪声模型:

python
# With explicit shots and noise model on simulator
from pyqpanda3.qcloud import QCloudNoiseModel, NOISE_MODEL

noise = QCloudNoiseModel(NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR, [0.001], [0.01])
expval = sim_backend.expval_hamiltonian(prog, hamiltonian, shots=5000, noise_model=noise)

4. QCloudJob — 跟踪执行

将量子程序提交到云端后,你会收到一个 QCloudJob 对象,它代表异步计算。任务在完成之前会经历多个状态。

4.1 任务生命周期

当你提交一个任务时,它会经历以下状态:

状态描述
QUEUING任务已被接收,正在执行队列中等待
WAITING任务正在等待被分配到计算资源
COMPUTING任务正在量子处理器或模拟器上执行
FINISHED任务已成功完成,结果可用
FAILED任务遇到错误,未能完成

4.2 创建任务引用

QCloudJob 可以通过两种方式创建:

从提交中创建 -- backend.run() 方法返回一个 QCloudJob

python
job = backend.run(prog, shots=3000)

从任务 ID 创建 -- 如果你保存了之前会话中的任务 ID,可以重建任务引用:

python
from pyqpanda3.qcloud import QCloudJob

# Reconstruct a job reference from a known job ID
job = QCloudJob("ABC123DEF456")

当你提交了一个长时间运行的任务,并想在新的 Python 会话中检查其状态时,这很有用。

4.3 检查任务状态

使用 status() 方法来检查任务的当前状态:

python
from pyqpanda3.qcloud import JobStatus

status = job.status()
print(f"Current status: {status}")

# Compare against known states
if status == JobStatus.FINISHED:
    print("Job completed successfully!")
elif status == JobStatus.FAILED:
    print("Job failed. Check error messages.")
elif status == JobStatus.QUEUING:
    print("Job is in the queue. Please wait.")
elif status == JobStatus.COMPUTING:
    print("Job is currently executing.")
elif status == JobStatus.WAITING:
    print("Job is waiting to be processed.")

4.4 查询任务信息

query() 方法在不获取完整结果的情况下检索任务的详细信息。它返回一个包含元数据的 QCloudResult

python
result_info = job.query()
print(f"Job ID: {result_info.job_id()}")
print(f"Job status: {result_info.job_status()}")

如果任务失败了,你可以获取错误信息:

python
if result_info.job_status() == JobStatus.FAILED:
    print(f"Error: {result_info.error_message()}")

4.5 获取任务结果

result() 方法会阻塞,直到任务完成(或服务器报告最终状态),然后返回一个 QCloudResult

python
# Wait for completion and get results
result = job.result()

# Access the data
probs = result.get_probs()
counts = result.get_counts()

print(f"Probabilities: {probs}")
print(f"Counts: {counts}")

如果任务尚未完成,result() 可能返回一个状态非 FINISHED 的结果。务必检查状态:

python
result = job.result()
if result.job_status() == JobStatus.FINISHED:
    counts = result.get_counts()
    print(f"Measurement counts: {counts}")
else:
    print(f"Job not finished. Status: {result.job_status()}")

5. QCloudResult — 处理输出

QCloudResult 类提供了多种方式来访问已完成的云端任务的输出。

5.1 获取概率

get_probs() 方法返回测量结果的概率分布估计:

python
from pyqpanda3.qcloud import DataBase

result = job.result()

# Get probabilities as binary strings (default)
probs_binary = result.get_probs(base=DataBase.Binary)
print("Probabilities (binary):", probs_binary)
# Example: {'00': 0.4987, '11': 0.5013}

# Get probabilities as hexadecimal strings
probs_hex = result.get_probs(base=DataBase.Hex)
print("Probabilities (hex):", probs_hex)
# Example: {'0x0': 0.4987, '0x3': 0.5013}

对于批量提交(多个线路),使用 get_probs_list(),它返回概率字典的列表:

python
# When running multiple programs in a batch
probs_list = result.get_probs_list()
for i, probs in enumerate(probs_list):
    print(f"Circuit {i} probabilities: {probs}")

5.2 获取测量计数

get_counts() 方法返回每次 shot 的原始测量计数:

python
from pyqpanda3.qcloud import DataBase

result = job.result()

# Counts as binary strings (default)
counts = result.get_counts(base=DataBase.Binary)
print("Counts (binary):", counts)
# Example: {'00': 1496, '11': 1504}

# Counts as hexadecimal strings
counts_hex = result.get_counts(base=DataBase.Hex)
print("Counts (hex):", counts_hex)
# Example: {'0x0': 1496, '0x3': 1504}

类似地,get_counts_list() 返回批量提交的结果(仅当使用批量 run() 提交多个线路时可用):

python
counts_list = result.get_counts_list(base=DataBase.Binary)
for i, counts in enumerate(counts_list):
    total = sum(counts.values())
    print(f"Circuit {i}: {total} total shots, counts = {counts}")

5.3 访问原始数据

origin_data() 方法返回来自云服务器的原始 JSON 响应。这对于调试或访问 Python API 未公开的字段很有用:

python
raw_json = result.origin_data()
print("Raw server response:")
print(raw_json)

5.4 其他结果方法

QCloudResult 类还提供了几个用于专用输出类型的方法:

python
result = job.result()

# Job ID from the result
job_id = result.job_id()
print(f"Job ID: {job_id}")

# Job status from the result
status = result.job_status()
print(f"Status: {status}")

# Error message (if job failed)
error = result.error_message()
if error:
    print(f"Error: {error}")

# Amplitudes (for partial amplitude backend)
amplitudes = result.get_amplitudes()
print(f"Amplitudes: {amplitudes}")

# State fidelity (for state tomography backend)
fidelity = result.get_state_fidelity()
print(f"State fidelity: {fidelity}")

# State tomography density matrix (returns list[list[complex]])
density = result.get_state_tomography_density()
print(f"Density matrix rows: {len(density)}, cols: {len(density[0])}")

并非所有方法都适用于所有任务类型。例如,get_state_fidelity()get_state_tomography_density() 仅在使用 run_quantum_state_tomography() 提交任务时可用。同样,get_amplitudes() 仅对部分振幅任务可用。


6. QCloudOptions — 配置执行

QCloudOptions 类提供了对云服务器如何编译和执行你的线路的细粒度控制。

6.1 创建选项

python
from pyqpanda3.qcloud import QCloudOptions

# Create with all defaults
options = QCloudOptions()

# Print current settings
options.print()

6.2 编译选项

当提交到真实 QPU 时,云服务器会执行多个编译步骤。你可以分别启用或禁用每个步骤:

python
from pyqpanda3.qcloud import QCloudOptions

options = QCloudOptions()

# Amendment: applies error mitigation based on calibration data
options.set_amend(True)

# Mapping: maps logical qubits to physical qubits based on chip topology
options.set_mapping(True)

# Optimization: simplifies the circuit (gate cancellation, decomposition)
options.set_optimization(True)

# Point label: specifies the calibration point to use
options.set_point_label(0)

# Specified block: restricts execution to a specific block on the chip
options.set_specified_block([1])

# Probability vs counts: controls the output format
options.set_is_prob_counts(True)

查询当前设置:

python
print(f"Amendment: {options.is_amend()}")
print(f"Mapping: {options.is_mapping()}")
print(f"Optimization: {options.is_optimization()}")
print(f"Point label: {options.point_label()}")
print(f"Is prob counts: {options.is_prob_counts()}")
print(f"Specified block: {options.specified_block()}")

6.3 自定义选项

对于高级用例,你可以设置任意键值对选项:

python
from pyqpanda3.qcloud import QCloudOptions

options = QCloudOptions()

# Set custom options of various types
options.set_custom_option("repetitions", 10)         # int
options.set_custom_option("threshold", 0.95)          # double
options.set_custom_option("label", "experiment_1")    # string
options.set_custom_option("verbose", True)            # bool

# Check if a custom option exists
if options.has_custom_option("threshold"):
    print("threshold exists")

# Get all custom options as a dictionary
all_options = options.get_custom_options()
print(f"All custom options: {all_options}")
print(f"Threshold: {all_options['threshold']}")
print(f"Repetitions: {all_options['repetitions']}")
print(f"Label: {all_options['label']}")
print(f"Verbose: {all_options['verbose']}")

7. QCloudNoiseModel — 云端噪声模拟

在云模拟器上运行时,你可以附加噪声模型来模拟真实量子处理器的行为。QCloudNoiseModel 类定义了噪声模型,分别为单量子比特门和双量子比特门设置独立的参数。

7.1 创建噪声模型

python
from pyqpanda3.qcloud import QCloudNoiseModel, NOISE_MODEL

# Default constructor -- no noise (disabled)
clean_model = QCloudNoiseModel()
print(f"Clean model enabled: {clean_model.is_enabled()}")
# Output: Clean model enabled: False

# Construct with a specific noise model and parameters
noisy_model = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001],   # single-qubit gate error rate
    [0.01]     # two-qubit gate error rate
)
print(f"Noisy model enabled: {noisy_model.is_enabled()}")
# Output: Noisy model enabled: True

构造函数接受三个参数:

参数类型描述
modelNOISE_MODEL噪声信道类型
single_plist[float]单量子比特门的噪声参数
double_plist[float]双量子比特门的噪声参数

7.2 检查和修改参数

python
from pyqpanda3.qcloud import QCloudNoiseModel, NOISE_MODEL

noise = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001],
    [0.01]
)

# Get the noise model type
model_type = noise.get_noise_model()
print(f"Noise model: {model_type}")

# Get current parameters
single_params = noise.get_single_params()
double_params = noise.get_double_params()
print(f"Single-qubit params: {single_params}")
print(f"Two-qubit params: {double_params}")

# Update parameters
noise.set_single_params([0.002])
noise.set_double_params([0.02])
print(f"Updated single params: {noise.get_single_params()}")

# Print a human-readable summary
noise.print()

7.3 比较噪声模型

两个噪声模型可以进行相等和不相等比较:

python
from pyqpanda3.qcloud import QCloudNoiseModel, NOISE_MODEL

model_a = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001], [0.01]
)

model_b = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001], [0.01]
)

model_c = QCloudNoiseModel(
    NOISE_MODEL.BITFLIP_KRAUS_OPERATOR,
    [0.005], [0.05]
)

print(f"model_a == model_b: {model_a == model_b}")   # True
print(f"model_a != model_c: {model_a != model_c}")   # True

8. 枚举参考

qcloud 模块定义了几个控制任务行为和数据格式的枚举。

8.1 JobStatus

表示云任务的生命周期状态。

描述
JobStatus.WAITING任务正在等待被处理
JobStatus.COMPUTING任务正在执行中
JobStatus.FINISHED任务已成功完成
JobStatus.FAILED任务失败,出现错误
JobStatus.QUEUING任务在提交队列中
python
from pyqpanda3.qcloud import JobStatus

# Status values can be compared directly
status = job.status()
if status == JobStatus.FINISHED:
    result = job.result()

8.2 DataFormat

指定量子线路发送到云端时的编码方式。

描述
DataFormat.DEFAULT默认格式,取决于后端实现
DataFormat.BINARY紧凑的二进制格式,用于高效传输
DataFormat.INSTRUCTION_SET显式的门指令序列

8.3 DataBase

控制测量结果在结果中的表示方式。

描述
DataBase.Binary二进制表示(例如 "00""11"
DataBase.Hex十六进制表示(例如 "0x0""0x3"
python
from pyqpanda3.qcloud import DataBase

result = job.result()

# Binary representation (human-readable)
probs_bin = result.get_probs(base=DataBase.Binary)
# {'00': 0.4987, '11': 0.5013}

# Hexadecimal representation (compact for many qubits)
probs_hex = result.get_probs(base=DataBase.Hex)
# {'0x0': 0.4987, '0x3': 0.5013}

对于具有多个量子比特的线路,十六进制表示更加紧凑。一个 10 量子比特的结果在二进制下需要 10 个字符("0010110101"),而在十六进制下只需要 4 个字符("0xB5")。

8.4 NOISE_MODEL

定义云端噪声模拟的噪声信道类型。

描述
NOISE_MODEL.BITFLIP_KRAUS_OPERATOR随机 X(比特翻转)错误
NOISE_MODEL.BIT_PHASE_FLIP_OPERATOR比特和相位翻转的组合错误
NOISE_MODEL.DAMPING_KRAUS_OPERATOR振幅阻尼(T1 衰减)
NOISE_MODEL.DECOHERENCE_KRAUS_OPERATORT1 和 T2 退相干的组合
NOISE_MODEL.DEPHASING_KRAUS_OPERATOR相位阻尼(T2 衰减)
NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR均匀去极化噪声
NOISE_MODEL.PHASE_DAMPING_OPERATOR纯退相位错误
python
from pyqpanda3.qcloud import NOISE_MODEL, QCloudNoiseModel

# Common choice: depolarizing noise (uniform error model)
noise = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001],  # 0.1% error on single-qubit gates
    [0.01]    # 1% error on two-qubit gates
)

# More realistic: decoherence model (T1 + T2)
decoherence_noise = QCloudNoiseModel(
    NOISE_MODEL.DECOHERENCE_KRAUS_OPERATOR,
    [0.0005],  # single-qubit T1/T2 parameters
    [0.005]    # two-qubit T1/T2 parameters
)

8.5 LogLevel 和 LogOutput

控制云服务日志的详细程度和输出目标。

LogLevel:

描述
LogLevel.CLOUD_INFO信息性消息
LogLevel.CLOUD_DEBUG详细的调试消息
LogLevel.CLOUD_WARNING警告消息
LogLevel.CLOUD_ERROR仅错误消息

LogOutput:

描述
LogOutput.CONSOLE将日志打印到标准输出
LogOutput.FILE将日志写入文件
python
from pyqpanda3.qcloud import QCloudService, LogOutput

service = QCloudService("your_api_key_here")

# Console logging for development
service.setup_logging(output=LogOutput.CONSOLE)

# File logging for production
service.setup_logging(output=LogOutput.FILE, file_path="/var/log/qcloud.log")

9. 实践工作流

本节通过完整的端到端示例演示如何使用云服务。

9.1 在云端运行 Bell 态的完整步骤

以下是在云模拟器上运行 Bell 态的完整工作流程,从初始化到结果分析。

python
import os
from pyqpanda3 import core
from pyqpanda3.qcloud import (
    QCloudService, QCloudOptions,
    LogOutput, JobStatus, DataBase
)

# ------------------------------------------------
# Step 1: Authenticate with the cloud platform
# ------------------------------------------------
api_key = os.environ["QPANDA3_API_KEY"]
service = QCloudService(api_key)
service.setup_logging(output=LogOutput.CONSOLE)

# ------------------------------------------------
# Step 2: Discover available backends
# ------------------------------------------------
backends = service.backends()
print(f"Available backends: {backends}")

# ------------------------------------------------
# Step 3: Select a backend
# ------------------------------------------------
# Use the full-amplitude simulator for this example
backend = service.backend("full_amplitude")

# ------------------------------------------------
# Step 4: Build the quantum circuit
# ------------------------------------------------
prog = core.QProg()
prog << core.H(0)                    # Superposition on qubit 0
prog << core.CNOT(0, 1)              # Entangle qubits 0 and 1
prog << core.measure([0, 1], [0, 1]) # Measure both qubits

print("Circuit:")
print(prog)

# ------------------------------------------------
# Step 5: Submit the job
# ------------------------------------------------
# Note: full_amplitude simulator uses run(prog, shots) or run(prog, shots, model)
# QCloudOptions is only for real QPU backends
job = backend.run(prog, shots=3000)
job_id = job.job_id()
print(f"Job submitted. ID: {job_id}")

# ------------------------------------------------
# Step 6: Monitor job status
# ------------------------------------------------
import time

while True:
    status = job.status()
    print(f"  Status: {status}")
    if status == JobStatus.FINISHED:
        break
    elif status == JobStatus.FAILED:
        print("Job failed!")
        result = job.result()
        print(f"Error: {result.error_message()}")
        exit(1)
    time.sleep(2)  # Wait before polling again

# ------------------------------------------------
# Step 7: Retrieve and analyze results
# ------------------------------------------------
result = job.result()

# Get probabilities
probs = result.get_probs(base=DataBase.Binary)
print("\nProbabilities:")
for state, prob in sorted(probs.items()):
    print(f"  |{state}>: {prob:.4f}")

# Get raw counts
counts = result.get_counts(base=DataBase.Binary)
print("\nMeasurement counts:")
for state, count in sorted(counts.items()):
    print(f"  |{state}>: {count}")

# Expected output for Bell state:
#   |00>: ~0.50
#   |11>: ~0.50
#   (01 and 10 should be absent or near zero)

9.2 监控长时间运行的任务

当提交到真实 QPU 时,任务可能在队列中花费大量时间。以下是一个健壮的监控模式:

python
import time
from pyqpanda3.qcloud import QCloudJob, JobStatus

# Reconstruct job reference from a saved ID
saved_job_id = "ABC123DEF456"
job = QCloudJob(saved_job_id)

# Poll with exponential backoff
wait_times = [2, 4, 8, 15, 30, 60]  # seconds
attempt = 0

while True:
    status = job.status()
    print(f"[{time.strftime('%H:%M:%S')}] Job {saved_job_id}: {status}")

    if status == JobStatus.FINISHED:
        result = job.result()
        counts = result.get_counts()
        print(f"Final counts: {counts}")
        break

    if status == JobStatus.FAILED:
        result = job.result()
        print(f"Job failed: {result.error_message()}")
        break

    # Wait with increasing delay
    wait = wait_times[min(attempt, len(wait_times) - 1)]
    attempt += 1
    time.sleep(wait)

9.3 云计算工作流图

下图展示了从身份验证到结果获取的完整工作流程:

9.4 任务生命周期图

下图展示了任务从提交到完成所经历的状态转换:


10. 高级模式

10.1 批量任务提交

你可以在一个批次中提交多个量子程序,这比逐个提交更高效:

python
from pyqpanda3 import core
from pyqpanda3.qcloud import QCloudService, QCloudOptions

service = QCloudService("your_api_key_here")
backend = service.backend("origin_wukong")

# Build multiple circuits
progs = []
for angle_idx in range(4):
    prog = core.QProg()
    prog << core.H(0)
    prog << core.RZ(1, angle_idx * 3.14159 / 4)
    prog << core.CNOT(0, 1)
    prog << core.measure([0, 1], [0, 1])
    progs.append(prog)

# Submit all at once
options = QCloudOptions()
options.set_amend(True)
job = backend.run(progs, shots=2000, options=options)

# Get results for all circuits
result = job.result()
probs_list = result.get_probs_list()
for i, probs in enumerate(probs_list):
    print(f"Circuit {i}: {probs}")

10.2 云端提交前的转译

当目标是真实 QPU 时,线路必须与芯片的原生门集和量子比特连通性兼容。虽然云服务器可以通过 QCloudOptions.set_mapping(True) 自动处理这一点,但你可能希望本地转译以获得更多控制:

python
import time
from pyqpanda3 import core
from pyqpanda3.qcloud import QCloudService, QCloudOptions
from pyqpanda3.transpilation import Transpiler

service = QCloudService("your_api_key_here")
backend = service.backend("origin_wukong")

# Get the chip's physical configuration
chip_info = backend.chip_info()
chip_backend = chip_info.get_chip_backend()

# Build a circuit
prog = core.QProg()
prog << core.H(0)
prog << core.CNOT(0, 1)
prog << core.CNOT(1, 2)
prog << core.SWAP(0, 2)
prog << core.measure([0, 1, 2], [0, 1, 2])

# Transpile to the chip's native gate set and topology
transpiler = Transpiler()
basic_gates = chip_info.get_basic_gates()  # e.g., ["U3", "CZ"]
topo = chip_info.get_chip_topology()

start = time.time()
transpiled_prog = transpiler.transpile(
    prog,
    chip_backend,
    {},       # compensate angle map
    2         # optimization level
)
elapsed = (time.time() - start) * 1000
print(f"Transpilation took {elapsed:.2f} ms")

# Convert to instruction format for submission
instruction_json = transpiled_prog.to_instruction(chip_backend)

# Submit the transpiled instruction directly
from pyqpanda3.qcloud import QCloudOptions
options = QCloudOptions()
options.set_amend(False)  # Already transpiled, no need for server-side mapping

job = backend.run_instruction(instruction_json, 3000, options)
result = job.result()
print(result.get_probs())

10.3 部分振幅和态层析

云平台支持标准线路模拟之外的专用执行模式。

部分振幅计算 -- 仅计算态矢量的特定振幅:

python
from pyqpanda3 import core
from pyqpanda3.qcloud import QCloudService

service = QCloudService("your_api_key_here")
backend = service.backend("partial_amplitude")

prog = core.QProg()
prog << core.H(0)
prog << core.H(1)
prog << core.H(2)
prog << core.CNOT(0, 1)
prog << core.CNOT(1, 2)

# Request specific amplitudes
job = backend.run(prog, ["000", "111"])
result = job.result()
amplitudes = result.get_amplitudes()
print(f"Amplitudes: {amplitudes}")

态层析 -- 通过测量重建完整的密度矩阵(get_state_tomography_density() 返回 list[list[complex]]):

python
from pyqpanda3 import core
from pyqpanda3.qcloud import QCloudService, QCloudOptions

service = QCloudService("your_api_key_here")
backend = service.backend("origin_wukong")

prog = core.QProg()
prog << core.H(0)
prog << core.CNOT(0, 1)

# Run state tomography
options = QCloudOptions()
job = backend.run_quantum_state_tomography(prog, 5000, options)
result = job.result()

fidelity = result.get_state_fidelity()
density = result.get_state_tomography_density()
print(f"State fidelity: {fidelity}")
print(f"Density matrix ({len(density)}x{len(density[0])}):")
for row in density:
    print(row)

11. 总结

本教程介绍了 pyqpanda3 中完整的云计算工作流程:

组件用途关键方法
QCloudService身份验证和发现后端.backends() / .backend() / .setup_logging()
QCloudBackend执行线路和查询芯片信息.run() / .chip_info() / .expval_hamiltonian()
QCloudJob跟踪和获取任务状态.status() / .result() / .query() / .job_id()
QCloudResult访问测量数据.get_probs() / .get_counts() / .origin_data()
QCloudOptions配置编译和执行.set_amend() / .set_mapping() / .set_optimization()
QCloudNoiseModel为云模拟器定义噪声使用 NOISE_MODEL 和参数的构造函数

关键要点:

  1. 先进行身份验证 -- 在做任何其他事情之前,使用你的 API 密钥创建 QCloudService
  2. 选择正确的后端 -- 使用云模拟器进行开发和验证;在需要真实量子硬件时使用真实 QPU。
  3. 任务是异步的 -- 提交后,轮询 job.status() 直到 FINISHEDFAILED
  4. 结果有多种格式 -- 使用 DataBase.Binary 获取人类可读的输出,使用 DataBase.Hex 获取紧凑表示。
  5. 噪声模型仅适用于模拟器 -- 真实硬件具有固有噪声;软件噪声模型仅适用于云模拟器。
  6. 本地转译以获得更多控制 -- 当你需要对线路编译进行细粒度控制时,使用 Transpiler 类和 ChipInfo

后续学习:


快速参考

python
from pyqpanda3 import core
from pyqpanda3.qcloud import (
    QCloudService, QCloudJob, QCloudOptions,
    QCloudNoiseModel, QCloudResult,
    JobStatus, DataBase, NOISE_MODEL,
    LogOutput, LogLevel,
)
import os

# ===========================
# Authentication
# ===========================
service = QCloudService(os.environ["QPANDA3_API_KEY"])
service.setup_logging(output=LogOutput.CONSOLE)

# ===========================
# Backend Discovery
# ===========================
backends = service.backends()           # dict[str, bool] 键为后端名,值为可用性
qpu_backend = service.backend("origin_wukong")  # Real QPU
sim_backend = service.backend("full_amplitude") # Cloud simulator

# ===========================
# Chip Information (QPU only)
# ===========================
chip_info = qpu_backend.chip_info()
print(f"Qubits: {chip_info.qubits_num()}")
print(f"Available: {chip_info.available_qubits()}")
print(f"Topology: {chip_info.get_chip_topology()}")

# ===========================
# Submit a Job
# ===========================
prog = core.QProg()
prog << core.H(0) << core.CNOT(0, 1)
prog << core.measure([0, 1], [0, 1])

# Simulator: run(prog, shots) or run(prog, shots, model)
job = sim_backend.run(prog, shots=3000)

# QPU: run(prog, shots, options)
options = QCloudOptions()
options.set_amend(True)
options.set_mapping(True)
job = qpu_backend.run(prog, shots=3000, options=options)

# Simulator with noise model
noise = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001], [0.01]
)
job = sim_backend.run(prog, shots=3000, model=noise)

# ===========================
# Monitor Job
# ===========================
job_id = job.job_id()
status = job.status()

# Reconstruct from saved job ID
job = QCloudJob(job_id)

# ===========================
# Get Results
# ===========================
result = job.result()
probs = result.get_probs(base=DataBase.Binary)
counts = result.get_counts(base=DataBase.Binary)
raw = result.origin_data()

# ===========================
# Expectation Values
# ===========================
from pyqpanda3.hamiltonian import Hamiltonian
h = Hamiltonian({"X0 Y1": 1.0, "Z0 Z1": 0.5})
expval = qpu_backend.expval_hamiltonian(prog, h, QCloudOptions())

# ===========================
# Noise Model
# ===========================
noise = QCloudNoiseModel(
    NOISE_MODEL.DEPOLARIZING_KRAUS_OPERATOR,
    [0.001], [0.01]
)
noise.is_enabled()            # True
noise.get_noise_model()       # DEPOLARIZING_KRAUS_OPERATOR
noise.get_single_params()     # [0.001]
noise.get_double_params()     # [0.01]
noise.set_single_params([0.002])
noise.print()

Released under the MIT License.