FZGPUModules 2.0
GPU-accelerated modular compression pipelines
Loading...
Searching...
No Matches
compressor.h
Go to the documentation of this file.
1
5#pragma once
6
7#include "pipeline/dag.h"
8#include "pipeline/perf.h"
9#include "pipeline/config.h"
10#include "stage/stage.h"
11#include "stage/stage_factory.h"
12#include "mem/mempool.h"
13#include "fzm_format.h"
14
15#include <array>
16#include <memory>
17#include <stdexcept>
18#include <unordered_map>
19
20namespace fz {
21
34class Pipeline {
35public:
41 explicit Pipeline(
42 size_t input_data_size = 0,
44 float pool_multiplier = 3.0f
45 );
46
54 explicit Pipeline(const std::string& config_path);
55
56 ~Pipeline();
57
58 // ── Configuration ─────────────────────────────────────────────────────────
59
62
64 void setNumStreams(int num_streams);
65
72 void setDims(size_t x, size_t y = 1, size_t z = 1) { dims_ = {x, y, z}; }
73 void setDims(std::array<size_t, 3> dims) { dims_ = dims; }
74 std::array<size_t, 3> getDims() const { return dims_; }
75
76 // ── Builder API ───────────────────────────────────────────────────────────
77
82 template<typename StageT, typename... Args>
83 StageT* addStage(Args&&... args);
84
92 int connect(Stage* dependent, Stage* producer, const std::string& output_name = "output");
93
95 int connect(Stage* dependent, const std::vector<Stage*>& producers);
96
102 void finalize();
103
109 void warmup(cudaStream_t stream = 0);
110
112 void setWarmupOnFinalize(bool enable) { warmup_on_finalize_ = enable; }
113 bool isWarmupOnFinalizeEnabled() const { return warmup_on_finalize_; }
114
121 void setPoolManagedDecompOutput(bool enable) { pool_managed_decomp_ = enable; }
122 bool isPoolManagedDecompOutput() const { return pool_managed_decomp_; }
123
137 size_t getMaxCompressedSize(size_t input_bytes) const;
138
158 size_t getLastUncompressedSize() const {
159 return original_input_size_ > 0 ? original_input_size_ : input_size_;
160 }
161
162 // ── Execution ─────────────────────────────────────────────────────────────
163
176 const void* d_input,
177 size_t input_size,
178 void** d_output,
179 size_t* output_size,
180 cudaStream_t stream = 0
181 );
182
210 const void* d_input,
211 size_t input_size,
212 void* d_output_buf,
213 size_t output_buf_capacity,
214 size_t* actual_output_size,
215 cudaStream_t stream = 0
216 );
217
233 const void* d_input,
234 size_t input_size,
235 void** d_output,
236 size_t* output_size,
237 cudaStream_t stream = 0
238 );
239
261 const void* d_input,
262 size_t input_size,
263 void* d_output_buf,
264 size_t output_buf_capacity,
265 size_t* actual_output_size,
266 cudaStream_t stream = 0
267 );
268
270 void reset(cudaStream_t stream = 0);
271
272 // ── Profiling ─────────────────────────────────────────────────────────────
273
278 void enableProfiling(bool enable);
279 bool isProfilingEnabled() const { return profiling_enabled_; }
280
282 const PipelinePerfResult& getLastPerfResult() const { return last_perf_result_; }
283
285 CompressionDAG* getDAG() { return dag_.get(); }
286
288 size_t getPoolThreshold() const;
289
300
306 void enableBoundsCheck(bool enable) { dag_->enableBoundsCheck(enable); }
307 bool isBoundsCheckEnabled() const { return dag_->isBoundsCheckEnabled(); }
308
314 void setColoringEnabled(bool enable) { dag_->setColoringEnabled(enable); }
315 bool isColoringEnabled() const { return dag_->isColoringEnabled(); }
316 size_t getColorRegionCount() const { return dag_->getColorRegionCount(); }
317
318 // ── CUDA Graph Capture (compression-only) ─────────────────────────────────
319
327 void enableGraphMode(bool enable);
328 bool isGraphModeEnabled() const { return graph_mode_enabled_; }
329
340 void captureGraph(cudaStream_t stream = 0);
341 bool isGraphCaptured() const { return graph_captured_; }
342
343 size_t getPeakMemoryUsage() const;
344 size_t getCurrentMemoryUsage() const;
345 void printPipeline() const;
346
347 // ── File Serialization ────────────────────────────────────────────────────
348
351 FZMHeaderCore core;
352 std::vector<FZMStageInfo> stages;
353 std::vector<FZMBufferEntry> buffers;
354 };
355
357 void writeToFile(const std::string& filename, cudaStream_t stream = 0);
358
360 static FZMFileHeader readHeader(const std::string& filename);
361
364
380 const std::string& filename,
381 void** d_output,
382 size_t* output_size,
383 cudaStream_t stream = 0,
384 PipelinePerfResult* perf_out = nullptr,
385 size_t pool_override_bytes = 0
386 );
387
406 const std::string& filename,
407 void** d_output,
408 size_t* output_size,
409 cudaStream_t stream = 0,
410 PipelinePerfResult* perf_out = nullptr
411 );
412
413 // ── Config File ───────────────────────────────────────────────────────────
414
428 void loadConfig(const std::string& path);
429
438 void saveConfig(const std::string& path) const;
439
440private:
441 // ── RAII buffer wrappers (private implementation detail) ─────────────────
442
443 // Pool-allocated persistent device buffer.
444 struct PoolBuffer {
445 void* ptr = nullptr;
446 size_t capacity = 0;
447 MemoryPool* pool = nullptr;
448
449 ~PoolBuffer() { free(0); }
450 PoolBuffer() = default;
451 PoolBuffer(const PoolBuffer&) = delete;
452 PoolBuffer& operator=(const PoolBuffer&) = delete;
453
454 void free(cudaStream_t s) {
455 if (ptr && pool) { pool->free(ptr, s); ptr = nullptr; capacity = 0; }
456 }
457 bool allocate(MemoryPool* p, size_t bytes, cudaStream_t s,
458 const char* tag, bool persistent = false) {
459 free(s);
460 pool = p;
461 ptr = pool->allocate(bytes, s, tag, persistent);
462 if (ptr) capacity = bytes;
463 return ptr != nullptr;
464 }
465 };
466
467 // cudaHostAlloc pinned host buffer — grows on demand, never shrinks.
468 struct PinnedBuffer {
469 void* ptr = nullptr;
470 size_t capacity = 0;
471
472 ~PinnedBuffer() { if (ptr) cudaFreeHost(ptr); }
473 PinnedBuffer() = default;
474 PinnedBuffer(const PinnedBuffer&) = delete;
475 PinnedBuffer& operator=(const PinnedBuffer&) = delete;
476
477 // Returns false on CUDA allocation failure.
478 bool ensureCapacity(size_t bytes) {
479 if (capacity >= bytes) return true;
480 if (ptr) { cudaFreeHost(ptr); ptr = nullptr; capacity = 0; }
481 if (cudaHostAlloc(&ptr, bytes, cudaHostAllocDefault) != cudaSuccess) return false;
482 capacity = bytes;
483 return true;
484 }
485 };
486
487 // cudaMalloc device buffer — grows on demand, never shrinks.
488 struct DeviceBuffer {
489 void* ptr = nullptr;
490 size_t capacity = 0;
491
492 ~DeviceBuffer() { if (ptr) cudaFree(ptr); }
493 DeviceBuffer() = default;
494 DeviceBuffer(const DeviceBuffer&) = delete;
495 DeviceBuffer& operator=(const DeviceBuffer&) = delete;
496
497 // Returns false on CUDA allocation failure.
498 bool ensureCapacity(size_t bytes) {
499 if (capacity >= bytes) return true;
500 if (ptr) { cudaFree(ptr); ptr = nullptr; capacity = 0; }
501 if (cudaMalloc(&ptr, bytes) != cudaSuccess) return false;
502 capacity = bytes;
503 return true;
504 }
505 };
506
507 // ── Internal helpers ──────────────────────────────────────────────────────
508
509 Stage* addRawStage(Stage* stage);
510
// Record describing one output buffer; element type of the vector returned
// by getOutputBuffers().
// The fields have no default member initializers, so populate every field
// (or value-initialize with {}) before reading.
// NOTE(review): per-field meanings below are inferred from names only —
// confirm against the implementation file.
511 struct OutputBuffer {
512 void* d_ptr;            // presumably a device pointer to the buffer data
513 size_t actual_size;     // presumably bytes actually used
514 size_t allocated_size;  // presumably bytes reserved for the buffer
515 std::string name;
516 int buffer_id;
517 };
518 std::vector<OutputBuffer> getOutputBuffers() const;
519
520 static void* loadCompressedData(
521 const std::string& filename,
522 const FZMFileHeader& header,
523 cudaStream_t stream = 0,
524 MemoryPool* pool = nullptr
525 );
526
527 void validate();
528 std::pair<std::vector<Stage*>, std::vector<Stage*>> identifyTopology();
529 void setupInputBuffers(const std::vector<Stage*>& sources);
530 int autoDetectUnconnectedOutputs();
531 void detectMultiOutputScenario(int pipeline_outputs);
532 void configureStreamsIfNeeded();
533
534 // finalize() sub-steps
535 void typeCheckConnections();
536 void computeInputAlignment();
537 void notifyStagesFinalizeHooks();
538 void refinePoolSize();
539 void setupGraphModeInput();
540 void preallocatePadBuffer();
541 void preallocateConcatBuffers();
542
543 // compress() helper: handles graph-mode copy or alignment padding.
544 // Returns the effective source pointer and padded source size.
545 std::pair<const void*, size_t> prepareInputSource(
546 const void* d_input, size_t input_size, cudaStream_t stream);
547
553 void propagateBufferSizes(bool force_from_current_inputs = false);
554
555 std::vector<Stage*> getSourceStages() const;
556 std::vector<Stage*> getSinkStages() const;
557
558 // ── Inverse DAG helpers ───────────────────────────────────────────────────
559
// Describes one stage of the forward pipeline together with its buffer ids.
// A vector of these is produced by reconstructForwardTopology() and taken
// by buildInverseDAG() (parameter fwd_stages) and buildSourceSizesFromHeader().
561 struct FwdStageDesc {
562 Stage* stage;  // non-owning pointer; TODO confirm who owns the Stage
563 std::vector<int> output_buf_ids;
564 std::vector<int> input_buf_ids;
565 };
566
568 using PipelineOutputMap = std::unordered_map<int, std::pair<void*, size_t>>;
569
570 // decompress() helper: builds or reuses the inverse DAG cache.
571 void buildOrReuseInvCache(
572 const PipelineOutputMap& po_map,
573 Stage* src_stage,
574 size_t src_sz,
575 cudaStream_t stream);
576
577 // decompressFromFile() helpers.
578 static size_t computeFilePoolSize(const FZMFileHeader& fh, size_t pool_override_bytes);
579 static std::pair<std::vector<std::unique_ptr<Stage>>, std::vector<FwdStageDesc>>
580 reconstructForwardTopology(const FZMFileHeader& fh);
581 static std::unordered_map<Stage*, size_t> buildSourceSizesFromHeader(
582 const FZMFileHeader& fh, const std::vector<FwdStageDesc>& fwd_topology);
583
589 static std::pair<std::unique_ptr<CompressionDAG>,
590 std::unordered_map<Stage*, int>>
591 buildInverseDAG(
592 const std::vector<FwdStageDesc>& fwd_stages,
593 const PipelineOutputMap& pipeline_outputs,
594 MemoryPool* pool,
595 MemoryStrategy strategy,
596 const std::unordered_map<Stage*, size_t>& source_sizes,
597 bool enable_profiling
598 );
599
600 // ── Concat helpers ────────────────────────────────────────────────────────
601
// Record describing one pipeline output for the concat helpers: returned by
// collectOutputBuffers() and consumed by calculateConcatSize() and
// writeConcatBuffer().
// The scalar fields have no default member initializers.
// NOTE(review): per-field meanings are inferred from names — confirm.
602 struct OutputBufferInfo {
603 int buffer_id;
604 void* d_ptr;         // presumably a device pointer to the output data
605 size_t actual_size;  // presumably the size in bytes of the data at d_ptr
606 std::string stage_name;
607 std::string output_name;
608 };
609
610 std::vector<OutputBufferInfo> collectOutputBuffers() const;
611
613 size_t calculateConcatSize(const std::vector<OutputBufferInfo>& outputs) const;
614
615 size_t writeConcatBuffer(
616 const std::vector<OutputBufferInfo>& outputs,
617 uint8_t* d_concat_bytes,
618 cudaStream_t stream
619 ) const;
620
621 void concatOutputs(void** d_output, size_t* output_size, cudaStream_t stream);
622
623 // ── Member variables ──────────────────────────────────────────────────────
624
625 std::unique_ptr<MemoryPool> mem_pool_;
626 std::unique_ptr<CompressionDAG> dag_;
627 MemoryStrategy strategy_;
628
629 std::vector<std::unique_ptr<Stage>> stages_;
630 std::unordered_map<Stage*, DAGNode*> stage_to_node_;
631
// One recorded stage-to-stage connection; element type of connections_.
// dependent / producer / output_name carry the same names as the parameters
// of connect(dependent, producer, output_name).
632 struct ConnectionInfo {
633 Stage* dependent;
634 Stage* producer;
635 std::string output_name;
636 int output_index;  // presumably the index of output_name on producer — TODO confirm
637 };
638 std::vector<ConnectionInfo> connections_;
639
640 int num_streams_;
641 bool is_finalized_;
642 bool warmup_on_finalize_;
643 bool pool_managed_decomp_;
644
645 // is_compressed_: true after the first successful compress() (gates writeToFile).
646 // was_compressed_: true between compress() and the next reset() (gates captureGraph).
647 bool is_compressed_;
648 bool was_compressed_;
649
650 bool profiling_enabled_;
651 PipelinePerfResult last_perf_result_;
652
653 std::vector<DAGNode*> input_nodes_;
654 std::vector<DAGNode*> output_nodes_;
655 std::vector<int> input_buffer_ids_;
656 std::vector<int> output_buffer_ids_;
657
658 PoolBuffer d_concat_buffer_;
659 bool needs_concat_;
660
661 // Pool-persistent decompress output buffers (one per source stage).
662 // Only used when pool_managed_decomp_ == true.
663 std::vector<void*> d_decomp_outputs_;
664
665 // Pinned host buffer for concat header (one H2D copy instead of N).
666 PinnedBuffer h_concat_header_;
667 // Persistent pinned host + device descriptor buffers for the gather kernel.
668 PinnedBuffer h_copy_descs_;
669 DeviceBuffer d_copy_descs_;
670
671 size_t input_size_;
672
673 // Per-source input sizes from the most recent compress(), ordered to match
674 // input_nodes_. Used by decompress() to size each inverse result buffer.
675 std::vector<size_t> source_input_sizes_;
676
677 // Input alignment in bytes — LCM of all stage getRequiredInputAlignment() values.
678 // compress() zero-pads to this boundary transparently.
679 size_t input_alignment_bytes_;
680 PoolBuffer d_pad_buf_;
681
682 // Original (pre-padding) input size. decompress() uses this to trim the
683 // reported output back to what the caller provided. 0 when no padding.
684 size_t original_input_size_;
685
686 size_t input_size_hint_;
687 float pool_multiplier_;
688
689 // Dataset dimensions (x=fast, y, z). Pushed to each stage on addStage() and
690 // again at finalize(). Default {0,1,1} = 1-D, infer x from input size.
691 std::array<size_t, 3> dims_;
692
// Cached inverse-DAG state, held in inv_cache_ and built or reused by
// buildOrReuseInvCache() on behalf of decompress().
// NOTE(review): inv_dag and inv_result_map have the same types as the pair
// returned by buildInverseDAG(), so they presumably store that result —
// confirm against the implementation.
701 struct InvDAGCache {
702 std::unique_ptr<CompressionDAG> inv_dag;
703 std::unordered_map<Stage*, int> inv_result_map;
704 std::unordered_map<int, int> fwd_to_inv_ext_buf;  // presumably forward buffer id -> inverse external buffer id
705 std::unordered_map<Stage*, size_t> source_sizes;
706 };
707 std::unique_ptr<InvDAGCache> inv_cache_;
708
// Per-buffer bookkeeping record; element type of buffer_metadata_.
// The scalar fields have no default member initializers.
// NOTE(review): per-field meanings are inferred from names — confirm.
709 struct BufferMetadata {
710 int buffer_id;
711 size_t actual_size;
712 size_t allocated_size;
713 std::string name;
714 DAGNode* producer;  // non-owning; presumably the node that writes this buffer
715 int output_index;   // presumably which output of producer this buffer is
716 };
717 std::vector<BufferMetadata> buffer_metadata_;
718
719 bool graph_mode_enabled_;
720 bool graph_captured_;
721
722 // Fixed device input buffer whose address is baked into the captured graph.
723 // compress() copies user input here before cudaGraphLaunch().
724 PoolBuffer d_graph_input_;
725 size_t d_graph_input_size_;
726
727 cudaGraph_t captured_graph_;
728 cudaGraphExec_t graph_exec_;
729};
730
731// ── Template implementation ───────────────────────────────────────────────────
732
733template<typename StageT, typename... Args>
734StageT* Pipeline::addStage(Args&&... args) {
735 if (is_finalized_) {
736 throw std::runtime_error("Cannot add stages after finalization");
737 }
738
739 auto stage_ptr = std::make_unique<StageT>(std::forward<Args>(args)...);
740 StageT* stage = stage_ptr.get();
741
742 stage->setDims(dims_);
743
744 DAGNode* node = dag_->addStage(stage, stage->getName());
745 size_t num_outputs = stage->getNumOutputs();
746 auto output_names = stage->getOutputNames();
747
748 // Pre-allocate all output slots as unconnected (size=1 placeholder).
749 // connect() will promote any that get wired to downstream stages.
750 for (size_t i = 0; i < num_outputs; i++) {
751 std::string out_name = i < output_names.size() ? output_names[i] : std::to_string(i);
752 dag_->addUnconnectedOutput(node, 1, i, stage->getName() + "." + out_name + "_unconnected");
753 }
754
755 stage_to_node_[stage] = node;
756 stages_.push_back(std::move(stage_ptr));
757 return stage;
758}
759
760} // namespace fz
Definition dag.h:92
Definition mempool.h:82
void free(void *ptr, cudaStream_t stream)
void * allocate(size_t size, cudaStream_t stream, const std::string &tag="", bool persistent=false)
Definition compressor.h:34
void setDims(size_t x, size_t y=1, size_t z=1)
Definition compressor.h:72
void decompressFromFileInstance(const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr)
static void decompressFromFile(const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr, size_t pool_override_bytes=0)
int connect(Stage *dependent, const std::vector< Stage * > &producers)
void setPoolManagedDecompOutput(bool enable)
Definition compressor.h:121
size_t getPoolThreshold() const
void warmup(cudaStream_t stream=0)
void enableBoundsCheck(bool enable)
Definition compressor.h:306
bool isMemPoolFallbackMode() const
void saveConfig(const std::string &path) const
size_t getLastUncompressedSize() const
Definition compressor.h:158
void loadConfig(const std::string &path)
void setWarmupOnFinalize(bool enable)
Definition compressor.h:112
void compress(const void *d_input, size_t input_size, void *d_output_buf, size_t output_buf_capacity, size_t *actual_output_size, cudaStream_t stream=0)
CompressionDAG * getDAG()
Definition compressor.h:285
StageT * addStage(Args &&... args)
Definition compressor.h:734
void decompress(const void *d_input, size_t input_size, void *d_output_buf, size_t output_buf_capacity, size_t *actual_output_size, cudaStream_t stream=0)
const PipelinePerfResult & getLastPerfResult() const
Definition compressor.h:282
Pipeline(const std::string &config_path)
void writeToFile(const std::string &filename, cudaStream_t stream=0)
int connect(Stage *dependent, Stage *producer, const std::string &output_name="output")
Pipeline(size_t input_data_size=0, MemoryStrategy strategy=MemoryStrategy::MINIMAL, float pool_multiplier=3.0f)
void setMemoryStrategy(MemoryStrategy strategy)
static FZMFileHeader readHeader(const std::string &filename)
void setColoringEnabled(bool enable)
Definition compressor.h:314
size_t getMaxCompressedSize(size_t input_bytes) const
void finalize()
void enableGraphMode(bool enable)
void enableProfiling(bool enable)
FZMFileHeader buildHeader() const
void setNumStreams(int num_streams)
void captureGraph(cudaStream_t stream=0)
void reset(cudaStream_t stream=0)
void decompress(const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
void compress(const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
Definition stage.h:30
TOML-based pipeline configuration file support.
Compression DAG wiring, execution, and memory strategy types.
FZM binary file format definitions — structs, enums, and helpers.
Stream-ordered CUDA memory pool for pipeline buffer management.
Definition fzm_format.h:25
MemoryStrategy
Definition dag.h:23
@ MINIMAL
Allocate on-demand, free at last consumer. Lowest peak memory.
Pipeline and per-stage profiling result types.
Base class interface for all compression stages.
Factory function for reconstructing pipeline stages from serialized FZM headers.
Definition dag.h:52
Fixed-size FZM file header core (80 bytes).
Definition fzm_format.h:219
Definition perf.h:70
Definition compressor.h:350