Source code for netket.operator._local_liouvillian

# Copyright 2021 The NetKet Authors - All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

import numpy as np
import numba
from numba import jit
from numba.typed import List

from scipy.sparse.linalg import LinearOperator

import netket.jax as nkjax
from netket.jax import canonicalize_dtypes
from netket.utils.optional_deps import import_optional_dependency
from netket.utils.types import DType

from ._discrete_operator import DiscreteOperator
from ._local_operator import LocalOperator
from ._abstract_super_operator import AbstractSuperOperator


[docs] class LocalLiouvillian(AbstractSuperOperator): """ LocalLiouvillian super-operator, acting on the DoubledHilbert (tensor product) space ℋ⊗ℋ. Internally it uses :ref:`netket.operator.LocalOperator` everywhere. The Liouvillian is defined according to the definition: .. math :: \\mathcal{L} = -i \\left[ \\hat{H}, \\hat{\\rho}\\right] + \\sum_i \\left[ \\hat{L}_i\\hat{\\rho}\\hat{L}_i^\\dagger - \\left\\{ \\hat{L}_i^\\dagger\\hat{L}_i, \\hat{\\rho} \\right\\} \\right] which generates the dynamics according to the equation .. math :: \\frac{d\\hat{\\rho}}{dt} = \\mathcal{L}\\hat{\\rho} Internally, it stores the non-hermitian hamiltonian .. math :: \\hat{H}_{nh} = \\hat{H} - \\sum_i \\frac{i}{2}\\hat{L}_i^\\dagger\\hat{L}_i That is then composed with the jump operators in the inner kernel with the formula: .. math :: \\mathcal{L} = -i \\hat{H}_{nh}\\hat{\\rho} +i\\hat{\\rho}\\hat{H}_{nh}^\\dagger + \\sum_i \\hat{L}_i\\hat{\\rho}\\hat{L}_i^\\dagger """ def __init__( self, ham: DiscreteOperator, jump_ops: list[DiscreteOperator] = [], dtype: Optional[DType] = None, ): super().__init__(ham.hilbert) dtype = canonicalize_dtypes(complex, ham, *jump_ops, dtype=dtype) if not nkjax.is_complex_dtype(dtype): raise TypeError(f"A complex dtype is required (dtype={dtype} specified).") self._H = ham self._jump_ops = [op.copy(dtype=dtype) for op in jump_ops] # to accept dicts self._Hnh = ham self._max_dissipator_conn_size = 0 self._max_conn_size = 0 self._dtype = dtype self._compute_hnh() @property def dtype(self): return self._dtype @property def is_hermitian(self): return False @property def hamiltonian(self) -> LocalOperator: """The hamiltonian of this Liouvillian""" return self._H @property def hamiltonian_nh(self) -> LocalOperator: """The non hermitian Local Operator part of the Liouvillian""" return self._Hnh @property def jump_operators(self) -> list[LocalOperator]: """The list of local operators in this Liouvillian""" return self._jump_ops @property def max_conn_size(self) -> int: """The maximum number of non zero ⟨x|O|x'⟩ for every x.""" return self._max_conn_size def _compute_hnh(self): # There is no i here because it's inserted in the kernel Hnh = np.asarray(1.0, dtype=self.dtype) * self.hamiltonian self._max_dissipator_conn_size = 0 for L in self._jump_ops: Hnh = ( Hnh - np.asarray(0.5j, dtype=self.dtype) * L.conjugate().transpose() @ L ) self._max_dissipator_conn_size += L.max_conn_size**2 self._Hnh = Hnh.collect().copy(dtype=self.dtype) max_conn_size = self._max_dissipator_conn_size + 2 * Hnh.max_conn_size self._max_conn_size = max_conn_size self._xprime = np.empty((max_conn_size, self.hilbert.size)) self._xr_prime = np.empty((max_conn_size, self.hilbert.physical.size)) self._xc_prime = np.empty((max_conn_size, self.hilbert.physical.size)) self._xrv = self._xprime[:, 0 : self.hilbert.physical.size] self._xcv = self._xprime[ :, self.hilbert.physical.size : 2 * self.hilbert.physical.size ] self._mels = np.empty(max_conn_size, dtype=self.dtype) self._xprime_f = np.empty((max_conn_size, self.hilbert.size)) self._mels_f = np.empty(max_conn_size, dtype=self.dtype)
[docs] def add_jump_operator(self, op): self._jump_ops.append(op) self._compute_hnh()
[docs] def add_jump_operators(self, ops): for op in ops: self._jump_ops.append(op) self._compute_hnh()
[docs] def get_conn(self, x): n_sites = x.shape[0] // 2 xr, xc = x[0:n_sites], x[n_sites : 2 * n_sites] i = 0 xrp, mel_r = self._Hnh.get_conn(xr) self._xrv[i : i + len(mel_r), :] = xrp self._xcv[i : i + len(mel_r), :] = xc self._mels[i : i + len(mel_r)] = -1j * mel_r i = i + len(mel_r) xcp, mel_c = self._Hnh.get_conn(xc) self._xrv[i : i + len(mel_c), :] = xr self._xcv[i : i + len(mel_c), :] = xcp self._mels[i : i + len(mel_r)] = 1j * np.conj(mel_c) i = i + len(mel_c) for L in self._jump_ops: L_xrp, L_mel_r = L.get_conn(xr) L_xcp, L_mel_c = L.get_conn(xc) nr = len(L_mel_r) nc = len(L_mel_c) # start filling batches for r in range(nr): self._xrv[i : i + nc, :] = L_xrp[r, :] self._xcv[i : i + nc, :] = L_xcp self._mels[i : i + nc] = L_mel_r[r] * np.conj(L_mel_c) i = i + nc return np.copy(self._xprime[0:i, :]), np.copy(self._mels[0:i])
# pad option pads all sections to have the same (biggest) size. # to avoid using the biggest possible size, we dynamically check what is # the biggest size for the current size... # TODO: investigate if this is worthless
[docs] def get_conn_flattened(self, x, sections, pad=False): batch_size = x.shape[0] N = x.shape[1] // 2 n_jops = len(self.jump_operators) assert sections.shape[0] == batch_size # Separate row and column inputs xr, xc = x[:, 0:N], x[:, N : 2 * N] # Compute all flattened connections of each term sections_r = np.empty(batch_size, dtype=np.int64) sections_c = np.empty(batch_size, dtype=np.int64) xr_prime, mels_r = self._Hnh.get_conn_flattened(xr, sections_r) xc_prime, mels_c = self._Hnh.get_conn_flattened(xc, sections_c) if pad: # if else to accommodate for batches of 1 element, because # sections don't start from 0-index... # TODO: if we ever switch to 0-indexing, remove this. if batch_size > 1: max_conns_r = np.max(np.diff(sections_r)) max_conns_c = np.max(np.diff(sections_c)) else: max_conns_r = sections_r[0] max_conns_c = sections_c[0] max_conns_Lrc = 0 #  Must type those lists otherwise, if they are empty, numba # cannot infer their type L_xrps = List.empty_list(numba.typeof(x.dtype)[:, :]) L_xcps = List.empty_list(numba.typeof(x.dtype)[:, :]) L_mel_rs = List.empty_list(numba.typeof(self.dtype)[:]) L_mel_cs = List.empty_list(numba.typeof(self.dtype)[:]) sections_Lr = np.empty(batch_size * n_jops, dtype=np.int32) sections_Lc = np.empty(batch_size * n_jops, dtype=np.int32) for i, L in enumerate(self._jump_ops): L_xrp, L_mel_r = L.get_conn_flattened( xr, sections_Lr[i * batch_size : (i + 1) * batch_size] ) L_xcp, L_mel_c = L.get_conn_flattened( xc, sections_Lc[i * batch_size : (i + 1) * batch_size] ) L_xrps.append(L_xrp) L_xcps.append(L_xcp) L_mel_rs.append(L_mel_r) L_mel_cs.append(L_mel_c) if pad: if batch_size > 1: max_lr = np.max( np.diff(sections_Lr[i * batch_size : (i + 1) * batch_size]) ) max_lc = np.max( np.diff(sections_Lc[i * batch_size : (i + 1) * batch_size]) ) else: max_lr = sections_Lr[i * batch_size] max_lc = sections_Lc[i * batch_size] max_conns_Lrc += max_lr * max_lc # compose everything again if self._xprime_f.dtype != x.dtype: self._xprime_f = np.empty(self._xprime_f.shape, dtype=x.dtype) if self._xprime_f.shape[0] < self._max_conn_size * batch_size: # refcheck=False because otherwise this errors when testing self._xprime_f.resize( self._max_conn_size * batch_size, self.hilbert.size, refcheck=False ) self._mels_f.resize(self._max_conn_size * batch_size, refcheck=False) if pad: pad = max_conns_Lrc + max_conns_r + max_conns_c else: pad = 0 self._xprime_f[:] = 0 self._mels_f[:] = 0 return self._get_conn_flattened_kernel( self._xprime_f, self._mels_f, sections, np.asarray(xr), np.asarray(xc), sections_r, sections_c, xr_prime, mels_r, xc_prime, mels_c, L_xrps, L_xcps, L_mel_rs, L_mel_cs, sections_Lr, sections_Lc, n_jops, batch_size, N, pad, )
@staticmethod @jit(nopython=True) def _get_conn_flattened_kernel( xs, mels, sections, xr, xc, sections_r, sections_c, xr_prime, mels_r, xc_prime, mels_c, L_xrps, L_xcps, L_mel_rs, L_mel_cs, sections_Lr, sections_Lc, n_jops, batch_size, N, pad, ): off = 0 n_hr_i = 0 n_hc_i = 0 n_Lr_is = np.zeros(n_jops, dtype=np.int32) n_Lc_is = np.zeros(n_jops, dtype=np.int32) for i in range(batch_size): off_i = off n_hr_f = sections_r[i] n_hr = n_hr_f - n_hr_i xs[off : off + n_hr, 0:N] = xr_prime[n_hr_i:n_hr_f, :] xs[off : off + n_hr, N : 2 * N] = xc[i, :] mels[off : off + n_hr] = -1j * mels_r[n_hr_i:n_hr_f] off += n_hr n_hr_i = n_hr_f n_hc_f = sections_c[i] n_hc = n_hc_f - n_hc_i xs[off : off + n_hc, N : 2 * N] = xc_prime[n_hc_i:n_hc_f, :] xs[off : off + n_hc, 0:N] = xr[i, :] mels[off : off + n_hc] = 1j * np.conj(mels_c[n_hc_i:n_hc_f]) off += n_hc n_hc_i = n_hc_f for j in range(n_jops): L_xrp, L_mel_r = L_xrps[j], L_mel_rs[j] L_xcp, L_mel_c = L_xcps[j], L_mel_cs[j] n_Lr_f = sections_Lr[j * batch_size + i] n_Lc_f = sections_Lc[j * batch_size + i] n_Lr_i = n_Lr_is[j] n_Lc_i = n_Lc_is[j] n_Lr = n_Lr_f - n_Lr_is[j] n_Lc = n_Lc_f - n_Lc_is[j] # start filling batches for r in range(n_Lr): xs[off : off + n_Lc, 0:N] = L_xrp[n_Lr_i + r, :] xs[off : off + n_Lc, N : 2 * N] = L_xcp[n_Lc_i:n_Lc_f, :] mels[off : off + n_Lc] = L_mel_r[n_Lr_i + r] * np.conj( L_mel_c[n_Lc_i:n_Lc_f] ) off = off + n_Lc n_Lr_is[j] = n_Lr_f n_Lc_is[j] = n_Lc_f if pad != 0: n_entries = off - off_i mels[off : off + n_entries] = 0 off = (i + 1) * pad sections[i] = off return np.copy(xs[0:off, :]), np.copy(mels[0:off])
[docs] def to_linear_operator( self, *, sparse: bool = True, append_trace: bool = False ) -> LinearOperator: r"""Returns a lazy scipy linear_operator representation of the Lindblad Super-Operator. The returned operator behaves like the M**2 x M**2 matrix obtained with to_dense/sparse, and accepts vectorised density matrices as input. Args: sparse: If True internally uses sparse matrices for the hamiltonian and jump operators, dense otherwise (default=True) append_trace: If True (default=False) the resulting operator has size M**2 + 1, and the last element of the input vector is the trace of the input density matrix. This is useful when implementing iterative methods. Returns: A linear operator taking as input vectorised density matrices and returning the product L*rho """ M = self.hilbert.physical.n_states iHnh = -1j * self.hamiltonian_nh if sparse: iHnh = iHnh.to_sparse() J_ops = [j.to_sparse() for j in self.jump_operators] J_ops_c = [ j.conjugate().transpose().to_sparse() for j in self.jump_operators ] else: iHnh = iHnh.to_dense() J_ops = [j.to_dense() for j in self.jump_operators] J_ops_c = [ j.conjugate().transpose().to_dense() for j in self.jump_operators ] if not append_trace: op_size = M**2 def matvec(rho_vec): rho = rho_vec.reshape((M, M)) drho = np.zeros((M, M), dtype=rho.dtype) drho += iHnh @ rho + rho @ iHnh.conj().T for J, J_c in zip(J_ops, J_ops_c): drho += (J @ rho) @ J_c return drho.reshape(-1) else: # This function defines the product Liouvillian x density matrix, without # constructing the full density matrix (passed as a vector M^2). # An extra row is added at the bottom of the therefore M^2+1 long array, # with the trace of the density matrix. This is needed to enforce the # trace-1 condition. # The logic behind the use of Hnh_dag_ and Hnh_ is derived from the # convention adopted in local_liouvillian.cc, and inspired from reference # arXiv:1504.05266 op_size = M**2 + 1 def matvec(rho_vec): rho = rho_vec[:-1].reshape((M, M)) out = np.zeros((M**2 + 1), dtype=rho.dtype) drho = out[:-1].reshape((M, M)) drho += iHnh @ rho + rho @ iHnh.conj().T for J, J_c in zip(J_ops, J_ops_c): drho += (J @ rho) @ J_c out[-1] = rho.trace() return out L = LinearOperator((op_size, op_size), matvec=matvec, dtype=iHnh.dtype) return L
[docs] def to_qobj(self): # -> "qutip.liouvillian" r"""Convert the operator to a qutip's liouvillian Qobj. Returns: A :class:`qutip.liouvillian` object. """ qutip = import_optional_dependency("qutip", descr="to_qobj") return qutip.liouvillian( self.hamiltonian.to_qobj(), [op.to_qobj() for op in self.jump_operators] )