Process#

Processing components perform transformations of data in vortex, such as computing the FFT. Each component provides a similar API.

Overview#

A processing component is first initialized with a call to initialize(), supplying a matching configuration object. Initialization brings the component into a state where processing can begin and frequently involves the allocation of memory buffers. An unlimited amount of data can then be transformed through the processor. Processors allow changes to their configuration using the change() method which accepts the new configuration. Not all aspects of the configuration can be changed, however. Take care when changes of configuration require reallocation of internal buffers.

Data is transformed synchronously with next() or asynchronously with next_async(). Both methods accept an id argument, which is provided only for user bookkeeping and logging. In the documentation below, the input is referred to as \(x[i,j,k]\) (records) and the output is referred to as \(y[i,j,k]\) (A-scans). The indices \(i\), \(j\), and \(k\) correspond to the record/A-scan, sample, and channel dimensions, respectively. All buffers must have a single channel per sample (i.e., \(k \in \{ 1 \}\)).

Components#

Null#

class vortex.process.NullProcessor#

Perform no processing.

This class is provided as an engine placeholder for testing or mocking. No operation is performed on the input or output buffers.

property config: NullProcessorConfig#

Copy of the active configuration.

class vortex.process.NullProcessorConfig#

Configuration object for NullProcessor.

property input_shape: List[int[3]]#

Required shape of input buffers. Returns list of [ records_per_block, samples_per_record, 1 ]. Read-only.

property samples_per_record: int#

Number of samples per record.

property records_per_block: int#

Number of records in each input buffer or block. Identical to ascans_per_block.

property output_shape: List[int[3]]#

Required shape of output buffers. Returns list of [ ascans_per_block, samples_per_ascan, 1]. Read-only.

property samples_per_ascan: int#

Number of samples per A-scan. Returns either samples_per_record or the length of resampling_samples, if the latter is available. Read-only.

property ascans_per_block: int#

Number of A-scans in each acquired buffer or block. Identical to records_per_block.

validate()#

Check the configuration for errors.

Raises

RuntimeError – If the configuration is invalid.

copy()#

Create a copy of this configuration.

Returns

NullProcessorConfig – The copy.

Copy#

class vortex.process.CopyProcessor#

Copy data from input to output buffers with optional slicing (\(s[n]\)) and linear transformation (\(a\) and \(b\)).

\[y[i,j,k] = a + b x[i,s[j],k]\]

OCT processing is not performed. This processor is intended primarily for acquisitions that provide processed OCT data, such as AlazarFFTAcquisition. Computation is performed on the CPU.

__init__(logger=None)#

Create a new object with optional logging.

Parameters

logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.

initialize(config)#

Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.

Parameters

config (CopyProcessorConfig) – New configuration to apply.

change(config)#

Change the processor configuration. All configuration options may be changed, but changes to CopyProcessorConfig.slots will not have an effect.

Danger

It is not safe to call this method while a block is currently processing.

Parameters

config (CopyProcessorConfig) – New configuration to apply.

next(input_buffer, output_buffer, id=0, append_history=True)#

Process the next buffer.

Parameters
next_async(input_buffer, output_buffer, callback, id=0, append_history=True)#

Process the next buffer asynchronously and execute the callback when complete.

Caution

The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background acquisition.

Parameters
  • input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.

  • id (int) – Number to associate with the buffer for logging purposes.

property config: CopyProcessorConfig#

Copy of the active configuration.

class vortex.process.CopyProcessorConfig#

Base: NullProcessorConfig

Configuration object for CopyProcessor.

property sample_slice: [NullSlice | SimpleSlice]#

Optional slicing operation \(s[n]\) to apply along each record/A-scan. Defaults to NullSlice().

property sample_transform: [NullTransform | LinearTransform]#

Optional linear transformation \(a\) and \(b\) applied to each sample during copy. Defaults to NullTransform().

property slots: int#

Number of parallel processing pipelines. Adjust to achieve the desired CPU utilization for machines with high hardware concurrency. Recommended minimum is 2 slots to facilitate pipelining of successive blocks. The copy itself parallelized across all CPU cores; this field only affects pipeline-level parallelism. Defaults to 2.

copy()#

Create a copy of this configuration.

Returns

CopyProcessorConfig – The copy.

OCT#

These OCT processors perform averaging, resampling, filtering, and FFT operations to transform raw spectra into A-scans. These operations are applied in the order shown below.

  1. Rolling average with window length \(M\).

    \[\hat{x}[i,j,k] = x[i,j,k] - \frac{1}{M} \sum_{l=0}^{M-1}{ x[i - l,j,k] } ,\]
  2. Resampling with linear interpolation (\(r[j]\)).

    \[z[i,j,k] = \big(\lceil r[j] \rceil - r[j] \big) \hat{x}\big[i, \lfloor r[j] \rfloor, k \big] + \big(r[j] - \lfloor r[j] \rfloor\big) \hat{x}\big[i, \lceil r[j] \rceil, k \big] ,\]
  3. Frequency-domain filtering (\(h[i,j,k]\)) and inverse FFT with normalization.

    \[y[i,j,k] = \log_{10} \left| \frac{1}{N} \mathcal{F}^{-1} \big\{ h[:,j,:] z[i,j,k] \big\} \right|^2\]

All computation is performed in floating-point until the cast to the output datatype. Each operation can be enabled, disabled, and/or customized via the processor configuration options.

CPU#

class vortex.process.CPUProcessor#

Perform OCT processing with averaging, resampling by linear interpolation, spectral filtering, and FFT on the CPU.

__init__(logger=None)#

Create a new object with optional logging.

Parameters

logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.

initialize(config)#

Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.

Parameters

config (CPUProcessorConfig) – New configuration to apply.

change(config)#

Change the processor configuration. If the change requires buffer reallocation, the pipeline is stalled and record history for the rolling average may be lost. May be called while a block is currently processing.

Parameters

config (CPUProcessorConfig) – New configuration to apply.

next(input_buffer, output_buffer, id=0, append_history=True)#

Process the next buffer.

Parameters
next_async(input_buffer, output_buffer, callback, id=0, append_history=True)#

Process the next buffer asynchronously and execute the callback when complete.

Caution

The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background acquisition.

Parameters
  • input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.

  • id (int) – Number to associate with the buffer for logging purposes.

  • append_history (bool) – Include the raw spectra from this input buffer in the rolling average.

property config: CPUProcessorConfig#

Copy of the active configuration.

class vortex.process.CPUProcessorConfig#

Base: NullProcessorConfig

Configuration object for CPUProcessor.

property average_window: int#

Length \(M\) of rolling average window in records/A-scans for background subtraction. Disable background subtraction by setting to 0. Disabled by default.

Note

Record/A-scan history includes both active and inactive records.

Warning

Record/A-scan history is stored internally within the processor to allow average_window to exceed records_per_block. Create a new processor to discard this history.

property resampling_samples: numpy.ndarray[numpy.float32]#

Optional positions at which to resample the raw spectra prior FFT. The number of resampled positions determines samples_per_record. Valid positions are in the range [0, samples_per_record]. Set to an empty array ([]) to disable resampling. Disabled by default.

property spectral_filter: numpy.ndarray[numpy.float32 | numpy.complex64]#

Optional spectral filter to multiply with the spectra after resampling but before the FFT. Must have shape compatible with [ samples_per_ascan ]; that is, the spectral filter should match the length of the output A-scan, not the input record. Set to an empty array ([]) to disable spectral filtering. Disabled by default.

property enable_ifft: bool#

Enable the FFT for OCT processing with length \(N\) determined by samples_per_ascan. When enabled, the FFT is multiplied by the normalization factor of \(1 / N\). When disabled, the squared complex magnitude is performed instead and no normalization factor is applied. Enabled by default.

Note

If FFT normalization is not desired, scale the spectral filter by \(N\) to cancel out the normalization factor.

property enable_log10#

Enable the application of \(log_{10}(...)\) after the FFT. Enabled by default.

property channel: int#

Index of channel to select for processing. Used only by engine.

Note

This property is likely to be moved from the processor configuration to the engine configuration in the future.

property slots: int#

Number of parallel processing pipelines. Adjust to achieve the desired CPU utilization for machines with high hardware concurrency. Recommended minimum is 2-4 slots to facilitate pipelining of successive blocks but higher numbers will likely yield better throughput. Each processing step is individually parallelized across all CPU cores; this field only affects pipeline-level parallelism. Defaults to 2.

copy()#

Create a copy of this configuration.

Returns

CPUProcessorConfig – The copy.

GPU#

class vortex.process.CUDAProcessor#

Perform OCT processing with averaging, resampling by linear interpolation, spectral filtering, and FFT on a CUDA-capable GPU.

__init__(logger=None)#

Create a new object with optional logging.

Parameters

logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.

initialize(config)#

Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.

Parameters

config (CUDAProcessorConfig) – New configuration to apply.

change(config)#

Change the processor configuration. May be called while a block is currently processing. If the change requires buffer reallocation, the pipeline is stalled and record history for the rolling average may be lost. Changes that require no buffer reallocation and take effect immediately are

  • disabling any processing step (e.g., resampling or FFT),

  • reducing the window of the rolling average, and

  • altering a non-empty spectral filter or resampling vector without increasing its length.

All other changes will likely require a buffer reallocation.

Parameters

config (CUDAProcessorConfig) – New configuration to apply.

next(input_buffer, output_buffer, id=0, append_history=True)#

Process the next buffer.

Parameters
  • input_buffer (cupy.ndarray[cupy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (cupy.ndarray[cupy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • id (int) – Number to associate with the buffer for logging purposes.

  • append_history (bool) – Include the raw spectra from this input buffer in the rolling average.

next_async(input_buffer, output_buffer, callback, id=0, append_history=True)#

Process the next buffer asynchronously and execute the callback when complete.

Caution

The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background acquisition.

Parameters
  • input_buffer (cupy.ndarray[cupy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (cupy.ndarray[cupy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.

  • id (int) – Number to associate with the buffer for logging purposes.

  • append_history (bool) – Include the raw spectra from this input buffer in the rolling average.

property config: CUDAProcessorConfig#

Copy of the current configuration.

class vortex.process.CUDAProcessorConfig#

Base: CPUProcessorConfig

Configuration object for CUDAProcessor.

property slots: int#

Number of parallel CUDA streams to use for processing. Recommended minimum is 2 slots to facilitate pipelining of successive blocks. For GPUs with sufficient compute resources, increasing the number of slots could enable parallel computation. Defaults to 2.

property device: int#

Index of CUDA device to use for processing. Defaults to index 0.

copy()#

Create a copy of this configuration.

Returns

CUDAProcessorConfig – The copy.