FZGPUModules 2.0
GPU-accelerated modular compression pipelines
Loading...
Searching...
No Matches
compressor.h
Go to the documentation of this file.
1
5#pragma once
6
7#include "pipeline/dag.h"
8#include "pipeline/perf.h"
9#include "pipeline/config.h"
10#include "stage/stage.h"
11#include "stage/stage_factory.h"
12#include "mem/mempool.h"
13#include "fzm_format.h"
14
15#include <array>
16#include <memory>
17#include <stdexcept>
18#include <unordered_map>
19
20namespace fz {
21
34class Pipeline {
35public:
41 explicit Pipeline(
42 size_t input_data_size = 0,
43 MemoryStrategy strategy = MemoryStrategy::MINIMAL,
44 float pool_multiplier = 3.0f
45 );
46
54 explicit Pipeline(const std::string& config_path);
55
56 ~Pipeline();
57
58 // ── Configuration ─────────────────────────────────────────────────────────
59
62
64 void setNumStreams(int num_streams);
65
72 void setDims(size_t x, size_t y = 1, size_t z = 1) { dims_ = {x, y, z}; }
73 void setDims(std::array<size_t, 3> dims) { dims_ = dims; }
74 std::array<size_t, 3> getDims() const { return dims_; }
75
76 // ── Builder API ───────────────────────────────────────────────────────────
77
82 template<typename StageT, typename... Args>
83 StageT* addStage(Args&&... args);
84
92 int connect(Stage* dependent, Stage* producer, const std::string& output_name = "output");
93
95 int connect(Stage* dependent, const std::vector<Stage*>& producers);
96
102 void finalize();
103
109 void warmup(cudaStream_t stream = 0);
110
112 void setWarmupOnFinalize(bool enable) { warmup_on_finalize_ = enable; }
113 bool isWarmupOnFinalizeEnabled() const { return warmup_on_finalize_; }
114
121 void setPoolManagedDecompOutput(bool enable) { pool_managed_decomp_ = enable; }
122 bool isPoolManagedDecompOutput() const { return pool_managed_decomp_; }
123
137 size_t getMaxCompressedSize(size_t input_bytes) const;
138
158 size_t getLastUncompressedSize() const {
159 return original_input_size_ > 0 ? original_input_size_ : input_size_;
160 }
161
162 // ── Execution ─────────────────────────────────────────────────────────────
163
176 const void* d_input,
177 size_t input_size,
178 void** d_output,
179 size_t* output_size,
180 cudaStream_t stream = 0
181 );
182
210 const void* d_input,
211 size_t input_size,
212 void* d_output_buf,
213 size_t output_buf_capacity,
214 size_t* actual_output_size,
215 cudaStream_t stream = 0
216 );
217
233 const void* d_input,
234 size_t input_size,
235 void** d_output,
236 size_t* output_size,
237 cudaStream_t stream = 0
238 );
239
261 const void* d_input,
262 size_t input_size,
263 void* d_output_buf,
264 size_t output_buf_capacity,
265 size_t* actual_output_size,
266 cudaStream_t stream = 0
267 );
268
270 void reset(cudaStream_t stream = 0);
271
272 // ── Profiling ─────────────────────────────────────────────────────────────
273
278 void enableProfiling(bool enable);
279 bool isProfilingEnabled() const { return profiling_enabled_; }
280
282 const PipelinePerfResult& getLastPerfResult() const { return last_perf_result_; }
283
285 CompressionDAG* getDAG() { return dag_.get(); }
286
288 size_t getPoolThreshold() const;
289
300
306 void enableBoundsCheck(bool enable) { dag_->enableBoundsCheck(enable); }
307 bool isBoundsCheckEnabled() const { return dag_->isBoundsCheckEnabled(); }
308
314 void setColoringEnabled(bool enable) { dag_->setColoringEnabled(enable); }
315 bool isColoringEnabled() const { return dag_->isColoringEnabled(); }
316 size_t getColorRegionCount() const { return dag_->getColorRegionCount(); }
317
318 // ── CUDA Graph Capture (compression-only) ─────────────────────────────────
319
327 void enableGraphMode(bool enable);
328 bool isGraphModeEnabled() const { return graph_mode_enabled_; }
329
340 void captureGraph(cudaStream_t stream = 0);
341 bool isGraphCaptured() const { return graph_captured_; }
342
343 size_t getPeakMemoryUsage() const;
344 size_t getCurrentMemoryUsage() const;
345 void printPipeline() const;
346
347 // ── File Serialization ────────────────────────────────────────────────────
348
351 FZMHeaderCore core;
352 std::vector<FZMStageInfo> stages;
353 std::vector<FZMBufferEntry> buffers;
354 };
355
357 void writeToFile(const std::string& filename, cudaStream_t stream = 0);
358
360 static FZMFileHeader readHeader(const std::string& filename);
361
364
380 const std::string& filename,
381 void** d_output,
382 size_t* output_size,
383 cudaStream_t stream = 0,
384 PipelinePerfResult* perf_out = nullptr,
385 size_t pool_override_bytes = 0
386 );
387
406 const std::string& filename,
407 void** d_output,
408 size_t* output_size,
409 cudaStream_t stream = 0,
410 PipelinePerfResult* perf_out = nullptr
411 );
412
413 // ── Config File ───────────────────────────────────────────────────────────
414
428 void loadConfig(const std::string& path);
429
438 void saveConfig(const std::string& path) const;
439
440private:
441 // ── RAII buffer wrappers (private implementation detail) ─────────────────
442
443 // Pool-allocated persistent device buffer.
444 struct PoolBuffer {
445 void* ptr = nullptr;
446 size_t capacity = 0;
447 MemoryPool* pool = nullptr;
448
449 ~PoolBuffer() { free(0); }
450 PoolBuffer() = default;
451 PoolBuffer(const PoolBuffer&) = delete;
452 PoolBuffer& operator=(const PoolBuffer&) = delete;
453
454 void free(cudaStream_t s) {
455 if (ptr && pool) { pool->free(ptr, s); ptr = nullptr; capacity = 0; }
456 }
457 bool allocate(MemoryPool* p, size_t bytes, cudaStream_t s,
458 const char* tag, bool persistent = false) {
459 free(s);
460 pool = p;
461 ptr = pool->allocate(bytes, s, tag, persistent);
462 if (ptr) capacity = bytes;
463 return ptr != nullptr;
464 }
465 };
466
467 // cudaHostAlloc pinned host buffer — grows on demand, never shrinks.
468 struct PinnedBuffer {
469 void* ptr = nullptr;
470 size_t capacity = 0;
471
472 ~PinnedBuffer() { if (ptr) cudaFreeHost(ptr); }
473 PinnedBuffer() = default;
474 PinnedBuffer(const PinnedBuffer&) = delete;
475 PinnedBuffer& operator=(const PinnedBuffer&) = delete;
476
477 // Returns false on CUDA allocation failure.
478 bool ensureCapacity(size_t bytes) {
479 if (capacity >= bytes) return true;
480 if (ptr) { cudaFreeHost(ptr); ptr = nullptr; capacity = 0; }
481 if (cudaHostAlloc(&ptr, bytes, cudaHostAllocDefault) != cudaSuccess) return false;
482 capacity = bytes;
483 return true;
484 }
485 };
486
487 // cudaMalloc device buffer — grows on demand, never shrinks.
488 struct DeviceBuffer {
489 void* ptr = nullptr;
490 size_t capacity = 0;
491
492 ~DeviceBuffer() { if (ptr) cudaFree(ptr); }
493 DeviceBuffer() = default;
494 DeviceBuffer(const DeviceBuffer&) = delete;
495 DeviceBuffer& operator=(const DeviceBuffer&) = delete;
496
497 // Returns false on CUDA allocation failure.
498 bool ensureCapacity(size_t bytes) {
499 if (capacity >= bytes) return true;
500 if (ptr) { cudaFree(ptr); ptr = nullptr; capacity = 0; }
501 if (cudaMalloc(&ptr, bytes) != cudaSuccess) return false;
502 capacity = bytes;
503 return true;
504 }
505 };
506
507 // ── Internal helpers ──────────────────────────────────────────────────────
508
509 Stage* addRawStage(Stage* stage);
510
// Snapshot of one pipeline output buffer, as returned by getOutputBuffers().
511 struct OutputBuffer {
512 void* d_ptr; // presumably a device pointer (d_ prefix) — confirm at call sites
513 size_t actual_size; // bytes produced; assumed <= allocated_size — TODO confirm
514 size_t allocated_size; // bytes reserved for this output in the pool/DAG
515 std::string name;
516 int buffer_id; // DAG buffer identifier — presumably matches FZM buffer entries; verify
517 };
518 std::vector<OutputBuffer> getOutputBuffers() const;
519
520 static void* loadCompressedData(
521 const std::string& filename,
522 const FZMFileHeader& header,
523 cudaStream_t stream = 0,
524 MemoryPool* pool = nullptr
525 );
526
527 void validate();
528 std::pair<std::vector<Stage*>, std::vector<Stage*>> identifyTopology();
529 void setupInputBuffers(const std::vector<Stage*>& sources);
530 int autoDetectUnconnectedOutputs();
531 void detectMultiOutputScenario(int pipeline_outputs);
532 void configureStreamsIfNeeded();
533
534 // finalize() sub-steps
535 void typeCheckConnections();
536 void computeInputAlignment();
537 void refinePoolSize();
538 void setupGraphModeInput();
539 void preallocatePadBuffer();
540 void preallocateConcatBuffers();
541
542 // compress() helper: handles graph-mode copy or alignment padding.
543 // Returns the effective source pointer and padded source size.
544 std::pair<const void*, size_t> prepareInputSource(
545 const void* d_input, size_t input_size, cudaStream_t stream);
546
552 void propagateBufferSizes(bool force_from_current_inputs = false);
553
554 std::vector<Stage*> getSourceStages() const;
555 std::vector<Stage*> getSinkStages() const;
556
557 // ── Inverse DAG helpers ───────────────────────────────────────────────────
558
// Describes one forward-pipeline stage plus the DAG buffer ids it reads and
// writes. Used by reconstructForwardTopology()/buildInverseDAG() to rebuild
// the decompression (inverse) DAG from a serialized FZM header.
560 struct FwdStageDesc {
561 Stage* stage;
562 std::vector<int> output_buf_ids; // ids of buffers this stage produced in the forward DAG
563 std::vector<int> input_buf_ids; // ids of buffers this stage consumed in the forward DAG
564 };
565
567 using PipelineOutputMap = std::unordered_map<int, std::pair<void*, size_t>>;
568
569 // decompress() helper: builds or reuses the inverse DAG cache.
570 void buildOrReuseInvCache(
571 const PipelineOutputMap& po_map,
572 Stage* src_stage,
573 size_t src_sz,
574 cudaStream_t stream);
575
576 // decompressFromFile() helpers.
577 static size_t computeFilePoolSize(const FZMFileHeader& fh, size_t pool_override_bytes);
578 static std::pair<std::vector<std::unique_ptr<Stage>>, std::vector<FwdStageDesc>>
579 reconstructForwardTopology(const FZMFileHeader& fh);
580 static std::unordered_map<Stage*, size_t> buildSourceSizesFromHeader(
581 const FZMFileHeader& fh, const std::vector<FwdStageDesc>& fwd_topology);
582
588 static std::pair<std::unique_ptr<CompressionDAG>,
589 std::unordered_map<Stage*, int>>
590 buildInverseDAG(
591 const std::vector<FwdStageDesc>& fwd_stages,
592 const PipelineOutputMap& pipeline_outputs,
593 MemoryPool* pool,
594 MemoryStrategy strategy,
595 const std::unordered_map<Stage*, size_t>& source_sizes,
596 bool enable_profiling
597 );
598
599 // ── Concat helpers ────────────────────────────────────────────────────────
600
// Per-output record used by the concat helpers (collectOutputBuffers(),
// calculateConcatSize(), writeConcatBuffer()).
// NOTE(review): overlaps with the private OutputBuffer struct above — the extra
// stage_name/output_name fields suggest this one feeds the concat header; confirm.
601 struct OutputBufferInfo {
602 int buffer_id;
603 void* d_ptr; // presumably a device pointer (d_ prefix) — confirm at call sites
604 size_t actual_size; // bytes produced by the stage for this output
605 std::string stage_name;
606 std::string output_name;
607 };
608
609 std::vector<OutputBufferInfo> collectOutputBuffers() const;
610
612 size_t calculateConcatSize(const std::vector<OutputBufferInfo>& outputs) const;
613
614 size_t writeConcatBuffer(
615 const std::vector<OutputBufferInfo>& outputs,
616 uint8_t* d_concat_bytes,
617 cudaStream_t stream
618 ) const;
619
620 void concatOutputs(void** d_output, size_t* output_size, cudaStream_t stream);
621
622 // ── Member variables ──────────────────────────────────────────────────────
623
624 std::unique_ptr<MemoryPool> mem_pool_;
625 std::unique_ptr<CompressionDAG> dag_;
626 MemoryStrategy strategy_;
627
628 std::vector<std::unique_ptr<Stage>> stages_;
629 std::unordered_map<Stage*, DAGNode*> stage_to_node_;
630
// One edge recorded by connect(): `dependent` consumes `producer`'s output
// named `output_name` (see connect(Stage*, Stage*, const std::string&)).
631 struct ConnectionInfo {
632 Stage* dependent;
633 Stage* producer;
634 std::string output_name;
635 int output_index; // index of the named output on the producer — TODO confirm ordering source
636 };
637 std::vector<ConnectionInfo> connections_;
638
639 int num_streams_;
640 bool is_finalized_;
641 bool warmup_on_finalize_;
642 bool pool_managed_decomp_;
643
644 // is_compressed_: true after the first successful compress() (gates writeToFile).
645 // was_compressed_: true between compress() and the next reset() (gates captureGraph).
646 bool is_compressed_;
647 bool was_compressed_;
648
649 bool profiling_enabled_;
650 PipelinePerfResult last_perf_result_;
651
652 std::vector<DAGNode*> input_nodes_;
653 std::vector<DAGNode*> output_nodes_;
654 std::vector<int> input_buffer_ids_;
655 std::vector<int> output_buffer_ids_;
656
657 PoolBuffer d_concat_buffer_;
658 bool needs_concat_;
659
660 // Pool-persistent decompress output buffers (one per source stage).
661 // Only used when pool_managed_decomp_ == true.
662 std::vector<void*> d_decomp_outputs_;
663
664 // Pinned host buffer for concat header (one H2D copy instead of N).
665 PinnedBuffer h_concat_header_;
666 // Persistent pinned host + device descriptor buffers for the gather kernel.
667 PinnedBuffer h_copy_descs_;
668 DeviceBuffer d_copy_descs_;
669
670 size_t input_size_;
671
672 // Per-source input sizes from the most recent compress(), ordered to match
673 // input_nodes_. Used by decompress() to size each inverse result buffer.
674 std::vector<size_t> source_input_sizes_;
675
676 // Input alignment in bytes — LCM of all stage getRequiredInputAlignment() values.
677 // compress() zero-pads to this boundary transparently.
678 size_t input_alignment_bytes_;
679 PoolBuffer d_pad_buf_;
680
681 // Original (pre-padding) input size. decompress() uses this to trim the
682 // reported output back to what the caller provided. 0 when no padding.
683 size_t original_input_size_;
684
685 size_t input_size_hint_;
686 float pool_multiplier_;
687
688 // Dataset dimensions (x=fast, y, z). Pushed to each stage on addStage() and
689 // again at finalize(). Default {0,1,1} = 1-D, infer x from input size.
690 std::array<size_t, 3> dims_;
691
// Cached inverse (decompression) DAG, built lazily by buildOrReuseInvCache()
// so repeated decompress() calls skip reconstruction.
700 struct InvDAGCache {
701 std::unique_ptr<CompressionDAG> inv_dag; // the inverse DAG itself
702 std::unordered_map<Stage*, int> inv_result_map; // presumably source stage -> result buffer id; verify in decompress()
703 std::unordered_map<int, int> fwd_to_inv_ext_buf; // forward buffer id -> inverse external buffer id — TODO confirm
704 std::unordered_map<Stage*, size_t> source_sizes; // per-source uncompressed sizes (cf. buildSourceSizesFromHeader)
705 };
706 std::unique_ptr<InvDAGCache> inv_cache_;
707
// Bookkeeping for one DAG buffer; buffer_metadata_ holds one entry per buffer.
// Presumably populated during finalize()/compress() and consumed when writing
// FZMBufferEntry records — confirm in the implementation file.
708 struct BufferMetadata {
709 int buffer_id;
710 size_t actual_size; // bytes actually written
711 size_t allocated_size; // bytes reserved
712 std::string name;
713 DAGNode* producer; // node that writes this buffer
714 int output_index; // which of the producer's outputs this buffer holds
715 };
716 std::vector<BufferMetadata> buffer_metadata_;
717
718 bool graph_mode_enabled_;
719 bool graph_captured_;
720
721 // Fixed device input buffer whose address is baked into the captured graph.
722 // compress() copies user input here before cudaGraphLaunch().
723 PoolBuffer d_graph_input_;
724 size_t d_graph_input_size_;
725
726 cudaGraph_t captured_graph_;
727 cudaGraphExec_t graph_exec_;
728};
729
730// ── Template implementation ───────────────────────────────────────────────────
731
// Construct a StageT in place, hand it to the DAG, and pre-register one
// placeholder "unconnected" output slot (size-1) per stage output; connect()
// later promotes any slot that feeds a downstream stage. The Pipeline retains
// ownership of the stage; the raw pointer returned stays valid for the
// Pipeline's lifetime. Throws std::runtime_error once finalize() has run.
template<typename StageT, typename... Args>
StageT* Pipeline::addStage(Args&&... args) {
    if (is_finalized_) {
        throw std::runtime_error("Cannot add stages after finalization");
    }

    auto owned = std::make_unique<StageT>(std::forward<Args>(args)...);
    StageT* raw = owned.get();

    // Push current dataset dimensions now; finalize() pushes them again.
    raw->setDims(dims_);

    DAGNode* node = dag_->addStage(raw, raw->getName());

    const auto output_names = raw->getOutputNames();
    const size_t output_count = raw->getNumOutputs();
    for (size_t idx = 0; idx < output_count; idx++) {
        const std::string label =
            idx < output_names.size() ? output_names[idx] : std::to_string(idx);
        dag_->addUnconnectedOutput(
            node, 1, idx, raw->getName() + "." + label + "_unconnected");
    }

    stage_to_node_[raw] = node;
    stages_.push_back(std::move(owned));
    return raw;
}
758
759} // namespace fz
Definition dag.h:92
Definition mempool.h:66
void free(void *ptr, cudaStream_t stream)
void * allocate(size_t size, cudaStream_t stream, const std::string &tag="", bool persistent=false)
Definition compressor.h:34
void setDims(size_t x, size_t y=1, size_t z=1)
Definition compressor.h:72
void decompressFromFileInstance(const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr)
static void decompressFromFile(const std::string &filename, void **d_output, size_t *output_size, cudaStream_t stream=0, PipelinePerfResult *perf_out=nullptr, size_t pool_override_bytes=0)
int connect(Stage *dependent, const std::vector< Stage * > &producers)
void setPoolManagedDecompOutput(bool enable)
Definition compressor.h:121
size_t getPoolThreshold() const
void warmup(cudaStream_t stream=0)
void enableBoundsCheck(bool enable)
Definition compressor.h:306
bool isMemPoolFallbackMode() const
void saveConfig(const std::string &path) const
size_t getLastUncompressedSize() const
Definition compressor.h:158
void loadConfig(const std::string &path)
void setWarmupOnFinalize(bool enable)
Definition compressor.h:112
void compress(const void *d_input, size_t input_size, void *d_output_buf, size_t output_buf_capacity, size_t *actual_output_size, cudaStream_t stream=0)
CompressionDAG * getDAG()
Definition compressor.h:285
StageT * addStage(Args &&... args)
Definition compressor.h:733
void decompress(const void *d_input, size_t input_size, void *d_output_buf, size_t output_buf_capacity, size_t *actual_output_size, cudaStream_t stream=0)
const PipelinePerfResult & getLastPerfResult() const
Definition compressor.h:282
Pipeline(const std::string &config_path)
void writeToFile(const std::string &filename, cudaStream_t stream=0)
int connect(Stage *dependent, Stage *producer, const std::string &output_name="output")
Pipeline(size_t input_data_size=0, MemoryStrategy strategy=MemoryStrategy::MINIMAL, float pool_multiplier=3.0f)
void setMemoryStrategy(MemoryStrategy strategy)
static FZMFileHeader readHeader(const std::string &filename)
void setColoringEnabled(bool enable)
Definition compressor.h:314
size_t getMaxCompressedSize(size_t input_bytes) const
void finalize()
void enableGraphMode(bool enable)
void enableProfiling(bool enable)
FZMFileHeader buildHeader() const
void setNumStreams(int num_streams)
void captureGraph(cudaStream_t stream=0)
void reset(cudaStream_t stream=0)
void decompress(const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
void compress(const void *d_input, size_t input_size, void **d_output, size_t *output_size, cudaStream_t stream=0)
Definition stage.h:30
TOML-based pipeline configuration file support.
Compression DAG wiring, execution, and memory strategy types.
MemoryStrategy
Definition dag.h:23
FZM binary file format definitions — structs, enums, and helpers.
Stream-ordered CUDA memory pool for pipeline buffer management.
Pipeline and per-stage profiling result types.
Base class interface for all compression stages.
Factory function for reconstructing pipeline stages from serialized FZM headers.
Definition dag.h:52
Fixed-size FZM file header core (80 bytes).
Definition fzm_format.h:217
Definition perf.h:70
Definition compressor.h:350