FZGPUModules 2.0
GPU-accelerated modular compression pipelines
Loading...
Searching...
No Matches
fz::Pipeline Class Reference

#include <compressor.h>

Classes

struct  FZMFileHeader
 

Public Member Functions

 Pipeline (size_t input_data_size=0, MemoryStrategy strategy=MemoryStrategy::MINIMAL, float pool_multiplier=3.0f)
 
 Pipeline (const std::string &config_path)
 
void setMemoryStrategy (MemoryStrategy strategy)
 
void setNumStreams (int num_streams)
 
void setDims (size_t x, size_t y=1, size_t z=1)
 
template<typename StageT , typename... Args>
StageT * addStage (Args &&... args)
 
int connect (Stage *dependent, Stage *producer, const std::string &output_name="output")
 
int connect (Stage *dependent, const std::vector< Stage * > &producers)
 
void finalize ()
 
void warmup (cudaStream_t stream=0)
 
void setWarmupOnFinalize (bool enable)
 
void setPoolManagedDecompOutput (bool enable)
 
size_t getMaxCompressedSize (size_t input_bytes) const
 
size_t getLastUncompressedSize () const
 
void compress (const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
 
void compress (const void *d_input, size_t input_size, void *d_output_buf, size_t output_buf_capacity, size_t *actual_output_size, cudaStream_t stream=0)
 
void decompress (const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
 
void decompress (const void *d_input, size_t input_size, void *d_output_buf, size_t output_buf_capacity, size_t *actual_output_size, cudaStream_t stream=0)
 
void reset (cudaStream_t stream=0)
 
void enableProfiling (bool enable)
 
const PipelinePerfResult & getLastPerfResult () const
 
CompressionDAG * getDAG ()
 
size_t getPoolThreshold () const
 
bool isMemPoolFallbackMode () const
 
void enableBoundsCheck (bool enable)
 
void setColoringEnabled (bool enable)
 
void enableGraphMode (bool enable)
 
void captureGraph (cudaStream_t stream=0)
 
void writeToFile (const std::string &filename, cudaStream_t stream=0)
 
FZMFileHeader buildHeader () const
 
void decompressFromFileInstance (const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr)
 
void loadConfig (const std::string &path)
 
void saveConfig (const std::string &path) const
 

Static Public Member Functions

static FZMFileHeader readHeader (const std::string &filename)
 
static void decompressFromFile (const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr, size_t pool_override_bytes=0)
 

Detailed Description

High-level pipeline API for building and executing compression workflows.

Stages are added with addStage<T>(), wired with connect(), then the pipeline is finalized and ready for compress()/decompress().

Ownership:

  • compress() output is pool-owned — do NOT cudaFree it. Valid until the next compress()/reset() or Pipeline destruction.
  • decompress() output is pool-owned by default — do NOT cudaFree it. Call setPoolManagedDecompOutput(false) to receive a caller-owned pointer instead.

Constructor & Destructor Documentation

◆ Pipeline() [1/2]

fz::Pipeline::Pipeline ( size_t  input_data_size = 0,
MemoryStrategy  strategy = MemoryStrategy::MINIMAL,
float  pool_multiplier = 3.0f 
)
explicit
Parameters
input_data_size: Expected input size in bytes for pool sizing (0 = default).
strategy: MINIMAL (on-demand alloc) or PREALLOCATE (upfront, required for graph mode).
pool_multiplier: Pool size = input_size × multiplier.

◆ Pipeline() [2/2]

fz::Pipeline::Pipeline ( const std::string &  config_path)
explicit

Construct directly from a TOML config file. Equivalent to the default constructor followed by loadConfig(path). The pipeline is finalized on return.

Parameters
config_path: Path to the .toml config file.

Member Function Documentation

◆ setMemoryStrategy()

void fz::Pipeline::setMemoryStrategy ( MemoryStrategy  strategy)

Must be called before finalize().

◆ setNumStreams()

void fz::Pipeline::setNumStreams ( int  num_streams)

Number of parallel CUDA streams for level-based execution. Must be called before finalize().

◆ setDims()

void fz::Pipeline::setDims ( size_t  x,
size_t  y = 1,
size_t  z = 1 
)
inline

Dataset spatial dimensions. Forwarded to every stage immediately on addStage() and again at finalize(). Call setDims() before addStage() so that dimension-aware stages (e.g. LorenzoQuantStage) have the correct ndim() from the moment they are added. Default: 1-D ({n, 1, 1}).

◆ addStage()

template<typename StageT , typename... Args>
StageT * fz::Pipeline::addStage ( Args &&...  args)

Add a stage to the pipeline.

Returns
Raw pointer owned by the Pipeline.

◆ connect() [1/2]

int fz::Pipeline::connect ( Stage *  dependent,
Stage *  producer,
const std::string &  output_name = "output" 
)

Connect two stages (dependent consumes an output of producer).

Parameters
dependent: The downstream stage that reads the output.
producer: The upstream stage that writes the output.
output_name: Named output port of producer (default: "output").
Returns
Buffer ID (rarely needed directly).

◆ connect() [2/2]

int fz::Pipeline::connect ( Stage *  dependent,
const std::vector< Stage * > &  producers 
)

Connect a stage to multiple producers (one input per producer).

◆ finalize()

void fz::Pipeline::finalize ( )

Finalize the pipeline: validate topology, assign execution levels, and (for PREALLOCATE) allocate all buffers. Must be called before compress/decompress. If setWarmupOnFinalize(true) was set and input_size_hint > 0, runs warmup() automatically.

◆ warmup()

void fz::Pipeline::warmup ( cudaStream_t  stream = 0)

JIT-compile all pipeline kernels by running a dummy compress+decompress pass. Eliminates the first-call latency spike from CUDA's lazy PTX→SASS compilation. Requires a non-zero input size hint (the input_data_size constructor argument).

◆ setWarmupOnFinalize()

void fz::Pipeline::setWarmupOnFinalize ( bool  enable)
inline

When true, finalize() automatically calls warmup(). Must be set before finalize().

◆ setPoolManagedDecompOutput()

void fz::Pipeline::setPoolManagedDecompOutput ( bool  enable)
inline

When true (default), decompress() returns a pool-owned pointer (do NOT cudaFree). Valid until the next decompress() call or Pipeline destruction. When false, decompress() returns a freshly cudaMalloc'd pointer that the caller must cudaFree().

◆ getMaxCompressedSize()

size_t fz::Pipeline::getMaxCompressedSize ( size_t  input_bytes) const

Return the worst-case compressed output size in bytes for the given input.

Must be called after finalize(). Use this to pre-allocate a caller-owned output buffer before passing it to the user-owned compress() overload.

The returned value is a tight upper bound derived from each stage's estimateOutputSizes() chain — it should rarely exceed ~110% of the actual compressed size for typical data.

Parameters
input_bytes: Number of bytes you intend to compress.
Exceptions
std::runtime_error: if the pipeline is not yet finalized.

◆ getLastUncompressedSize()

size_t fz::Pipeline::getLastUncompressedSize ( ) const
inline

Return the uncompressed byte count from the most recent compress() call.

Returns the original (pre-padding) input size so the value always matches what the caller passed in, even when the pipeline transparently padded the input for alignment. Returns 0 if compress() has never been called.

Useful for pre-allocating a decompression output buffer without needing out-of-band metadata:

pipeline.compress(d_in, in_bytes, &d_comp, &comp_sz, stream);
size_t decomp_bytes = pipeline.getLastUncompressedSize();
// allocate d_out of size decomp_bytes, then:
pipeline.decompress(d_comp, comp_sz, d_out, decomp_bytes, &actual, stream);

The value persists across reset() — reset() invalidates the compressed output pointer, but the size remains meaningful for planning the next call.

◆ compress() [1/2]

void fz::Pipeline::compress ( const void *  d_input,
size_t  input_size,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0 
)

Compress (pool-owned output). The pool retains the output buffer.

Parameters
d_input: Device pointer to raw input data.
input_size: Size of d_input in bytes.
d_output: Receives a pool-owned pointer to the compressed output. Do NOT call cudaFree() — valid until the next compress(), reset(), or Pipeline destruction.
output_size: Receives the exact compressed size in bytes.
stream: CUDA stream for all GPU operations.

◆ compress() [2/2]

void fz::Pipeline::compress ( const void *  d_input,
size_t  input_size,
void *  d_output_buf,
size_t  output_buf_capacity,
size_t *  actual_output_size,
cudaStream_t  stream = 0 
)

Compress (user-owned output). The compressed data is written into the caller-provided device buffer.

The buffer just needs to be large enough for the actual compressed output of this specific call — which depends on the data. If the actual output exceeds output_buf_capacity a std::runtime_error is thrown with the actual and capacity sizes so the caller can retry with a larger buffer.

Use getMaxCompressedSize(input_bytes) for a guaranteed safe upper bound. Alternatively, if you know empirically that your data compresses to at most X bytes for your workload, you can pass X directly and accept the small risk of a runtime error on unusually incompressible inputs.

Incompatible with CUDA Graph mode (the output address cannot be baked into a captured graph). Throws if enableGraphMode(true) was set.

Parameters
d_input: Device pointer to raw input data.
input_size: Size of d_input in bytes.
d_output_buf: Caller-allocated device buffer to write compressed data into.
output_buf_capacity: Capacity of d_output_buf in bytes. Must fit the actual compressed output for this call.
actual_output_size: Receives the exact compressed bytes written.
stream: CUDA stream for all GPU operations.

◆ decompress() [1/2]

void fz::Pipeline::decompress ( const void *  d_input,
size_t  input_size,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0 
)

Decompress. Inverse of compress().

Parameters
d_input: nullptr to read from the forward DAG's live buffers (simplest path, valid immediately after compress()). Non-null for an external compressed buffer.
input_size: Byte size of d_input (ignored when d_input is nullptr).
d_output: Receives the decompressed device pointer. Ownership depends on setPoolManagedDecompOutput(): false → caller-owned, must cudaFree. true (default) → pool-owned, do NOT cudaFree.
output_size: Receives the exact decompressed size in bytes.
stream: CUDA stream for all GPU operations.

◆ decompress() [2/2]

void fz::Pipeline::decompress ( const void *  d_input,
size_t  input_size,
void *  d_output_buf,
size_t  output_buf_capacity,
size_t *  actual_output_size,
cudaStream_t  stream = 0 
)

Decompress into a caller-provided device buffer (user-owned output).

The decompressed data is written directly into d_output_buf. No cudaMalloc or pool allocation is performed — the caller owns the buffer entirely.

The buffer just needs to be large enough for the actual decompressed output of this call. If it is too small a std::runtime_error is thrown with the actual size so the caller can retry. Typically the uncompressed size is known from the file header (FZMHeaderCore::uncompressed_size) or from the original compress() call.

Parameters
d_input: See decompress() above.
input_size: See decompress() above.
d_output_buf: Caller-allocated device buffer to receive decompressed data.
output_buf_capacity: Capacity of d_output_buf in bytes.
actual_output_size: Receives the exact bytes written.
stream: CUDA stream for all GPU operations.

◆ reset()

void fz::Pipeline::reset ( cudaStream_t  stream = 0)

Free non-persistent buffers and reset execution state for re-use.

◆ enableProfiling()

void fz::Pipeline::enableProfiling ( bool  enable)

Enable per-stage CUDA event profiling. Zero overhead when disabled. Results available via getLastPerfResult() after each compress()/decompress().

◆ getLastPerfResult()

const PipelinePerfResult & fz::Pipeline::getLastPerfResult ( ) const
inline

Performance snapshot from the most recent compress() or decompress() call.

◆ getDAG()

CompressionDAG * fz::Pipeline::getDAG ( )
inline

The underlying DAG (for advanced/diagnostic use).

◆ getPoolThreshold()

size_t fz::Pipeline::getPoolThreshold ( ) const

Pool release threshold in bytes as configured by finalize().

◆ isMemPoolFallbackMode()

bool fz::Pipeline::isMemPoolFallbackMode ( ) const

Returns true if the internal memory pool is running in cudaMalloc fallback mode.

Fallback mode is active when pool creation failed at construction time (e.g. vGPU environments), when FZ_FORCE_MEMPOOL_FALLBACK is set in the environment, or when MemoryPoolConfig::force_fallback was passed to the Pipeline constructor. In fallback mode all allocations use cudaMalloc/cudaFree with explicit stream synchronization rather than stream-ordered pool allocations.

◆ enableBoundsCheck()

void fz::Pipeline::enableBoundsCheck ( bool  enable)
inline

Enable runtime buffer-overwrite detection. After each stage executes, checks that actual output size ≤ allocated capacity. Always active in debug builds regardless of this flag.

◆ setColoringEnabled()

void fz::Pipeline::setColoringEnabled ( bool  enable)
inline

Enable or disable buffer coloring for PREALLOCATE mode (default: enabled). Disable when per-buffer memory inspection is needed (e.g. cuda-memcheck). Must be called before finalize().

◆ enableGraphMode()

void fz::Pipeline::enableGraphMode ( bool  enable)

Enable CUDA Graph mode. captureGraph() will record the forward compression pass as a replayable CUDA Graph, eliminating per-call CPU dispatch overhead.

Requirements: PREALLOCATE strategy, non-zero input_size_hint, all stages graph-compatible, single-source pipeline. Must be set before finalize().

◆ captureGraph()

void fz::Pipeline::captureGraph ( cudaStream_t  stream = 0)

Record the forward compression pass as a CUDA Graph.

After this call compress() uses cudaGraphLaunch() instead of dag_->execute(). The input pointer is baked into the graph via a stable internal buffer (d_graph_input_); compress() copies user input there before each launch.

Can be called again to re-capture. Must be called after finalize() and before the first compress().

◆ writeToFile()

void fz::Pipeline::writeToFile ( const std::string &  filename,
cudaStream_t  stream = 0 
)

Write compressed data to an FZM file. compress() must have been called first.

◆ readHeader()

static FZMFileHeader fz::Pipeline::readHeader ( const std::string &  filename)
static

Parse the FZM header from a file without decompressing the payload.

◆ buildHeader()

FZMFileHeader fz::Pipeline::buildHeader ( ) const

Build the FZM header from current pipeline state. Requires a prior compress().

◆ decompressFromFile()

static void fz::Pipeline::decompressFromFile ( const std::string &  filename,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0,
PipelinePerfResult *  perf_out = nullptr,
size_t  pool_override_bytes = 0 
)
static

One-shot decompress from an FZM file. Reconstructs the pipeline from the file header, allocates a pool, and runs decompression.

Output is always caller-owned (caller must cudaFree *d_output).

Parameters
filename: Path to the .fzm file.
d_output: Receives the decompressed device pointer (caller must cudaFree).
output_size: Receives the decompressed size in bytes.
stream: CUDA stream for all GPU operations.
perf_out: Optional timing result (GPU compute only, excludes I/O).
pool_override_bytes: Override automatic pool sizing (0 = automatic). Formula: C + 2.5×max_stage_uncompressed + 32 MiB.

◆ decompressFromFileInstance()

void fz::Pipeline::decompressFromFileInstance ( const std::string &  filename,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0,
PipelinePerfResult *  perf_out = nullptr 
)

One-shot decompress from an FZM file (instance overload).

Behaves identically to the static decompressFromFile() overload but respects the setPoolManagedDecompOutput() flag on this instance: false → caller must cudaFree(*d_output). true (default) → *d_output is pool-owned; do NOT cudaFree.

The distinct name avoids overload-resolution ambiguity at call sites that are not member functions.

Parameters
filename: Path to the .fzm file.
d_output: Receives the decompressed device pointer.
output_size: Receives the decompressed size in bytes.
stream: CUDA stream for all GPU operations.
perf_out: Optional timing result.

◆ loadConfig()

void fz::Pipeline::loadConfig ( const std::string &  path)

Build and finalize the pipeline from a TOML config file.

Adds stages, wires connections, applies pipeline-level settings, then calls finalize() internally. The pipeline must not be finalized before this call.

Recognized stage types: LorenzoQuant, Lorenzo, Quantizer, Bitshuffle, RZE, RLE, Bitpack, Difference, Zigzag, Negabinary.

Exceptions
std::runtime_error: File not found, parse error, unknown stage type, bad wiring reference, or already finalized.

◆ saveConfig()

void fz::Pipeline::saveConfig ( const std::string &  path) const

Serialize the current pipeline to a TOML config file.

Requires finalize() to have been called. The written file can be passed back to loadConfig() to reconstruct an equivalent pipeline.

Exceptions
std::runtime_error: Pipeline not finalized.