Engine#

Warning

This document is under construction.

The Engine implements a high-performance data processing pipeline composed of acquisition, processing, and formatting stages. See Concepts for more detail regarding this pipeline and its capabilities.

class vortex.engine.Engine#

A real-time engine that moves data through an acquisition, processing, and formatting pipeline.

Once the pipeline is configured, the engine can operate the pipeline in multiple sessions, bounded by start() and stop() calls. The session is active until the scan_queue becomes empty, EngineConfig.blocks_to_acquire have been acquired, stop() is called, or an error occurs.

__init__(logger=None)#

Create a new object with optional logging.

Parameters

logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.

initialize(config)#

Initialize the engine using the supplied configuration.

Parameters

config (EngineConfig) – New configuration to apply.

prepare()#

Prepare to start the engine. The engine allocates all blocks, assigns and allocates transfer buffers, and notifies endpoints to allocate their internal buffers, if needed.

start()#

Start a new session with the engine. Acquisition and IO components are preloaded, if applicable, and started. Every call of this method must be paired with a call to stop() at a later time.

wait()#

Wait for the active session to complete. This method will block indefinitely.

Raises

RuntimeError – If any error occurred to shut down the session prematurely. This is guaranteed to be the first of such errors in case a cascade of errors occurs.

wait_for(timeout)#

Wait the specified duration for the session to complete.

Parameters

timeout (float) – Maximum duration to wait in seconds.

Returns

boolTrue if engine has finished and False otherwise.

Raises

RuntimeError – If any error occurred to shut down the session prematurely. This is guaranteed to be the first of such errors in case a cascade of errors occurs.

stop()#

Request that the engine shut down and wait for it to do so. This method must be called for every call to start().

Raises

RuntimeError – If any error occurred to shut down the engine prematurely. This is guaranteed to be the first of such errors in case a cascade of errors occurs.

status()#

Query the engine status.

Returns

EngineStatus – The current status of the engine.

property config: EngineConfig#

Copy of the active configuration.

property scan_queue: ScanQueue#

Access the engine’s scan queue.

property done: bool#

Return True if the engine is stopped and False otherwise.

Warning

This property will return True until stop() is called. Do no rely on this property to determine if the engine has exited.

property event_callback: Callable[[Engine.Event, Exception], None]#

Callback to receive status events from the engine.

property job_callback: Callable[[int, EngineStatus, JobTiming], None]#

Callback to receive status and timing information for each block when it exits the pipeline.

Caution

Avoid computationally expensive tasks in this callback or the session may shut down prematurely due to delayed block recycling.

class vortex.engine.EngineConfig#

Configuration object for Engine.

property records_per_block: int#

Number of records (spectra or A-scans) in a block. Each block represents a slice of time, as determined by the number of records acquired per second. Default is 1000.

property blocks_to_allocate: int#

Number of blocks to pre-allocate prior to starting the session. This determines the maximum session duration or size that can be buffered in memory. The maximum buffered duration in records is the product of blocks_to_allocate and records_per_block. Default is 4.

property preload_count: int#

Number of blocks to commit to the hardware drivers prior to starting the session. This determines how far in advance the engine is generating signals and scheduling buffers for acquisition. Once the engine starts, the engine will never have more that this number of blocks pending for acquisition. The product of preload_count and records_per_block determines the number of records required for new inputs (e.g., scan pattern changes) to propagate through the pipeline. Default is 2.

property blocks_to_acquire#

The total acquisition duration as measured in blocks. Set to 0 for an indefinite acquisition, which ends only when the last scan reports that it is complete. Default is 0.

property post_scan_records: int#

The number of records to acquire after the last scan reports that is is complete. This is useful if hardware latencies cause the final samples of the scan to physically occur after the acquisition would have otherwise ended. Default is 0.

property scanner_warp: [NoWarp | AngularWarp | TelecentricWarp]#

Scan warp to generate the sample waveforms from the galvo waveforms. Default is NoWarp.

property galvo_output_channels: int#

Number of output channels to allocate for galvo waveforms. Default is 2.

property galvo_input_channels: int#

Number of input channels to allocate for galvo waveforms. Default is 2.

property strobes: List[SampleStrobe | SegmentStrobe | VolumeStrobe | ScanStrobe | EventStrobe]#

Scan pattern-derived strobes to generate for output. Default is [SampleStrobe(0, 2), SampleStrobe(1, 1000), SampleStrobe(2, 1000, Polarity.Low), SegmentStrobe(3), VolumeStrobe(4)].

add_acquisition(acquisition, processors, preload=True, master=True)#

Register the acquisition with the engine and route its output data to the listed processors. divide() and cycle() may be used to build arbitrarily nested schemes for distributing data between processors. For example, cycle() can be used to realize “ping-pong” GPU processing of data, such as below.

engine.add_acquisition(acquisition, [
    cycle([
        processorA,
        processorB
    ])
])

There is a single acquisition dispatch thread for the whole engine. Processing is dispatched sequentially to divided processing in the listed order but may complete out of order.

Parameters
  • acquisition (Acquisition) – The acquisition component to register with the engine. See the list of acquisition components for supported object types.

  • processors (List[Processor | Cycle | Divide]) – The graph of processing components to receive the data from this acquisition component. See the list of processing components for supported object types.

  • preload (bool) – Enable or disable preloading with this acquisition. If True, preload_count blocks will be queued before the engine starts. If False, the first block will be queued immediately after the engine starts.

  • master (bool) – Control when this acquisition is started in relation to other acquisition or IO components. All components with master=True are started after all components with master=False.

add_processor(processor, formatters)#

Register the processor with the engine and route its output to all listed formatters. The engine ensures that data arrives on the correct GPU device, if any, and deinterleaves multiple channels, if present, in preparation for processing. There is a single processing dispatch thread for the whole engine. Formatters are scheduled receive data in the order listed, although out-of-order completion may causes formatters to execute in different orders.

Parameters
  • processor (Processor) – The processor component to register with the engine. See the list of processors for supported object types.

  • formatters (List[Formatter]) – The list of formatting components to receive the data from this processor component and its parent acquisition component.

add_formatter(formatter, endpoints)#

Register the formatter component with the engine and apply its format plans to acquired and/or processed via the listed endpoints. Each formatter is allocated a dedicated thread for its endpoints. Endpoints receive data sequentially in the order listed.

Parameters
  • formatter (Formatter) – The formater component to register with the engine. See the list of processors for supported object types.

  • endpoints (List[Endpoints]) – The list of endpoints to receive this formatter’s plan and the data associated with the parent acquisition and processor components.

add_io(io, preload=True, master=False, lead_samples=0)#

Register the IO component with the engine.

Parameters
  • io (IO) – The IO component to register with the engine.

  • preload (bool) – Enable or disable preloading for this IO component. See add_acquisition() for full explanation.

  • master (bool) – Enable or disable master status for this IO component. See add_acquisition() for full explanation.

  • lead_samples (int) – The number of samples in advance to generate output waveforms in order to cancel out IO delay. This is done by looking ahead into the scan pattern-derived waveforms by the specified number of samples. Default is 0.

validate()#

Check the configuration for errors.

Raises

RuntimeError – If the configuration is invalid.

Warning

This method is not fully implemented yet.

vortex.engine.divide(nodes)#

Return an object that informs the engine to divide the acquired data between multiple processors that execute in parallel. In Python, this is equivalent to creating a list of nodes so this function effectively returns nodes.

Parameters

nodes (List[Processor | Cycle | Divide]) – The processor components to divide.

vortex.engine.cycle(nodes)#

Return an object that informs the engine to rotate between the processors in order for each block of acquired data.

Parameters

nodes (List[Processor | Cycle | Divide]) – The processor components to cycle.

class vortex.engine.ScanQueue#

A queue of scans to execute with the engine. When the scan queue is empty, the engine initiates a graceful shutdown.

append(scan, callback=None, marker=ScanBoundary())#

Append the specified scan to the queue.

Parameters
  • scan (Scan) – The scan to append to the queue. See the list of scans for supported object types.

  • callback (Callable[[int, ScanQueue.Event], None]) – Callback to receive notifications regarding this scan.

  • marker (ScanBoundary) – Boundary marker for the start of this scan.

Caution

This callback is executed in the acquisition dispatch thread of the engine, which is highly sensitive to timing delays. Avoid computationally expensive tasks in this callback or the session may shut down prematurely due to buffer underflows or overflows.

interrupt(scan, callback=None, marker=ScanBoundary())#

Clear the scan queue and immediately switch to the specified scan. The latency with which the transition propagates through the engine is determined by the engine’s preload_count.

Parameters
  • scan (Scan) – The scan to append to the queue. See the list of scans for supported object types.

  • callback (Callable[[int, ScanQueue.Event], None]) – Callback to receive notifications regarding this scan.

  • marker (ScanBoundary) – Boundary marker for the start of this scan.

Caution

This callback is executed in the acquisition dispatch thread of the engine, which is highly sensitive to timing delays. Avoid computationally expensive tasks in this callback or the engine may shut down prematurely due to buffer underflows or overflows.

reset()#

Clears the scan queue and internal scan state (i.e., last position and velocity).

clear()#

Clears the scan queue but maintains internal scan state (e.g., last position and velocity).

property empty_callback: Callable[[OnlineScanQueue], None]#

Callback to execute when the scan queue is empty but before the engine initiates a graceful shutdown. This represents the last opportunity to prolong the session. The callback receives a single argument of OnlineScanQueue which exposes a single OnlineScanQueue.append() method to provide a thread-safe mechanism to append another scan. This method is identical in operation to ScanQueue.append(). If no scan is appended, the engine will initiate a graceful shutdown immediately after the callback returns.

Attention

Any attempt to call a method of the ScanQueue during this callback will lead to deadlock. Only call methods of the OnlineScanQueue provided as the callback’s argument.