FZGPUModules 1.0
GPU-accelerated modular compression pipeline
Loading...
Searching...
No Matches
compressor.h
#pragma once

#include "pipeline/dag.h"
#include "pipeline/perf.h"
#include "stage/stage.h"
#include "stage/stage_factory.h"  // NOTE(review): original line 6 was lost in extraction; name inferred from the doc index entry "Factory function for reconstructing pipeline stages from serialized FZM headers" — confirm against repo
#include "mem/mempool.h"
#include "fzm_format.h"

#include <array>
#include <cstddef>        // size_t
#include <cstdint>        // uint8_t (writeConcatBuffer)
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>        // std::pair
#include <vector>
15namespace fz {
16
31class Pipeline {
32public:
38 explicit Pipeline(
39 size_t input_data_size = 0,
41 float pool_multiplier = 3.0f
42 );
43
44 ~Pipeline();
45
46 // ── Configuration ─────────────────────────────────────────────────────────
47
50
52 void setNumStreams(int num_streams);
53
59 void setDims(size_t x, size_t y = 1, size_t z = 1) { dims_ = {x, y, z}; }
60 void setDims(std::array<size_t, 3> dims) { dims_ = dims; }
61 std::array<size_t, 3> getDims() const { return dims_; }
62
63 // ── Builder API ───────────────────────────────────────────────────────────
64
69 template<typename StageT, typename... Args>
70 StageT* addStage(Args&&... args);
71
79 int connect(Stage* dependent, Stage* producer, const std::string& output_name = "output");
80
82 int connect(Stage* dependent, const std::vector<Stage*>& producers);
83
89 void finalize();
90
96 void warmup(cudaStream_t stream = 0);
97
99 void setWarmupOnFinalize(bool enable) { warmup_on_finalize_ = enable; }
100 bool isWarmupOnFinalizeEnabled() const { return warmup_on_finalize_; }
101
108 void setPoolManagedDecompOutput(bool enable) { pool_managed_decomp_ = enable; }
109 bool isPoolManagedDecompOutput() const { return pool_managed_decomp_; }
110
111 // ── Execution ─────────────────────────────────────────────────────────────
112
119 struct InputSpec {
120 Stage* source;
121 const void* d_data;
122 size_t size;
123 };
124
130 const void* d_input,
131 size_t input_size,
132 void** d_output,
133 size_t* output_size,
134 cudaStream_t stream = 0
135 );
136
142 const std::vector<InputSpec>& inputs,
143 void** d_output,
144 size_t* output_size,
145 cudaStream_t stream = 0
146 );
147
152 void setInputSizeHint(Stage* source, size_t size) {
153 per_source_hints_[source] = size;
154 }
155
173 const void* d_input,
174 size_t input_size,
175 void** d_output,
176 size_t* output_size,
177 cudaStream_t stream = 0
178 );
179
185 std::vector<std::pair<void*, size_t>> decompressMulti(
186 const void* d_input = nullptr,
187 size_t input_size = 0,
188 cudaStream_t stream = 0
189 );
190
192 void reset(cudaStream_t stream = 0);
193
194 // ── Profiling ─────────────────────────────────────────────────────────────
195
200 void enableProfiling(bool enable);
201 bool isProfilingEnabled() const { return profiling_enabled_; }
202
204 const PipelinePerfResult& getLastPerfResult() const { return last_perf_result_; }
205
207 CompressionDAG* getDAG() { return dag_.get(); }
208
210 size_t getPoolThreshold() const;
211
217 void enableBoundsCheck(bool enable) { dag_->enableBoundsCheck(enable); }
218 bool isBoundsCheckEnabled() const { return dag_->isBoundsCheckEnabled(); }
219
225 void setColoringEnabled(bool enable) { dag_->setColoringEnabled(enable); }
226 bool isColoringEnabled() const { return dag_->isColoringEnabled(); }
227 size_t getColorRegionCount() const { return dag_->getColorRegionCount(); }
228
229 // ── CUDA Graph Capture (compression-only) ─────────────────────────────────
230
238 void enableGraphMode(bool enable);
239 bool isGraphModeEnabled() const { return graph_mode_enabled_; }
240
251 void captureGraph(cudaStream_t stream = 0);
252 bool isGraphCaptured() const { return graph_captured_; }
253
254 size_t getPeakMemoryUsage() const;
255 size_t getCurrentMemoryUsage() const;
256 void printPipeline() const;
257
258 // ── File Serialization ────────────────────────────────────────────────────
259
262 FZMHeaderCore core;
263 std::vector<FZMStageInfo> stages;
264 std::vector<FZMBufferEntry> buffers;
265 };
266
268 void writeToFile(const std::string& filename, cudaStream_t stream = 0);
269
271 static FZMFileHeader readHeader(const std::string& filename);
272
275
289 const std::string& filename,
290 void** d_output,
291 size_t* output_size,
292 cudaStream_t stream = 0,
293 PipelinePerfResult* perf_out = nullptr,
294 size_t pool_override_bytes = 0
295 );
296
297private:
298 // ── Internal helpers ──────────────────────────────────────────────────────
299
300 Stage* addRawStage(Stage* stage);
301
302 struct OutputBuffer {
303 void* d_ptr;
304 size_t actual_size;
305 size_t allocated_size;
306 std::string name;
307 int buffer_id;
308 };
309 std::vector<OutputBuffer> getOutputBuffers() const;
310
311 static void* loadCompressedData(
312 const std::string& filename,
313 const FZMFileHeader& header,
314 cudaStream_t stream = 0,
315 MemoryPool* pool = nullptr
316 );
317
318 void validate();
319 std::pair<std::vector<Stage*>, std::vector<Stage*>> identifyTopology();
320 void setupInputBuffers(const std::vector<Stage*>& sources);
321 int autoDetectUnconnectedOutputs();
322 void detectMultiOutputScenario(int pipeline_outputs);
323 void configureStreamsIfNeeded();
324
330 void propagateBufferSizes(bool force_from_current_inputs = false);
331
332 std::vector<Stage*> getSourceStages() const;
333 std::vector<Stage*> getSinkStages() const;
334
335 // ── Inverse DAG helpers ───────────────────────────────────────────────────
336
338 struct FwdStageDesc {
339 Stage* stage;
340 std::vector<int> output_buf_ids;
341 std::vector<int> input_buf_ids;
342 };
343
345 using PipelineOutputMap = std::unordered_map<int, std::pair<void*, size_t>>;
346
352 static std::pair<std::unique_ptr<CompressionDAG>,
353 std::unordered_map<Stage*, int>>
354 buildInverseDAG(
355 const std::vector<FwdStageDesc>& fwd_stages,
356 const PipelineOutputMap& pipeline_outputs,
357 MemoryPool* pool,
358 MemoryStrategy strategy,
359 const std::unordered_map<Stage*, size_t>& source_sizes,
360 bool enable_profiling
361 );
362
363 // ── Concat helpers ────────────────────────────────────────────────────────
364
365 struct OutputBufferInfo {
366 int buffer_id;
367 void* d_ptr;
368 size_t actual_size;
369 std::string stage_name;
370 std::string output_name;
371 };
372
373 std::vector<OutputBufferInfo> collectOutputBuffers() const;
374
376 size_t calculateConcatSize(const std::vector<OutputBufferInfo>& outputs) const;
377
378 size_t writeConcatBuffer(
379 const std::vector<OutputBufferInfo>& outputs,
380 uint8_t* d_concat_bytes,
381 cudaStream_t stream
382 ) const;
383
384 void concatOutputs(void** d_output, size_t* output_size, cudaStream_t stream);
385
386 // ── Member variables ──────────────────────────────────────────────────────
387
388 std::unique_ptr<MemoryPool> mem_pool_;
389 std::unique_ptr<CompressionDAG> dag_;
390 MemoryStrategy strategy_;
391
392 std::vector<std::unique_ptr<Stage>> stages_;
393 std::unordered_map<Stage*, DAGNode*> stage_to_node_;
394
395 struct ConnectionInfo {
396 Stage* dependent;
397 Stage* producer;
398 std::string output_name;
399 int output_index;
400 };
401 std::vector<ConnectionInfo> connections_;
402
403 int num_streams_;
404 bool is_finalized_;
405 bool warmup_on_finalize_;
406 bool pool_managed_decomp_;
407
408 // is_compressed_: true after the first successful compress() (gates writeToFile).
409 // was_compressed_: true between compress() and the next reset() (gates captureGraph).
410 bool is_compressed_;
411 bool was_compressed_;
412
413 bool profiling_enabled_;
414 PipelinePerfResult last_perf_result_;
415
416 std::vector<DAGNode*> input_nodes_;
417 std::vector<DAGNode*> output_nodes_;
418 std::vector<int> input_buffer_ids_;
419 std::vector<int> output_buffer_ids_;
420
421 void* d_concat_buffer_;
422 size_t concat_buffer_capacity_;
423 bool needs_concat_;
424
425 // Pool-persistent decompress output buffers (one per source stage).
426 // Only used when pool_managed_decomp_ == true.
427 std::vector<void*> d_decomp_outputs_;
428
429 // Pinned host buffer for concat header. Allocated once at finalize() to
430 // avoid N+1 tiny H2D copies — CPU packs the header then issues one H2D copy.
431 void* h_concat_header_;
432 size_t h_concat_header_capacity_;
433
434 // Persistent pinned host + device descriptor buffers for the gather kernel.
435 // Sized at finalize() for max_outputs; reused every compress call.
436 void* h_copy_descs_;
437 void* d_copy_descs_;
438 size_t copy_descs_capacity_;
439
440 size_t input_size_;
441
442 // Per-source input sizes from the most recent compress(), ordered to match
443 // input_nodes_. Used by decompress() to size each inverse result buffer.
444 std::vector<size_t> source_input_sizes_;
445
446 // Input alignment in bytes — LCM of all stage getRequiredInputAlignment() values,
447 // computed at finalize(). compress() zero-pads to this boundary transparently.
448 size_t input_alignment_bytes_;
449 void* d_pad_buf_;
450 size_t d_pad_buf_size_;
451
452 // Original (pre-padding) input size. decompress() uses this to trim the
453 // reported output back to what the caller provided. 0 when no padding.
454 size_t original_input_size_;
455
456 size_t input_size_hint_;
457 float pool_multiplier_;
458
459 // Per-source size hints (set via setInputSizeHint()). Override input_size_hint_
460 // for the matching source during propagateBufferSizes().
461 std::unordered_map<Stage*, size_t> per_source_hints_;
462
463 // Dataset dimensions (x=fast, y, z). Used by convenience.h addLorenzo() to
464 // select 1-D/2-D/3-D automatically. Default {0,1,1} = 1-D, infer x from input.
465 std::array<size_t, 3> dims_;
466
475 struct InvDAGCache {
476 std::unique_ptr<CompressionDAG> inv_dag;
477 std::unordered_map<Stage*, int> inv_result_map;
478 std::unordered_map<int, int> fwd_to_inv_ext_buf;
479 std::unordered_map<Stage*, size_t> source_sizes;
480 };
481 std::unique_ptr<InvDAGCache> inv_cache_;
482
483 struct BufferMetadata {
484 int buffer_id;
485 size_t actual_size;
486 size_t allocated_size;
487 std::string name;
488 DAGNode* producer;
489 int output_index;
490 };
491 std::vector<BufferMetadata> buffer_metadata_;
492
493 bool graph_mode_enabled_;
494 bool graph_captured_;
495
496 // Fixed device input buffer whose address is baked into the captured graph.
497 // compress() copies user input here before cudaGraphLaunch().
498 void* d_graph_input_;
499 size_t d_graph_input_size_;
500
501 cudaGraph_t captured_graph_;
502 cudaGraphExec_t graph_exec_;
503};
504
505// ── Template implementation ───────────────────────────────────────────────────
506
507template<typename StageT, typename... Args>
508StageT* Pipeline::addStage(Args&&... args) {
509 if (is_finalized_) {
510 throw std::runtime_error("Cannot add stages after finalization");
511 }
512
513 auto stage_ptr = std::make_unique<StageT>(std::forward<Args>(args)...);
514 StageT* stage = stage_ptr.get();
515
516 DAGNode* node = dag_->addStage(stage, stage->getName());
517 size_t num_outputs = stage->getNumOutputs();
518 auto output_names = stage->getOutputNames();
519
520 // Pre-allocate all output slots as unconnected (size=1 placeholder).
521 // connect() will promote any that get wired to downstream stages.
522 for (size_t i = 0; i < num_outputs; i++) {
523 std::string out_name = i < output_names.size() ? output_names[i] : std::to_string(i);
524 dag_->addUnconnectedOutput(node, 1, i, stage->getName() + "." + out_name + "_unconnected");
525 }
526
527 stage_to_node_[stage] = node;
528 stages_.push_back(std::move(stage_ptr));
529 return stage;
530}
531
532} // namespace fz
Definition dag.h:88
Definition mempool.h:62
Definition compressor.h:31
void setDims(size_t x, size_t y=1, size_t z=1)
Definition compressor.h:59
static void decompressFromFile(const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr, size_t pool_override_bytes=0)
int connect(Stage *dependent, const std::vector< Stage * > &producers)
void setPoolManagedDecompOutput(bool enable)
Definition compressor.h:108
size_t getPoolThreshold() const
void warmup(cudaStream_t stream=0)
void enableBoundsCheck(bool enable)
Definition compressor.h:217
void setWarmupOnFinalize(bool enable)
Definition compressor.h:99
CompressionDAG * getDAG()
Definition compressor.h:207
std::vector< std::pair< void *, size_t > > decompressMulti(const void *d_input=nullptr, size_t input_size=0, cudaStream_t stream=0)
StageT * addStage(Args &&... args)
Definition compressor.h:508
void compress(const std::vector< InputSpec > &inputs, void **d_output, size_t *output_size, cudaStream_t stream=0)
const PipelinePerfResult & getLastPerfResult() const
Definition compressor.h:204
void writeToFile(const std::string &filename, cudaStream_t stream=0)
int connect(Stage *dependent, Stage *producer, const std::string &output_name="output")
Pipeline(size_t input_data_size=0, MemoryStrategy strategy=MemoryStrategy::MINIMAL, float pool_multiplier=3.0f)
void setMemoryStrategy(MemoryStrategy strategy)
static FZMFileHeader readHeader(const std::string &filename)
void setColoringEnabled(bool enable)
Definition compressor.h:225
void finalize()
void enableGraphMode(bool enable)
void enableProfiling(bool enable)
FZMFileHeader buildHeader() const
void setInputSizeHint(Stage *source, size_t size)
Definition compressor.h:152
void setNumStreams(int num_streams)
void captureGraph(cudaStream_t stream=0)
void reset(cudaStream_t stream=0)
void decompress(const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
void compress(const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
Definition stage.h:28
FZM binary file format definitions — structs, enums, and helpers.
Stream-ordered CUDA memory pool for pipeline buffer management.
Definition fzm_format.h:25
MemoryStrategy
Definition dag.h:19
@ MINIMAL
Allocate on-demand, free at last consumer. Lowest peak memory.
Pipeline and per-stage profiling result types.
Factory function for reconstructing pipeline stages from serialized FZM headers.
Definition dag.h:48
Fixed-size FZM file header core (80 bytes).
Definition fzm_format.h:225
Definition perf.h:70
Definition compressor.h:261
Definition compressor.h:119