FZGPUModules 1.0
GPU-accelerated modular compression pipeline
Loading...
Searching...
No Matches
fz::Pipeline Class Reference

#include <compressor.h>

Classes

struct  FZMFileHeader
 
struct  InputSpec
 

Public Member Functions

 Pipeline (size_t input_data_size=0, MemoryStrategy strategy=MemoryStrategy::MINIMAL, float pool_multiplier=3.0f)
 
void setMemoryStrategy (MemoryStrategy strategy)
 
void setNumStreams (int num_streams)
 
void setDims (size_t x, size_t y=1, size_t z=1)
 
template<typename StageT , typename... Args>
StageT * addStage (Args &&... args)
 
int connect (Stage *dependent, Stage *producer, const std::string &output_name="output")
 
int connect (Stage *dependent, const std::vector< Stage * > &producers)
 
void finalize ()
 
void warmup (cudaStream_t stream=0)
 
void setWarmupOnFinalize (bool enable)
 
void setPoolManagedDecompOutput (bool enable)
 
void compress (const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
 
void compress (const std::vector< InputSpec > &inputs, void **d_output, size_t *output_size, cudaStream_t stream=0)
 
void setInputSizeHint (Stage *source, size_t size)
 
void decompress (const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
 
std::vector< std::pair< void *, size_t > > decompressMulti (const void *d_input=nullptr, size_t input_size=0, cudaStream_t stream=0)
 
void reset (cudaStream_t stream=0)
 
void enableProfiling (bool enable)
 
const PipelinePerfResult & getLastPerfResult () const
 
CompressionDAG * getDAG ()
 
size_t getPoolThreshold () const
 
void enableBoundsCheck (bool enable)
 
void setColoringEnabled (bool enable)
 
void enableGraphMode (bool enable)
 
void captureGraph (cudaStream_t stream=0)
 
void writeToFile (const std::string &filename, cudaStream_t stream=0)
 
FZMFileHeader buildHeader () const
 

Static Public Member Functions

static FZMFileHeader readHeader (const std::string &filename)
 
static void decompressFromFile (const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr, size_t pool_override_bytes=0)
 

Detailed Description

High-level pipeline API for building and executing compression workflows.

Stages are added with addStage<T>(), wired with connect(), then the pipeline is finalized and ready for compress()/decompress().

Ownership:

  • compress() output is pool-owned — do NOT cudaFree it. Valid until the next compress()/reset() or Pipeline destruction.
  • decompress() output is caller-owned by default (freshly cudaMalloc'd). Set setPoolManagedDecompOutput(true) to receive a pool-owned pointer instead.
Note
Not thread-safe. Use one Pipeline per host thread.

Constructor & Destructor Documentation

◆ Pipeline()

fz::Pipeline::Pipeline ( size_t  input_data_size = 0,
MemoryStrategy  strategy = MemoryStrategy::MINIMAL,
float  pool_multiplier = 3.0f 
)
explicit
Parameters
input_data_size — Expected input size in bytes, used for pool sizing (0 = default).
strategy — MINIMAL (on-demand allocation) or PREALLOCATE (upfront allocation; required for graph mode).
pool_multiplier — Pool size = input_data_size × pool_multiplier.

Member Function Documentation

◆ setMemoryStrategy()

void fz::Pipeline::setMemoryStrategy ( MemoryStrategy  strategy)

Must be called before finalize().

◆ setNumStreams()

void fz::Pipeline::setNumStreams ( int  num_streams)

Number of parallel CUDA streams for level-based execution. Must be called before finalize().

◆ setDims()

void fz::Pipeline::setDims ( size_t  x,
size_t  y = 1,
size_t  z = 1 
)
inline

Dataset spatial dimensions. Controls which Lorenzo variant is selected by addLorenzo() and is forwarded to LorenzoStage at finalize(). Default: 1-D ({n, 1, 1}).

◆ addStage()

template<typename StageT , typename... Args>
StageT * fz::Pipeline::addStage ( Args &&...  args)

Add a stage to the pipeline.

Returns
Raw pointer owned by the Pipeline.

◆ connect() [1/2]

int fz::Pipeline::connect ( Stage *  dependent,
Stage *  producer,
const std::string &  output_name = "output" 
)

Connect two stages (dependent consumes an output of producer).

Parameters
dependent — The downstream stage that reads the output.
producer — The upstream stage that writes the output.
output_name — Named output port of producer (default: "output").
Returns
Buffer ID (rarely needed directly).

◆ connect() [2/2]

int fz::Pipeline::connect ( Stage *  dependent,
const std::vector< Stage * > &  producers 
)

Connect a stage to multiple producers (one input per producer).

◆ finalize()

void fz::Pipeline::finalize ( )

Finalize the pipeline: validate topology, assign execution levels, and (for PREALLOCATE) allocate all buffers. Must be called before compress/decompress. If setWarmupOnFinalize(true) was set and input_size_hint > 0, runs warmup() automatically.

◆ warmup()

void fz::Pipeline::warmup ( cudaStream_t  stream = 0)

JIT-compile all pipeline kernels by running a dummy compress+decompress pass. This eliminates the first-call latency spike caused by CUDA's lazy PTX→SASS compilation. Requires a non-zero input size hint (the constructor's input_data_size argument).

◆ setWarmupOnFinalize()

void fz::Pipeline::setWarmupOnFinalize ( bool  enable)
inline

When true, finalize() automatically calls warmup(). Must be set before finalize().

◆ setPoolManagedDecompOutput()

void fz::Pipeline::setPoolManagedDecompOutput ( bool  enable)
inline

When true, decompress() returns a pool-owned pointer (do NOT cudaFree). Valid until the next decompress() call or Pipeline destruction. When false (default), decompress() returns a freshly cudaMalloc'd pointer that the caller must cudaFree().

◆ compress() [1/2]

void fz::Pipeline::compress ( const void *  d_input,
size_t  input_size,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0 
)

Compress (single-source). Throws if the pipeline has more than one source. *d_output is pool-owned — do NOT cudaFree.

◆ compress() [2/2]

void fz::Pipeline::compress ( const std::vector< InputSpec > &  inputs,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0 
)

Compress (multi-source). One InputSpec per source stage; order does not matter. *d_output is pool-owned — do NOT cudaFree.

◆ setInputSizeHint()

void fz::Pipeline::setInputSizeHint ( Stage *  source,
size_t  size 
)
inline

Per-source input size hint for multi-source pipelines. Call after addStage() but before finalize() to seed propagateBufferSizes() for accurate estimates.

◆ decompress()

void fz::Pipeline::decompress ( const void *  d_input,
size_t  input_size,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0 
)

Decompress (single-source). Inverse of compress().

Parameters
d_input — nullptr to read from the forward DAG's live buffers (simplest path; valid immediately after compress()). Pass a non-null pointer for an external compressed buffer.
input_size — Byte size of d_input (ignored when d_input is nullptr).
d_output — Receives the decompressed device pointer. Ownership depends on setPoolManagedDecompOutput().
output_size — Receives the exact decompressed size in bytes.
stream — CUDA stream for all GPU operations.

For multi-source pipelines, *d_output is a concat buffer: [num_bufs:u32][size1:u64][data1][size2:u64][data2]... Use decompressMulti() to receive individual per-source buffers.

◆ decompressMulti()

std::vector< std::pair< void *, size_t > > fz::Pipeline::decompressMulti ( const void *  d_input = nullptr,
size_t  input_size = 0,
cudaStream_t  stream = 0 
)

Decompress (multi-source). Returns one {device_ptr, size} pair per source, in the same order as forward source discovery. Ownership follows setPoolManagedDecompOutput().

◆ reset()

void fz::Pipeline::reset ( cudaStream_t  stream = 0)

Free non-persistent buffers and reset execution state for re-use.

◆ enableProfiling()

void fz::Pipeline::enableProfiling ( bool  enable)

Enable per-stage CUDA event profiling. Zero overhead when disabled. Results available via getLastPerfResult() after each compress()/decompress().

◆ getLastPerfResult()

const PipelinePerfResult & fz::Pipeline::getLastPerfResult ( ) const
inline

Performance snapshot from the most recent compress() or decompress() call.

◆ getDAG()

CompressionDAG * fz::Pipeline::getDAG ( )
inline

The underlying DAG (for advanced/diagnostic use).

◆ getPoolThreshold()

size_t fz::Pipeline::getPoolThreshold ( ) const

Pool release threshold in bytes as configured by finalize().

◆ enableBoundsCheck()

void fz::Pipeline::enableBoundsCheck ( bool  enable)
inline

Enable runtime buffer-overwrite detection. After each stage executes, checks that actual output size ≤ allocated capacity. Always active in debug builds regardless of this flag.

◆ setColoringEnabled()

void fz::Pipeline::setColoringEnabled ( bool  enable)
inline

Enable or disable buffer coloring for PREALLOCATE mode (default: enabled). Disable when per-buffer memory inspection is needed (e.g. cuda-memcheck). Must be called before finalize().

◆ enableGraphMode()

void fz::Pipeline::enableGraphMode ( bool  enable)

Enable CUDA Graph mode. captureGraph() will record the forward compression pass as a replayable CUDA Graph, eliminating per-call CPU dispatch overhead.

Requirements: PREALLOCATE strategy, non-zero input_size_hint, all stages graph-compatible, single-source pipeline. Must be set before finalize().

◆ captureGraph()

void fz::Pipeline::captureGraph ( cudaStream_t  stream = 0)

Record the forward compression pass as a CUDA Graph.

After this call compress() uses cudaGraphLaunch() instead of dag_->execute(). The input pointer is baked into the graph via a stable internal buffer (d_graph_input_); compress() copies user input there before each launch.

Can be called again to re-capture. Must be called after finalize() and before the first compress().

◆ writeToFile()

void fz::Pipeline::writeToFile ( const std::string &  filename,
cudaStream_t  stream = 0 
)

Write compressed data to an FZM file. compress() must have been called first.

◆ readHeader()

static FZMFileHeader fz::Pipeline::readHeader ( const std::string &  filename)
static

Parse the FZM header from a file without decompressing the payload.

◆ buildHeader()

FZMFileHeader fz::Pipeline::buildHeader ( ) const

Build the FZM header from current pipeline state. Requires a prior compress().

◆ decompressFromFile()

static void fz::Pipeline::decompressFromFile ( const std::string &  filename,
void **  d_output,
size_t *  output_size,
cudaStream_t  stream = 0,
PipelinePerfResult *  perf_out = nullptr,
size_t  pool_override_bytes = 0 
)
static

One-shot decompress from an FZM file. Reconstructs the pipeline from the file header, allocates a pool, and runs decompression.

Parameters
filename — Path to the .fzm file.
d_output — Receives the decompressed device pointer (caller must cudaFree).
output_size — Receives the decompressed size in bytes.
stream — CUDA stream for all GPU operations.
perf_out — Optional timing result (GPU compute only; excludes I/O).
pool_override_bytes — Overrides automatic pool sizing (0 = automatic). Automatic formula: C + 2.5×max_stage_uncompressed + 32 MiB.