Source code for tomocupy.rec

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# *************************************************************************** #
#                  Copyright © 2022, UChicago Argonne, LLC                    #
#                           All Rights Reserved                               #
#                         Software Name: Tomocupy                             #
#                     By: Argonne National Laboratory                         #
#                                                                             #
#                           OPEN SOURCE LICENSE                               #
#                                                                             #
# Redistribution and use in source and binary forms, with or without          #
# modification, are permitted provided that the following conditions are met: #
#                                                                             #
# 1. Redistributions of source code must retain the above copyright notice,   #
#    this list of conditions and the following disclaimer.                    #
# 2. Redistributions in binary form must reproduce the above copyright        #
#    notice, this list of conditions and the following disclaimer in the      #
#    documentation and/or other materials provided with the distribution.     #
# 3. Neither the name of the copyright holder nor the names of its            #
#    contributors may be used to endorse or promote products derived          #
#    from this software without specific prior written permission.            #
#                                                                             #
#                                                                             #
# *************************************************************************** #
#                               DISCLAIMER                                    #
#                                                                             #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS         #
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT           #
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS           #
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT    #
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,      #
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED    #
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR      #
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF      #
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING        #
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS          #
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.                #
# *************************************************************************** #

from tomocupy import utils
from tomocupy import logging
from tomocupy.processing import proc_functions
from tomocupy.reconstruction import backproj_functions
from tomocupy.global_vars import args, params

from threading import Thread
from queue import Queue
import cupy as cp
import numpy as np
import signal

__author__ = "Viktor Nikitin"
__copyright__ = "Copyright (c) 2022, UChicago Argonne, LLC."
__docformat__ = 'restructuredtext en'
__all__ = ['GPURec', ]


log = logging.getLogger(__name__)


class GPURec():
    '''
    Class for tomographic reconstruction on GPU with conveyor data processing
    by sinogram chunks (in the z direction). Data reading/writing are done in
    separate threads, and CUDA streams are used to overlap CPU-GPU data
    transfers with computations. The implemented reconstruction method is
    Fourier-based with exponential functions for interpolation in the
    frequency domain (implemented with CUDA C).
    '''

    def __init__(self, cl_reader, cl_writer):
        # Set ^C, ^Z interrupt to abort and deallocate memory on GPU
        signal.signal(signal.SIGINT, utils.signal_handler)
        signal.signal(signal.SIGTERM, utils.signal_handler)

        # use pinned memory
        cp.cuda.set_pinned_memory_allocator(cp.cuda.PinnedMemoryPool().malloc)

        # chunks for processing
        self.shape_data_chunk = (params.nproj, params.ncz, params.ni)
        self.shape_recon_chunk = (params.ncz, params.n, params.n)
        self.shape_dark_chunk = (params.ndark, params.ncz, params.ni)
        self.shape_flat_chunk = (params.nflat, params.ncz, params.ni)

        # init tomo functions
        self.cl_proc_func = proc_functions.ProcFunctions()
        self.cl_backproj_func = backproj_functions.BackprojFunctions()

        # streams for overlapping data transfers with computations
        self.stream1 = cp.cuda.Stream(non_blocking=False)
        self.stream2 = cp.cuda.Stream(non_blocking=False)
        self.stream3 = cp.cuda.Stream(non_blocking=False)

        # threads for data reading from disk
        self.read_threads = []
        for k in range(args.max_read_threads):
            self.read_threads.append(utils.WRThread())

        # threads for data writing to disk
        self.write_threads = []
        for k in range(args.max_write_threads):
            self.write_threads.append(utils.WRThread())

        self.data_queue = Queue(32)
        # thread for reading data to a queue
        self.main_read_thread = Thread(
            target=cl_reader.read_data_to_queue,
            args=(self.data_queue, self.read_threads))

        # additional refs
        self.cl_reader = cl_reader
        self.cl_writer = cl_writer
    def recon_all(self):
        """Reconstruction of data from an h5file by splitting into sinogram chunks"""
        # start reading to the queue
        self.main_read_thread.start()

        # refs for faster access
        dtype = params.dtype
        in_dtype = params.in_dtype
        nzchunk = params.nzchunk
        lzchunk = params.lzchunk
        ncz = params.ncz

        # pinned memory for data item
        item_pinned = {}
        item_pinned['data'] = utils.pinned_array(
            np.zeros([2, *self.shape_data_chunk], dtype=in_dtype))
        item_pinned['dark'] = utils.pinned_array(
            np.zeros([2, *self.shape_dark_chunk], dtype=in_dtype))
        item_pinned['flat'] = utils.pinned_array(
            np.ones([2, *self.shape_flat_chunk], dtype=in_dtype))

        # gpu memory for data item
        item_gpu = {}
        item_gpu['data'] = cp.zeros(
            [2, *self.shape_data_chunk], dtype=in_dtype)
        item_gpu['dark'] = cp.zeros(
            [2, *self.shape_dark_chunk], dtype=in_dtype)
        item_gpu['flat'] = cp.ones(
            [2, *self.shape_flat_chunk], dtype=in_dtype)

        # pinned memory for reconstruction
        rec_pinned = utils.pinned_array(
            np.zeros([args.max_write_threads, *self.shape_recon_chunk], dtype=dtype))
        # gpu memory for reconstruction
        rec_gpu = cp.zeros([2, *self.shape_recon_chunk], dtype=dtype)

        # chunk ids with parallel read
        ids = []
        st = end = []

        log.info('Full reconstruction')
        # Conveyor for data cpu-gpu copy and reconstruction
        for k in range(nzchunk+2):
            utils.printProgressBar(
                k, nzchunk+1, self.data_queue.qsize(), length=40)
            if (k > 0 and k < nzchunk+1):
                with self.stream2:  # reconstruction
                    data = item_gpu['data'][(k-1) % 2]
                    dark = item_gpu['dark'][(k-1) % 2]
                    flat = item_gpu['flat'][(k-1) % 2]
                    rec = rec_gpu[(k-1) % 2]

                    st = ids[k-1]*ncz+args.start_row//2**args.binning
                    end = st+lzchunk[ids[k-1]]
                    data = self.cl_proc_func.proc_sino(data, dark, flat)
                    data = self.cl_proc_func.proc_proj(data, st, end)
                    data = cp.ascontiguousarray(data.swapaxes(0, 1))
                    sht = cp.tile(np.float32(0), data.shape[0])
                    data = self.cl_backproj_func.fbp_filter_center(data, sht)
                    self.cl_backproj_func.cl_rec.backprojection(
                        rec, data, self.stream2)
            if (k > 1):
                with self.stream3:  # gpu->cpu copy
                    # find free thread
                    ithread = utils.find_free_thread(self.write_threads)
                    rec_gpu[(k-2) % 2].get(out=rec_pinned[ithread])
            if (k < nzchunk):
                # copy to pinned memory
                item = self.data_queue.get()
                ids.append(item['id'])
                item_pinned['data'][k % 2, :, :lzchunk[ids[k]]] = item['data']
                item_pinned['dark'][k % 2, :, :lzchunk[ids[k]]] = item['dark']
                item_pinned['flat'][k % 2, :, :lzchunk[ids[k]]] = item['flat']

                with self.stream1:  # cpu->gpu copy
                    item_gpu['data'][k % 2].set(item_pinned['data'][k % 2])
                    item_gpu['dark'][k % 2].set(item_pinned['dark'][k % 2])
                    item_gpu['flat'][k % 2].set(item_pinned['flat'][k % 2])

            self.stream3.synchronize()
            if (k > 1):
                # add a new thread for writing to hard disk (after gpu->cpu copy is done)
                st = ids[k-2]*ncz+args.start_row//2**args.binning
                end = st+lzchunk[ids[k-2]]
                self.write_threads[ithread].run(
                    self.cl_writer.write_data_chunk,
                    (rec_pinned[ithread], st, end, ids[k-2]))

            self.stream1.synchronize()
            self.stream2.synchronize()

        for t in self.write_threads:
            t.join()
    def recon_try(self):
        """GPU reconstruction of 1 slice for different centers"""
        for id_slice in params.id_slices:
            log.info(f'Processing slice {id_slice}')
            self.cl_reader.read_data_try(self.data_queue, id_slice)

            # read slice
            item = self.data_queue.get()

            # copy to gpu
            data = cp.array(item['data'])
            dark = cp.array(item['dark'])
            flat = cp.array(item['flat'])

            # preprocessing
            data = self.cl_proc_func.proc_sino(data, dark, flat)
            data = self.cl_proc_func.proc_proj(data)
            data = cp.ascontiguousarray(data.swapaxes(0, 1))

            # refs for faster access
            dtype = params.dtype
            nschunk = params.nschunk
            lschunk = params.lschunk
            ncz = params.ncz

            # pinned memory for reconstruction
            rec_pinned = utils.pinned_array(
                np.zeros([args.max_write_threads, *self.shape_recon_chunk], dtype=dtype))
            # gpu memory for reconstruction
            rec_gpu = cp.zeros([2, *self.shape_recon_chunk], dtype=dtype)

            # Conveyor for data cpu-gpu copy and reconstruction
            for k in range(nschunk+2):
                utils.printProgressBar(
                    k, nschunk+1, self.data_queue.qsize(), length=40)
                if (k > 0 and k < nschunk+1):
                    with self.stream2:  # reconstruction
                        sht = cp.pad(cp.array(params.shift_array[(
                            k-1)*ncz:(k-1)*ncz+lschunk[k-1]]), [0, ncz-lschunk[k-1]])
                        datat = cp.tile(data, [ncz, 1, 1])
                        datat = self.cl_backproj_func.fbp_filter_center(
                            datat, sht)
                        self.cl_backproj_func.cl_rec.backprojection(
                            rec_gpu[(k-1) % 2], datat, self.stream2)
                if (k > 1):
                    with self.stream3:  # gpu->cpu copy
                        # find free thread
                        ithread = utils.find_free_thread(self.write_threads)
                        rec_gpu[(k-2) % 2].get(out=rec_pinned[ithread])
                self.stream3.synchronize()
                if (k > 1):
                    # add a new thread for writing to hard disk (after gpu->cpu copy is done)
                    for kk in range(lschunk[k-2]):
                        self.write_threads[ithread].run(
                            self.cl_writer.write_data_try,
                            (rec_pinned[ithread, kk],
                             params.save_centers[(k-2)*ncz+kk], id_slice))

                self.stream1.synchronize()
                self.stream2.synchronize()

            for t in self.write_threads:
                t.join()
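
The snippet below is a minimal, self-contained sketch (not part of tomocupy) of the same three-stage conveyor that recon_all() implements: while chunk k is copied host-to-device on stream1, chunk k-1 is processed on stream2 and chunk k-2 is copied device-to-host on stream3, with ping-pong buffers indexed by k % 2 so two chunks can be in flight at once. The chunk sizes, the doubling "reconstruction" stand-in, and the plain NumPy host buffers are illustrative assumptions only; the real code uses pinned host memory, reader/writer threads, and the CUDA C backprojection kernels.

    import cupy as cp
    import numpy as np

    nchunk, chunk_shape = 8, (4, 256)           # illustrative sizes
    src = np.random.rand(nchunk, *chunk_shape).astype('float32')
    dst = np.zeros_like(src)

    stream1 = cp.cuda.Stream(non_blocking=False)  # host->device copies
    stream2 = cp.cuda.Stream(non_blocking=False)  # computations
    stream3 = cp.cuda.Stream(non_blocking=False)  # device->host copies

    # ping-pong (double) buffers on the GPU
    in_gpu = cp.zeros([2, *chunk_shape], dtype='float32')
    out_gpu = cp.zeros([2, *chunk_shape], dtype='float32')

    for k in range(nchunk + 2):
        if 0 < k < nchunk + 1:
            with stream2:                         # process chunk k-1
                # stand-in for filtering + backprojection
                out_gpu[(k-1) % 2] = 2 * in_gpu[(k-1) % 2]
        if k > 1:
            with stream3:                         # download chunk k-2
                out_gpu[(k-2) % 2].get(out=dst[k-2])
        if k < nchunk:
            with stream1:                         # upload chunk k
                in_gpu[k % 2].set(src[k])
        stream1.synchronize()
        stream2.synchronize()
        stream3.synchronize()

    assert np.allclose(dst, 2 * src)

The iteration index k is deliberately run two steps past the number of chunks so the pipeline drains: the last chunk is uploaded at k = nchunk-1, processed at k = nchunk, and downloaded at k = nchunk+1, mirroring the nzchunk+2 (and nschunk+2) loop bounds above.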