Source code for tomocupy.rec_steps

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# *************************************************************************** #
#                  Copyright © 2022, UChicago Argonne, LLC                    #
#                           All Rights Reserved                               #
#                         Software Name: Tomocupy                             #
#                     By: Argonne National Laboratory                         #
#                                                                             #
#                           OPEN SOURCE LICENSE                               #
#                                                                             #
# Redistribution and use in source and binary forms, with or without          #
# modification, are permitted provided that the following conditions are met: #
#                                                                             #
# 1. Redistributions of source code must retain the above copyright notice,   #
#    this list of conditions and the following disclaimer.                    #
# 2. Redistributions in binary form must reproduce the above copyright        #
#    notice, this list of conditions and the following disclaimer in the      #
#    documentation and/or other materials provided with the distribution.     #
# 3. Neither the name of the copyright holder nor the names of its            #
#    contributors may be used to endorse or promote products derived          #
#    from this software without specific prior written permission.            #
#                                                                             #
#                                                                             #
# *************************************************************************** #
#                               DISCLAIMER                                    #
#                                                                             #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS         #
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT           #
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS           #
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT    #
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,      #
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED    #
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR      #
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF      #
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING        #
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS          #
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.                #
# *************************************************************************** #

from tomocupy import utils
from tomocupy import logging

from tomocupy.processing import proc_functions
from tomocupy.reconstruction import backproj_parallel
from tomocupy.reconstruction import backproj_lamfourier_parallel
from tomocupy.global_vars import args, params
import signal
import cupy as cp
import numpy as np

__author__ = "Viktor Nikitin"
__copyright__ = "Copyright (c) 2022, UChicago Argonne, LLC."
__docformat__ = 'restructuredtext en'
__all__ = ['GPURecSteps', ]

pinned_memory_pool = cp.cuda.PinnedMemoryPool()
cp.cuda.set_pinned_memory_allocator(pinned_memory_pool.malloc)

log = logging.getLogger(__name__)


[docs]class GPURecSteps(): """ Class for a stepwise tomographic reconstruction on GPU with pipeline data processing by sinogram and projection chunks. Steps include 1) pre-processing the whole data volume by splitting into sinograms, 2) pre-processing the whole data volume by splitting into proejections, 3) reconstructing the whole volume by splitting into sinograms and projections The implemented reconstruction methods are 1) Fourier-based method with exponential functions for interpoaltion in the frequency domain (implemented with CUDA C), 2) Direct discretization of the backprojection intergral """ def __init__(self, cl_reader, cl_writer): # Set ^C interrupt to abort and deallocate memory on GPU signal.signal(signal.SIGINT, utils.signal_handler) signal.signal(signal.SIGTERM, utils.signal_handler) # use pinned memory cp.cuda.set_pinned_memory_allocator(cp.cuda.PinnedMemoryPool().malloc) # chunks for processing self.shape_data_chunk_z = (params.nproj, params.ncz, params.ni) self.shape_dark_chunk_z = (params.ndark, params.ncz, params.ni) self.shape_flat_chunk_z = (params.nflat, params.ncz, params.ni) self.shape_data_chunk_zn = (params.nproj, params.ncz, params.n) self.shape_data_chunk_t = (params.ncproj, params.nz, params.ni) self.shape_data_chunk_tn = (params.ncproj, params.nz, params.n) self.shape_recon_chunk = (params.ncz, params.n, params.n) # full shapes self.shape_data_full = (params.nproj, params.nz, params.ni) self.shape_data_fulln = (params.nproj, params.nz, params.n) # init tomo functions self.cl_proc_func = proc_functions.ProcFunctions() # streams for overlapping data transfers with computations self.stream1 = cp.cuda.Stream(non_blocking=False) self.stream2 = cp.cuda.Stream(non_blocking=False) self.stream3 = cp.cuda.Stream(non_blocking=False) # threads for data writing to disk self.write_threads = [] for k in range(args.max_write_threads): self.write_threads.append(utils.WRThread()) self.cl_reader = cl_reader self.cl_writer = cl_writer # define reconstruction method if args.lamino_angle != 0 and args.reconstruction_algorithm == 'fourierrec' and args.reconstruction_type == 'full': # available only for full recon self.cl_backproj = backproj_lamfourier_parallel.BackprojLamFourierParallel( cl_writer) else: self.cl_backproj = backproj_parallel.BackprojParallel(cl_writer)
[docs] def recon_steps_all(self): """GPU reconstruction by loading a full dataset in memory and processing by steps, with reading the whole data to memory """ log.info('Reading data.') data, flat, dark = self.cl_reader.read_data_parallel() if args.pre_processing == 'True': log.info('Processing by chunks in z.') data = self.proc_sino_parallel(data, dark, flat) log.info('Processing by chunks in angles.') data = self.proc_proj_parallel(data) log.info('Filtered backprojection and writing by chunks.') self.cl_backproj.rec_fun(data)
[docs] def proc_sino_parallel(self, data, dark, flat): """Data processing by splitting into sinogram chunks""" # refs for faster access nzchunk = params.nzchunk lzchunk = params.lzchunk ncz = params.ncz # result res = np.zeros(data.shape, dtype=params.dtype) # pinned memory for data item item_pinned = {} item_pinned['data'] = utils.pinned_array( np.zeros([2, *self.shape_data_chunk_z], dtype=params.in_dtype)) item_pinned['dark'] = utils.pinned_array( np.zeros([2, *self.shape_dark_chunk_z], dtype=params.in_dtype)) item_pinned['flat'] = utils.pinned_array( np.ones([2, *self.shape_flat_chunk_z], dtype=params.in_dtype)) # gpu memory for data item item_gpu = {} item_gpu['data'] = cp.zeros( [2, *self.shape_data_chunk_z], dtype=params.in_dtype) item_gpu['dark'] = cp.zeros( [2, *self.shape_dark_chunk_z], dtype=params.in_dtype) item_gpu['flat'] = cp.ones( [2, *self.shape_flat_chunk_z], dtype=params.in_dtype) # pinned memory for res rec_pinned = utils.pinned_array( np.zeros([2, *self.shape_data_chunk_z], dtype=params.dtype)) # gpu memory for res rec_gpu = cp.zeros([2, *self.shape_data_chunk_z], dtype=params.dtype) # pipeline for data cpu-gpu copy and reconstruction for k in range(nzchunk+2): utils.printProgressBar( k, nzchunk+1, nzchunk-k+1, length=40) if (k > 0 and k < nzchunk+1): with self.stream2: # reconstruction self.cl_proc_func.proc_sino(item_gpu['data'][( k-1) % 2], item_gpu['dark'][(k-1) % 2], item_gpu['flat'][(k-1) % 2], rec_gpu[(k-1) % 2]) if (k > 1): with self.stream3: # gpu->cpu copy rec_gpu[(k-2) % 2].get(out=rec_pinned[(k-2) % 2]) if (k < nzchunk): # copy to pinned memory utils.copy(data[:, k*ncz:k*ncz+lzchunk[k]], item_pinned['data'][k % 2, :, :lzchunk[k]]) utils.copy(dark[:, k*ncz:k*ncz+lzchunk[k]], item_pinned['dark'][k % 2, :, :lzchunk[k]]) utils.copy(flat[:, k*ncz:k*ncz+lzchunk[k]], item_pinned['flat'][k % 2, :, :lzchunk[k]]) with self.stream1: # cpu->gpu copy item_gpu['data'][k % 2].set(item_pinned['data'][k % 2]) item_gpu['dark'][k % 2].set(item_pinned['dark'][k % 2]) item_gpu['flat'][k % 2].set(item_pinned['flat'][k % 2]) self.stream3.synchronize() if (k > 1): # copy to result utils.copy(rec_pinned[(k-2) % 2, :, :lzchunk[k-2]], res[:, (k-2)*ncz:(k-2)*ncz+lzchunk[k-2]]) self.stream1.synchronize() self.stream2.synchronize() return res
[docs] def proc_proj_parallel(self, data): """Data processing by splitting into projection chunks""" # refs for faster access ntchunk = params.ntchunk ltchunk = params.ltchunk ncproj = params.ncproj if args.file_type != 'double_fov': res = data else: res = np.zeros([*self.shape_data_fulln], dtype=params.dtype) # pinned memory for data item data_pinned = utils.pinned_array( np.zeros([2, *self.shape_data_chunk_t], dtype=params.dtype)) # gpu memory for data item data_gpu = cp.zeros([2, *self.shape_data_chunk_t], dtype=params.dtype) # pinned memory for processed data rec_pinned = utils.pinned_array( np.zeros([2, *self.shape_data_chunk_tn], dtype=params.dtype)) # gpu memory for processed data rec_gpu = cp.zeros([2, *self.shape_data_chunk_tn], dtype=params.dtype) # pipeline for data cpu-gpu copy and reconstruction for k in range(ntchunk+2): utils.printProgressBar(k, ntchunk+1, ntchunk-k+1, length=40) if (k > 0 and k < ntchunk+1): with self.stream2: # reconstruction self.cl_proc_func.proc_proj( data_gpu[(k-1) % 2], 0, self.shape_data_chunk_t[1], res=rec_gpu[(k-1) % 2]) if (k > 1): with self.stream3: # gpu->cpu copy rec_gpu[(k-2) % 2].get(out=rec_pinned[(k-2) % 2]) if (k < ntchunk): # copy to pinned memory utils.copy(data[ncproj*k:ncproj*k+ltchunk[k]], data_pinned[k % 2, :ltchunk[k]]) with self.stream1: # cpu->gpu copy data_gpu[k % 2].set(data_pinned[k % 2]) self.stream3.synchronize() if (k > 1): utils.copy(rec_pinned[(k-2) % 2, :ltchunk[k-2]], res[(k-2)*ncproj:(k-2)*ncproj+ltchunk[k-2]]) self.stream1.synchronize() self.stream2.synchronize() return res