Process

Processing components perform transformations of data in vortex, such as computing the FFT. Each component provides a similar API.

Overview

A processing component is first initialized with a call to initialize(), supplying a matching configuration object. Initialization brings the component into a state where processing can begin and frequently involves the allocation of memory buffers. An unlimited amount of data can then be transformed through the processor. The configuration of an initialized processor can be updated with the change() method, which accepts the new configuration, although not every configuration option can be changed. Take care with changes that require reallocation of internal buffers.

Data is transformed synchronously with next() or asynchronously with next_async(). Both methods accept an id argument, which is provided only for user bookkeeping and logging. In the documentation below, the input is referred to as \(x[i,j,k]\) (records) and the output is referred to as \(y[i,j,k]\) (A-scans). The indices \(i\), \(j\), and \(k\) correspond to the record/A-scan, sample, and channel dimensions, respectively. All buffers must have a single channel per sample (i.e., \(k \in \{ 1 \}\)).
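As a minimal sketch of the buffer layout described above, the following allocates one input and one output block with NumPy. The sizes are arbitrary example values, and the data types are taken from the parameter listings later in this section.

```python
import numpy as np

# Example sizes, chosen only for illustration.
records_per_block = 4
samples_per_record = 8

# Axes are [record/A-scan i, sample j, channel k] with a single channel.
input_shape = (records_per_block, samples_per_record, 1)
output_shape = (records_per_block, samples_per_record, 1)

x = np.zeros(input_shape, dtype=np.uint16)  # records
y = np.zeros(output_shape, dtype=np.int8)   # A-scans

# x[i, j, 0] is sample j of record i.
x[2, 5, 0] = 1234
```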

Components

Null

class vortex.process.NullProcessor

Perform no processing.

This class is provided as an engine placeholder for testing or mocking. No operation is performed on the input or output buffers.

property config: NullProcessorConfig

Copy of the active configuration.

class vortex.process.NullProcessorConfig

Configuration object for NullProcessor.

property input_shape: List[int[3]]

Required shape of input buffers. Returns list of [ records_per_block, samples_per_record, 1 ]. Read-only.

property samples_per_record: int

Number of samples per record.

property records_per_block: int

Number of records in each input buffer or block. Identical to ascans_per_block.

property output_shape: List[int[3]]

Required shape of output buffers. Returns list of [ ascans_per_block, samples_per_ascan, 1 ]. Read-only.

property samples_per_ascan: int

Number of samples per A-scan. Returns either samples_per_record or the length of resampling_samples, if the latter is available. Read-only.

property ascans_per_block: int

Number of A-scans in each acquired buffer or block. Identical to records_per_block.

validate()

Check the configuration for errors.

Raises:

RuntimeError – If the configuration is invalid.

copy()

Create a copy of this configuration.

Returns:

NullProcessorConfig – The copy.

Copy

class vortex.process.CopyProcessor

Copy data from input to output buffers with optional slicing (\(s[n]\)) and linear transformation (\(a\) and \(b\)).

\[y[i,j,k] = a + b x[i,s[j],k]\]

OCT processing is not performed. This processor is intended primarily for acquisitions that provide processed OCT data, such as AlazarFFTAcquisition. Computation is performed on the CPU.
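The copy equation above can be sketched in NumPy as follows. This is an illustrative reference only, not the library's implementation: the function name is hypothetical, a Python slice stands in for the slicing operation \(s[n]\), and the final cast to the output data type is omitted.

```python
import numpy as np

def copy_with_transform(x, sample_slice=slice(None), a=0.0, b=1.0):
    """Reference for y[i,j,k] = a + b * x[i, s[j], k] (hypothetical helper)."""
    # Select samples along axis 1, then apply the linear transform in floating point.
    return a + b * x[:, sample_slice, :].astype(np.float32)

# Two records of six samples each, single channel.
x = np.arange(12, dtype=np.uint16).reshape(2, 6, 1)

# Keep samples 1..4 of every record and map each value v to 1 + 0.5 * v.
y = copy_with_transform(x, slice(1, 5), a=1.0, b=0.5)
```

Here y has shape (2, 4, 1) because the slice keeps four of the six samples in each record.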

__init__(logger=None)

Create a new object with optional logging.

Parameters:

logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.

initialize(config)

Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.

Parameters:

config (CopyProcessorConfig) – New configuration to apply.

change(config)

Change the processor configuration. All configuration options may be changed, but changes to CopyProcessorConfig.slots will not have an effect.

Danger

It is not safe to call this method while a block is currently processing.

Parameters:

config (CopyProcessorConfig) – New configuration to apply.

next(input_buffer, output_buffer, id=0, append_history=True)

Process the next buffer.

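Parameters:
  • input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • id (int) – Number to associate with the buffer for logging purposes.

next_async(input_buffer, output_buffer, callback, id=0, append_history=True)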

Process the next buffer asynchronously and execute the callback when complete.

Caution

The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background processing.

Parameters:
  • input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.

  • id (int) – Number to associate with the buffer for logging purposes.
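One way to wait for an asynchronous call to finish is sketched below. The helper wait_for and the stand-in fake_next_async are hypothetical and exist only for illustration. The sketch also assumes that the callback receives None when processing succeeds. Because the event may already be set when wait() is called, the pattern also works if the callback runs in the calling thread before the method returns.

```python
import threading

def wait_for(start_async):
    """Run an asynchronous call to completion and re-raise any reported exception."""
    done = threading.Event()
    result = {"exc": None}

    def callback(exc):
        # Record the outcome and wake up the waiting thread.
        result["exc"] = exc
        done.set()

    start_async(callback)
    done.wait()
    if result["exc"] is not None:
        raise result["exc"]

# Hypothetical stand-in for a processor's next_async(); it reports success
# (assumed here to be None) from a background thread.
def fake_next_async(callback):
    threading.Thread(target=lambda: callback(None)).start()

wait_for(fake_next_async)
```

With a real processor, the call would presumably take the form wait_for(lambda cb: processor.next_async(input_buffer, output_buffer, cb)), following the signature documented above.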

property config: CopyProcessorConfig

Copy of the active configuration.

class vortex.process.CopyProcessorConfig

Base: NullProcessorConfig

Configuration object for CopyProcessor.

property sample_slice: [NullSlice | SimpleSlice]

Optional slicing operation \(s[n]\) to apply along each record/A-scan. Defaults to NullSlice().

property sample_transform: [NullTransform | LinearTransform]

Optional linear transformation \(a\) and \(b\) applied to each sample during copy. Defaults to NullTransform().

property slots: int

Number of parallel processing pipelines. Adjust to achieve the desired CPU utilization for machines with high hardware concurrency. Recommended minimum is 2 slots to facilitate pipelining of successive blocks. The copy itself is parallelized across all CPU cores; this field only affects pipeline-level parallelism. Defaults to 2.

copy()

Create a copy of this configuration.

Returns:

CopyProcessorConfig – The copy.

OCT

These OCT processors perform averaging, resampling, filtering, and FFT operations to transform raw spectra into A-scans. These operations are applied in the order shown below.

  1. Rolling average with window length \(M\).

    \[\hat{x}[i,j,k] = x[i,j,k] - \frac{1}{M} \sum_{l=0}^{M-1}{ x[i - l,j,k] } ,\]
  2. Resampling with linear interpolation (\(r[j]\)).

    \[z[i,j,k] = \big(1 - (r[j] - \lfloor r[j] \rfloor)\big) \hat{x}\big[i, \lfloor r[j] \rfloor, k \big] + \big(r[j] - \lfloor r[j] \rfloor\big) \hat{x}\big[i, \lceil r[j] \rceil, k \big] ,\]
  3. Frequency-domain filtering (\(h[i,j,k]\)) and inverse FFT with normalization.

    \[y[i,j,k] = \log_{10} \left| \frac{1}{N} \mathcal{F}^{-1} \big\{ h[:,j,:] z[i,j,k] \big\} \right|^2\]

All computation is performed in floating-point until the cast to the output datatype. Each operation can be enabled, disabled, and/or customized via the processor configuration options.
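The three steps above can be sketched with NumPy as a reference for the math. This is not the library's implementation, and the function names are hypothetical. It assumes that records before the start of the array count as zero in the rolling average, that resampling positions beyond the last sample are clamped, and that the \(1 / N\) factor in step 3 is the one already built into numpy.fft.ifft. The cast to the output data type is omitted.

```python
import numpy as np

def subtract_rolling_average(x, M):
    """Step 1: subtract the mean of each record and its M - 1 predecessors."""
    xf = x.astype(np.float32)
    if M <= 0:
        return xf  # background subtraction disabled
    out = np.empty_like(xf)
    for i in range(xf.shape[0]):
        # Assumption: records before the start of the array count as zero,
        # so the sum is always divided by M as in the formula.
        out[i] = xf[i] - xf[max(0, i - M + 1):i + 1].sum(axis=0) / M
    return out

def resample_linear(xhat, r):
    """Step 2: linearly interpolate each record at fractional positions r[j]."""
    r = np.asarray(r, dtype=np.float64)
    last = xhat.shape[1] - 1
    lo = np.clip(np.floor(r).astype(int), 0, last)
    hi = np.minimum(lo + 1, last)
    w = (r - lo)[None, :, None]  # fractional part, broadcast over records and channels
    return (1 - w) * xhat[:, lo, :] + w * xhat[:, hi, :]

def filter_ifft_log(z, h=None):
    """Step 3: spectral filter, normalized inverse FFT, and log10 of the power."""
    if h is not None:
        z = z * np.asarray(h)[None, :, None]
    a = np.fft.ifft(z, axis=1)  # numpy.fft.ifft already applies the 1/N factor
    with np.errstate(divide="ignore"):  # log10(0) yields -inf without a warning
        return np.log10(np.abs(a) ** 2)

# Synthetic input: constant background plus a fringe of 8 cycles per record
# whose phase advances from record to record.
n_records, n_samples, n_ascan = 16, 64, 48
i = np.arange(n_records)[:, None]
j = np.arange(n_samples)[None, :]
fringe = 500 * np.cos(2 * np.pi * 8 * j / n_samples + 2 * np.pi * i / n_records)
x = np.round(2000 + fringe).astype(np.uint16)[:, :, None]

r = np.linspace(0, n_samples - 1, n_ascan)  # resampling positions
h = np.hanning(n_ascan)                     # spectral filter of A-scan length
y = filter_ifft_log(resample_linear(subtract_rolling_average(x, M=16), r), h)

# Strongest depth bin in the first half of the last A-scan.
peak_bin = int(np.argmax(y[-1, : n_ascan // 2, 0]))
```

The output has 48 samples per A-scan because the length of r, not the record length, sets the A-scan length.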

CPU

class vortex.process.CPUProcessor

Perform OCT processing with averaging, resampling by linear interpolation, spectral filtering, and FFT on the CPU.

__init__(logger=None)

Create a new object with optional logging.

Parameters:

logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.

initialize(config)

Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.

Parameters:

config (CPUProcessorConfig) – New configuration to apply.

change(config)

Change the processor configuration. If the change requires buffer reallocation, the pipeline is stalled and record history for the rolling average may be lost. May be called while a block is currently processing.

Parameters:

config (CPUProcessorConfig) – New configuration to apply.

next(input_buffer, output_buffer, id=0, append_history=True)

Process the next buffer.

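Parameters:
  • input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • id (int) – Number to associate with the buffer for logging purposes.

  • append_history (bool) – Include the raw spectra from this input buffer in the rolling average.

next_async(input_buffer, output_buffer, callback, id=0, append_history=True)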

Process the next buffer asynchronously and execute the callback when complete.

Caution

The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background processing.

Parameters:
  • input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.

  • id (int) – Number to associate with the buffer for logging purposes.

  • append_history (bool) – Include the raw spectra from this input buffer in the rolling average.

property config: CPUProcessorConfig

Copy of the active configuration.

class vortex.process.CPUProcessorConfig

Base: NullProcessorConfig

Configuration object for CPUProcessor.

property average_window: int

Length \(M\) of rolling average window in records/A-scans for background subtraction. Disable background subtraction by setting to 0. Disabled by default.

Note

Record/A-scan history includes both active and inactive records.

Warning

Record/A-scan history is stored internally within the processor to allow average_window to exceed records_per_block. Create a new processor to discard this history.
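The sketch below illustrates why stored history lets average_window exceed records_per_block: splitting the same records across blocks gives the same result as one large block. The class is a hypothetical helper, not part of vortex. For brevity it uses 2D arrays of [ records, samples ], requires a window of at least 1, treats missing history as zero, and does not model append_history.

```python
import numpy as np

class RollingBackground:
    """Background subtraction that keeps the last M records across blocks."""

    def __init__(self, M, samples_per_record):
        if M < 1:
            raise ValueError("window length must be at least 1")
        self.M = M
        self.history = np.zeros((0, samples_per_record), dtype=np.float32)

    def process(self, block):
        block = block.astype(np.float32)
        # The averaging window spans the stored history followed by this block.
        combined = np.concatenate([self.history, block], axis=0)
        n_hist = self.history.shape[0]
        out = np.empty_like(block)
        for i in range(block.shape[0]):
            end = n_hist + i + 1
            start = max(0, end - self.M)
            # Assumption: missing history counts as zero, so always divide by M.
            out[i] = block[i] - combined[start:end].sum(axis=0) / self.M
        # Retain only the most recent M records for the next block.
        self.history = combined[-self.M:]
        return out

x = np.arange(12, dtype=np.float32).reshape(4, 3)

# Two blocks of 2 records with a window of 3 records...
split = RollingBackground(3, 3)
a = np.concatenate([split.process(x[:2]), split.process(x[2:])])

# ...match a single block of 4 records.
whole = RollingBackground(3, 3)
b = whole.process(x)
```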

property resampling_samples: numpy.ndarray[numpy.float32]

Optional positions at which to resample the raw spectra prior to the FFT. The number of resampled positions determines samples_per_ascan. Valid positions are in the range [0, samples_per_record]. Set to an empty array ([]) to disable resampling. Disabled by default.
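As an illustration, a resampling vector could be built with NumPy as shown below. The sizes are arbitrary example values, and evenly spaced positions are used only as a placeholder; in practice the positions would typically come from a calibration of the source. Per the description of samples_per_ascan above, the length of this array sets the A-scan length.

```python
import numpy as np

samples_per_record = 1024  # example record length
samples_per_ascan = 768    # example A-scan length after resampling

# Fractional sample positions within each record, one per output sample.
resampling_samples = np.linspace(
    0, samples_per_record - 1, samples_per_ascan, dtype=np.float32
)
```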

property spectral_filter: numpy.ndarray[numpy.float32 | numpy.complex64]

Optional spectral filter to multiply with the spectra after resampling but before the FFT. Must have shape compatible with [ samples_per_ascan ]; that is, the spectral filter should match the length of the output A-scan, not the input record. Set to an empty array ([]) to disable spectral filtering. Disabled by default.
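The following sketch builds two candidate filters of A-scan length: a real-valued Hann window and a complex-valued variant that adds a quadratic phase term. The phase coefficient is an arbitrary illustrative value, not a recommended setting.

```python
import numpy as np

samples_per_ascan = 768  # example A-scan length

# Real-valued filter: a Hann window to suppress side lobes.
window = np.hanning(samples_per_ascan).astype(np.float32)

# Complex-valued filter: the same window with a quadratic phase term.
k = np.linspace(-1, 1, samples_per_ascan)
phase = 2.0 * k**2  # arbitrary coefficient for illustration
complex_filter = (window * np.exp(1j * phase)).astype(np.complex64)
```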

property enable_ifft: bool

Enable the inverse FFT for OCT processing with length \(N\) determined by samples_per_ascan. When enabled, the inverse FFT output is multiplied by the normalization factor of \(1 / N\). When disabled, only the complex magnitude is computed and no normalization factor is applied. Enabled by default.

Note

If FFT normalization is not desired, scale the spectral filter by \(N\) to cancel out the normalization factor.

property enable_log10: bool

Enable the application of \(\log_{10}(...)\) to the complex magnitude. Enabled by default.

property enable_square: bool

Enable squaring of the complex magnitude after the FFT. When enabled, the processor output is power. When disabled, the processor output is amplitude. Enabled by default.
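How the enable_ifft, enable_square, and enable_log10 options combine can be sketched as below. This is a NumPy reference for the math only; the function name is hypothetical, and it assumes the \(1 / N\) normalization is the one built into numpy.fft.ifft.

```python
import numpy as np

def output_stage(z, enable_ifft=True, enable_square=True, enable_log10=True):
    """Apply the final processing options to a filtered spectrum z (hypothetical helper)."""
    if enable_ifft:
        z = np.fft.ifft(z, axis=-1)  # includes the 1/N normalization
    y = np.abs(z)                    # complex magnitude (amplitude)
    if enable_square:
        y = y**2                     # amplitude to power
    if enable_log10:
        with np.errstate(divide="ignore"):  # log10(0) yields -inf without a warning
            y = np.log10(y)
    return y

z = np.array([1.0, 2.0, 3.0, 4.0])
N = z.size

power_log = output_stage(z)
amplitude_log = output_stage(z, enable_square=False)

# Scaling the input (or the spectral filter) by N cancels the 1/N factor.
unnormalized = output_stage(N * z, enable_square=False, enable_log10=False)
```

With the logarithm enabled, the power output equals twice the amplitude output, since \(\log_{10} |a|^2 = 2 \log_{10} |a|\).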

property channel: int

Index of channel to select for processing.

property slots: int

Number of parallel processing pipelines. Adjust to achieve the desired CPU utilization for machines with high hardware concurrency. Recommended minimum is 2-4 slots to facilitate pipelining of successive blocks, but higher numbers will likely yield better throughput. Each processing step is individually parallelized across all CPU cores; this field only affects pipeline-level parallelism. Defaults to 2.

copy()

Create a copy of this configuration.

Returns:

CPUProcessorConfig – The copy.

GPU

class vortex.process.CUDAProcessor

Perform OCT processing with averaging, resampling by linear interpolation, spectral filtering, and FFT on a CUDA-capable GPU.

__init__(logger=None)

Create a new object with optional logging.

Parameters:

logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.

initialize(config)

Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.

Parameters:

config (CUDAProcessorConfig) – New configuration to apply.

change(config)

Change the processor configuration. May be called while a block is currently processing. If the change requires buffer reallocation, the pipeline is stalled and record history for the rolling average may be lost. The following changes require no buffer reallocation and take effect immediately:

  • disabling any processing step (e.g., resampling or FFT),

  • reducing the window of the rolling average, and

  • altering a non-empty spectral filter or resampling vector without increasing its length.

All other changes will likely require a buffer reallocation.

Parameters:

config (CUDAProcessorConfig) – New configuration to apply.

next(input_buffer, output_buffer, id=0, append_history=True)

Process the next buffer.

Parameters:
  • input_buffer (cupy.ndarray[cupy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (cupy.ndarray[cupy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • id (int) – Number to associate with the buffer for logging purposes.

  • append_history (bool) – Include the raw spectra from this input buffer in the rolling average.

next_async(input_buffer, output_buffer, callback, id=0, append_history=True)

Process the next buffer asynchronously and execute the callback when complete.

Caution

The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background processing.

Parameters:
  • input_buffer (cupy.ndarray[cupy.uint16]) – The input buffer, with shape that matches self.config.input_shape.

  • output_buffer (cupy.ndarray[cupy.int8]) – The output buffer, with shape that matches self.config.output_shape.

  • callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.

  • id (int) – Number to associate with the buffer for logging purposes.

  • append_history (bool) – Include the raw spectra from this input buffer in the rolling average.

property config: CUDAProcessorConfig

Copy of the active configuration.

class vortex.process.CUDAProcessorConfig

Base: CPUProcessorConfig

Configuration object for CUDAProcessor.

property clock_channel: int | None

Index of k-clock channel for dynamic resampling. Enables k-clock resampling on a per A-scan basis when set to a non-None value. Defaults to None.

Note

Dynamic resampling is only supported when the cuda_dynamic_resampling feature is present. This requires CUDA 11 or higher.

property interpret_as_signed: bool

Interpret the input data as signed data instead of unsigned data. Relevant only for Python where data types are pre-specified. Defaults to False.
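The difference between the two interpretations can be seen by viewing the same 16-bit values as unsigned and as signed integers in NumPy. This sketch assumes that the option amounts to a reinterpretation of the same bits (two's complement); it does not show how the library performs it internally.

```python
import numpy as np

# The same 16-bit patterns, declared as unsigned in Python.
raw = np.array([0, 1, 32767, 32768, 65535], dtype=np.uint16)

# Reinterpret the bits as signed values without copying or converting.
signed = raw.view(np.int16)
# signed is now [0, 1, 32767, -32768, -1]
```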

property slots: int

Number of parallel CUDA streams to use for processing. Recommended minimum is 2 slots to facilitate pipelining of successive blocks. For GPUs with sufficient compute resources, increasing the number of slots could enable parallel computation. Defaults to 2.

property device: int

Index of CUDA device to use for processing. Defaults to index 0.

copy()

Create a copy of this configuration.

Returns:

CUDAProcessorConfig – The copy.