pcb

Pauli Coloring Benchmark

Python 3.13 License Code style Documentation

How to do the thing?

Prep. work

  1. Install uv.

    curl -LsSf https://astral.sh/uv/install.sh | sh
    
  2. Install dependencies:

    uv sync
    
  3. Build binaries (requires gcc). This is only required if you want to use the degree_c reordering method.

    make dll
    
  4. Build the index. This is a SQlite3 file that lists all the Hamiltonian from all the hdf5 files present in the HamLib website.

    uv run python -m pcb build-index out/index.db
    
  5. Download the Hamiltonian ZIP files.

    uv run python -m pcb download out/index.db out/ham
    # it is also possible to apply a filter to only download files in a given subdirectory
    uv run python -m pcb download out/index.db out/ham --prefix binaryoptimization/maxcut
    

Reordering benchmark

  1. Run the benchmark.

    uv run python -m pcb benchmark-reorder out/index.db out/ham out/reordering
    # or
    uv run python -m pcb benchmark-reorder out/index.db out/ham out/reordering --prefix binaryoptimization/maxcut --n-trials 1 --n-jobs 200 --methods none,saturation,misra_gries --min-qubits 32 --max-qubits 127 --min-terms 16 --max-terms 256
    
  2. You can obtain temporary consolidated results during the benchmark by running:

    uv run python -m pcb consolidate out/reordering
    

    This is automatically done at the end of the benchmark.

More help

uv run python -m pcb --help

Changelog

 1"""
 2.. include:: ../README.md
 3.. include:: ../CHANGELOG.md
 4"""
 5
 6from .benchmark import jid_to_json_path
 7from .hamlib import hid_to_file_key, open_hamiltonian_file
 8from .io import load, save
 9from .qaoa import qaoa
10from .qiskit import reorder_operator, to_evolution_gate, trim_qc
11from .reordering import reorder
12
13__all__ = [
14    "hid_to_file_key",
15    "jid_to_json_path",
16    "load",
17    "open_hamiltonian_file",
18    "qaoa",
19    "reorder_operator",
20    "reorder",
21    "save",
22    "to_evolution_gate",
23    "trim_qc",
24]
def hid_to_file_key(hid: str, ham_dir: str | pathlib.Path) -> tuple[pathlib.Path, str]:
48def hid_to_file_key(hid: str, ham_dir: str | Path) -> tuple[Path, str]:
49    """
50    Converts a Hamiltonian ID to a `.hdf5.zip` file path and a key inside the
51    HDF5 file.
52    """
53    p = Path(ham_dir) / ("__".join(hid.split("/")[:-1]) + ".hdf5.zip")
54    k = hid.split("/")[-1]
55    return p, k

Converts a Hamiltonian ID to a .hdf5.zip file path and a key inside the HDF5 file.

def jid_to_json_path(jid: str, output_dir: str | pathlib.Path) -> pathlib.Path:
 7def jid_to_json_path(jid: str, output_dir: str | Path) -> Path:
 8    """
 9    Converts a job ID to a JSON file path that looks like
10
11        output_dir / jobs / jid[:2] / jid[2:4] / jid.json
12
13    """
14    return (
15        Path(output_dir)
16        / "jobs"
17        / jid[:2]  # spread files in subdirs
18        / jid[2:4]
19        / f"{jid}.json"
20    )

Converts a job ID to a JSON file path that looks like

output_dir / jobs / jid[:2] / jid[2:4] / jid.json
def load(path: str | pathlib.Path) -> Any:
15def load(path: str | Path) -> Any:
16    """Inverse of `save`"""
17    path = Path(path)
18    path.parent.mkdir(parents=True, exist_ok=True)
19    extension = ".".join(path.name.split(".")[1:])
20    if extension == "qpy":
21        with path.open("rb") as fp:
22            return qpy.load(fp)[0]
23    if extension == "qpy.gz":
24        with gzip.open(path, "rb") as fp:
25            return qpy.load(fp)[0]
26    if extension == "hdf5":
27        with h5py.File(path, "r") as fp:
28            return {k: np.array(v) for k, v in fp.items()}
29    if extension == "json":
30        with path.open("r", encoding="utf-8") as fp:
31            return json.load(fp)
32    if extension == "csv":
33        return pd.read_csv(path)
34    raise ValueError(f"Unsupported file extension: {extension}")

Inverse of save

@contextmanager
def open_hamiltonian_file( path: str | pathlib.Path) -> Generator[h5py._hl.files.File, NoneType, NoneType]:
58@contextmanager
59def open_hamiltonian_file(
60    path: str | Path,
61) -> Generator[h5py.File, None, None]:
62    """
63    A Hamiltonian file is a ZIP archive containing a single HDF5 file, which
64    contains essentially a dict of byte strings, each representing a serialized
65    sparse Pauli operator. The serialization format looks like this (after
66    decoding):
67
68        >>> with open_hamiltonian_file("...") as fp:
69        >>>     k = list(fp.keys())[0]
70        >>>     print(fp[k][()].decode("utf-8"))
71        22.5 [] +
72        -0.5 [Z9 Z20] +
73        -0.5 [Z9 Z26] +
74        -0.5 [Z9 Z29] +
75        -0.5 [Z9 Z56] +
76
77    Warning:
78        Note the `[()]` when accessing a key!
79
80    """
81    with zipfile.ZipFile(path, mode="r") as fp:
82        if not (
83            len(fp.namelist()) == 1 and fp.namelist()[0].endswith(".hdf5")
84        ):
85            raise ValueError(
86                f"Expected exactly one .hdf5 file in archive {path}, found "
87                + ", ".join(fp.namelist())
88            )
89        name = fp.namelist()[0]
90        with fp.open(name, "r") as h5fp:
91            with h5py.File(h5fp, "r") as h5fp:
92                yield h5fp

A Hamiltonian file is a ZIP archive containing a single HDF5 file, which contains essentially a dict of byte strings, each representing a serialized sparse Pauli operator. The serialization format looks like this (after decoding):

>>> with open_hamiltonian_file("...") as fp:
>>>     k = list(fp.keys())[0]
>>>     print(fp[k][()].decode("utf-8"))
22.5 [] +
-0.5 [Z9 Z20] +
-0.5 [Z9 Z26] +
-0.5 [Z9 Z29] +
-0.5 [Z9 Z56] +
Warning:

Note the [()] when accessing a key!

def qaoa( operator: qiskit.quantum_info.operators.symplectic.sparse_pauli_op.SparsePauliOp, backend: qiskit.providers.backend.BackendV2, estimator: qiskit_ibm_runtime.estimator.EstimatorV2 | qiskit_aer.primitives.estimator_v2.EstimatorV2, cost_qc: qiskit.circuit.quantumcircuit.QuantumCircuit | None = None, n_qaoa_steps: int = 2, pm_optimization_level: int = 3, max_iter: int = 128) -> tuple[tuple[numpy.ndarray, float], tuple[numpy.ndarray, numpy.ndarray], list[dict]]:
 62def qaoa(
 63    operator: SparsePauliOp,
 64    backend: Backend,
 65    estimator: IBMEstimator | AerEstimator,
 66    cost_qc: QuantumCircuit | None = None,
 67    n_qaoa_steps: int = 2,
 68    pm_optimization_level: int = 3,
 69    max_iter: int = 128,
 70) -> tuple[
 71    tuple[np.ndarray, float], tuple[np.ndarray, np.ndarray], list[dict]
 72]:
 73    """
 74    Runs QAOA on the given operator using the given backend. The Trotterization
 75    of the cost operator can be given explicitely as a `QuantumCircuit`.
 76    Otherwise, it is automatically Trotterized.
 77
 78    This method submits PUBs in batches, meaning that at every iteration,
 79    `batch_size` parameters are tested.
 80
 81    Warning:
 82        This method tries to *MAXIMIZE* the energy of the operator. If you want
 83        to minimize, flip the sign of the weights in `operator`.
 84
 85    Args:
 86        operator (SparsePauliOp): The operator whose energy to maximize
 87        backend (Backend):
 88        estimator (IBMEstimator | AerEstimator):
 89        cost_qc (QuantumCircuit | None, optional):
 90        n_qaoa_steps (int, optional):
 91        pm_optimization_level (int, optional):
 92
 93    Returns:
 94        1. A tuple containing the optimal parameters and the optimal energy.
 95        2. A tuple containing array of all parameters that were tried and the
 96           array of all resulting energies.
 97        3. A list of dict decribing the results of each job. An element looks
 98           like:
 99
100            {
101                "energy": 0.006450488764709524,
102                "ibmq_job_id": "3c94c73c-385b-4e71-8675-e3e1ad76aa4e",
103                "ibmq_session_id": "cz4h80039f40008scarg",
104                "batch": 3,
105            }
106    """
107    ansatz = QAOAAnsatz(
108        cost_operator=cost_qc if cost_qc is not None else operator,
109        reps=n_qaoa_steps,
110    )
111    pm = generate_preset_pass_manager(
112        target=backend.target, optimization_level=pm_optimization_level
113    )
114    ansatz_isa = pm.run(ansatz)
115    operator_isa = operator.apply_layout(ansatz_isa.layout)
116    logging.debug(
117        "ISA ansatz depth/size: {}/{}",
118        ansatz_isa.depth(),
119        ansatz_isa.size(),
120    )
121    x0 = np.array(
122        ([np.pi / 2] * (len(ansatz_isa.parameters) // 2))  # β's
123        + ([np.pi] * (len(ansatz_isa.parameters) // 2))  # γ's
124    )
125    bounds = (
126        ([(0, np.pi)] * (len(ansatz_isa.parameters) // 2))  # β's
127        + ([(0, 2 * np.pi)] * (len(ansatz_isa.parameters) // 2))  # γ's
128    )
129    records: list[tuple[np.ndarray, float, dict]] = []
130    minimize(
131        lambda *args: -1 * _energy(*args),  # energy maximization
132        x0=x0,
133        args=(ansatz_isa, operator_isa, estimator, records),
134        method="cobyla",
135        bounds=bounds,
136        options={"maxiter": max_iter},
137    )
138    all_x = np.stack([x for x, _, _ in records])
139    all_e = np.array([e for _, e, _ in records])
140    results = [
141        {"energy": float(e), "step": i, **m}
142        for i, (_, e, m) in enumerate(records)
143    ]
144    j = all_e.argmax()
145    return (all_x[j], all_e[j]), (all_x, all_e), results

Runs QAOA on the given operator using the given backend. The Trotterization of the cost operator can be given explicitely as a QuantumCircuit. Otherwise, it is automatically Trotterized.

This method submits PUBs in batches, meaning that at every iteration, batch_size parameters are tested.

Warning:

This method tries to MAXIMIZE the energy of the operator. If you want to minimize, flip the sign of the weights in operator.

Arguments:
  • operator (SparsePauliOp): The operator whose energy to maximize
  • backend (Backend):
  • estimator (IBMEstimator | AerEstimator):
  • cost_qc (QuantumCircuit | None, optional):
  • n_qaoa_steps (int, optional):
  • pm_optimization_level (int, optional):
Returns:
  1. A tuple containing the optimal parameters and the optimal energy.
  2. A tuple containing array of all parameters that were tried and the array of all resulting energies.
  3. A list of dict decribing the results of each job. An element looks like:

    { "energy": 0.006450488764709524, "ibmq_job_id": "3c94c73c-385b-4e71-8675-e3e1ad76aa4e", "ibmq_session_id": "cz4h80039f40008scarg", "batch": 3, }

def reorder_operator( operator: qiskit.quantum_info.operators.symplectic.sparse_pauli_op.SparsePauliOp, term_indices: numpy.ndarray) -> qiskit.quantum_info.operators.symplectic.sparse_pauli_op.SparsePauliOp:
58def reorder_operator(
59    operator: SparsePauliOp, term_indices: np.ndarray
60) -> SparsePauliOp:
61    """
62    Changes the order of the terms in the operator given a term index vector,
63    which is just a permutation of `[0, 1, ..., len(operator) - 1]`.
64    """
65    terms = operator.to_sparse_list()
66    return SparsePauliOp.from_sparse_list(
67        [terms[i] for i in term_indices], num_qubits=operator.num_qubits
68    )

Changes the order of the terms in the operator given a term index vector, which is just a permutation of [0, 1, ..., len(operator) - 1].

def reorder( gate: qiskit.circuit.library.pauli_evolution.PauliEvolutionGate, method: Literal['degree_c', 'degree', 'misra_gries', 'saturation', 'simplicial']) -> tuple[qiskit.circuit.library.pauli_evolution.PauliEvolutionGate, dict[int, list[int]], numpy.ndarray]:
27def reorder(
28    gate: PauliEvolutionGate,
29    method: Literal[
30        "degree_c",
31        "degree",
32        "misra_gries",
33        "saturation",
34        "simplicial",
35    ],
36) -> tuple[PauliEvolutionGate, Coloring, np.ndarray]:
37    """
38    Applies Pauli coloring to reorder the Pauli terms in the underlying operator
39    of the gate.
40
41    The supported coloring methods are:
42    * `degree_c`: C implementation of the `degree` method
43    * `degree`
44    * `misra_gries`: **Only for Ising or transverse-Ising Hamiltonians**
45    * `saturation`
46    * `simplicial`: **Only for 3SAT Hamiltonians**
47
48    Returns the reordered gate and the coloring, which is a `dict[int,
49    set[int]]` that maps a color to the let of all term (indices) with that
50    color.
51    """
52    methods: dict[
53        str,
54        Callable[
55            [PauliEvolutionGate],
56            tuple[PauliEvolutionGate, Coloring, np.ndarray],
57        ],
58    ] = {
59        "degree_c": degree_reordering_c,
60        "degree": degree_reordering,
61        "misra_gries": misra_gries_reordering,
62        "saturation": saturation_reordering,
63        "simplicial": simplicial_reordering,
64    }
65    return methods[method](gate)

Applies Pauli coloring to reorder the Pauli terms in the underlying operator of the gate.

The supported coloring methods are:

  • degree_c: C implementation of the degree method
  • degree
  • misra_gries: Only for Ising or transverse-Ising Hamiltonians
  • saturation
  • simplicial: Only for 3SAT Hamiltonians

Returns the reordered gate and the coloring, which is a dict[int, set[int]] that maps a color to the let of all term (indices) with that color.

def save(obj: Any, path: str | pathlib.Path) -> None:
37def save(obj: Any, path: str | Path) -> None:
38    """
39    Saves an object following the file extension:
40    - `.qpy`: `QuantumCircuit`
41    - `.qpy.gz`: `QuantumCircuit`
42    - `.hdf5`: `dict[np.ndarray]`
43    - `.json`
44    - `.csv`: `pd.DataFrame`
45    """
46    path = Path(path)
47    path.parent.mkdir(parents=True, exist_ok=True)
48    extension = ".".join(path.name.split(".")[1:])
49    if extension == "qpy":
50        assert isinstance(obj, QuantumCircuit)
51        with path.open("wb") as fp:
52            qpy.dump(obj, fp)
53    elif extension == "qpy.gz":
54        assert isinstance(obj, QuantumCircuit)
55        with gzip.open(path, "wb") as fp:
56            qpy.dump(obj, fp)
57    elif extension == "hdf5":
58        assert isinstance(obj, dict)
59        assert all(isinstance(v, np.ndarray) for v in obj.values())
60        with h5py.File(path, "w") as fp:
61            for k, v in obj.items():
62                fp.create_dataset(
63                    k, data=v, compression="gzip", compression_opts=9
64                )
65    elif extension == "json":
66        with path.open("w", encoding="utf-8") as fp:
67            json.dump(obj, fp)
68    elif extension == "csv":
69        assert isinstance(obj, pd.DataFrame)
70        obj.to_csv(path)
71    else:
72        raise ValueError(f"Unsupported file extension: {extension}")

Saves an object following the file extension:

  • .qpy: QuantumCircuit
  • .qpy.gz: QuantumCircuit
  • .hdf5: dict[np.ndarray]
  • .json
  • .csv: pd.DataFrame
def to_evolution_gate( h: bytes | str, shuffle: bool = False, global_phase: float | numpy.complex128 | None = None) -> qiskit.circuit.library.pauli_evolution.PauliEvolutionGate:
15def to_evolution_gate(
16    h: bytes | str,
17    shuffle: bool = False,
18    global_phase: float | np.complex128 | None = None,
19) -> PauliEvolutionGate:
20    """
21    Converts a serialized sparse Pauli operator (in the format explained
22    `hamlib.open_hamiltonian`) to a qiskit
23    [`PauliEvolutionGate`](https://docs.quantum.ibm.com/api/qiskit/qiskit.circuit.library.PauliEvolutionGate)
24    object.
25
26    The underlying
27    [`SparsePauliOp`](https://docs.quantum.ibm.com/api/qiskit/qiskit.quantum_info.SparsePauliOp)
28    object can be retrived with `PauliEvolutionGate.operator`. The time
29    parameter is an abstract unbound parameter called `δt`.
30    """
31
32    def _m2t(w_str: str, p_str: str) -> tuple[str, list[int], np.complex128]:
33        """
34        Example: `"Z66 X81"` becomes `("ZX", [66, 81], 1.0)`.
35        """
36        w = np.complex128(w_str if w_str else 1.0)
37        if global_phase is not None:
38            w *= global_phase
39        return (
40            "".join(re.findall(r"[IXYZ]", p_str)),
41            [int(k) for k in re.findall(r"\d+", p_str)],
42            w,
43        )
44
45    h = h if isinstance(h, str) else h.decode("utf-8")
46    matches = re.findall(r"(\(?[+-.\dj][^\s]*) \[([^\]]+)\]", h)
47    if not matches:
48        raise ValueError("No terms found in Hamiltonian: " + h)
49    terms = [_m2t(*m) for m in matches]
50    if shuffle:
51        random.shuffle(terms)
52    n_qubits = max(max(t[1]) for t in terms) + 1
53    operator = SparsePauliOp.from_sparse_list(terms, n_qubits)
54    dt = Parameter("δt")
55    return PauliEvolutionGate(operator, dt)

Converts a serialized sparse Pauli operator (in the format explained hamlib.open_hamiltonian) to a qiskit PauliEvolutionGate object.

The underlying SparsePauliOp object can be retrived with PauliEvolutionGate.operator. The time parameter is an abstract unbound parameter called δt.

def trim_qc( qc: qiskit.circuit.quantumcircuit.QuantumCircuit, op: qiskit.quantum_info.operators.symplectic.sparse_pauli_op.SparsePauliOp | None = None) -> tuple[qiskit.circuit.quantumcircuit.QuantumCircuit, qiskit.quantum_info.operators.symplectic.sparse_pauli_op.SparsePauliOp | None]:
 81def trim_qc(
 82    qc: QuantumCircuit, op: SparsePauliOp | None = None
 83) -> tuple[QuantumCircuit, SparsePauliOp | None]:
 84    """
 85    Removes all idle qubits in a QuantumCircuit. If `op` is provided, also
 86    removes the same qubits from the operator.
 87
 88    Warning:
 89        Does not remove final measurements. So if this circuit has a final
 90        complete measurement, this method cannot do anything.
 91
 92    See also:
 93        https://quantumcomputing.stackexchange.com/questions/25672/remove-inactive-qubits-from-qiskit-circuit/37192#37192
 94    """
 95    index = dict(zip(qc.qubits, range(len(qc.qubits))))
 96    used: list[int] = []
 97    for gate in qc.data:
 98        used.extend([index[qubit] for qubit in gate.qubits])
 99    used = sorted(set(used))
100    qc = QuantumCircuit.from_instructions(qc.data)
101    if isinstance(op, SparsePauliOp):
102        op = SparsePauliOp.from_list(
103            [
104                ("".join(s[::-1][i] for i in used)[::-1], w)
105                for s, w in op.to_list()
106            ],
107            num_qubits=len(used),
108        )
109    return qc, op

Removes all idle qubits in a QuantumCircuit. If op is provided, also removes the same qubits from the operator.

Warning:

Does not remove final measurements. So if this circuit has a final complete measurement, this method cannot do anything.

See also:

https://quantumcomputing.stackexchange.com/questions/25672/remove-inactive-qubits-from-qiskit-circuit/37192#37192