FZGPUModules 2.0
GPU-accelerated modular compression pipelines
Loading...
Searching...
No Matches
rze_stage.h
Go to the documentation of this file.
1#pragma once
2
25#include "stage/stage.h"
26#include "fzm_format.h"
27#include <cuda_runtime.h>
28#include <cstdint>
29#include <cstring>
30#include <stdexcept>
31#include <string>
32#include <unordered_map>
33#include <vector>
34
35namespace fz {
36
51class RZEStage : public Stage {
52public:
53 RZEStage()
54 : is_inverse_(false)
55 , chunk_size_(16384)
56 , levels_(4)
57 , actual_output_size_(0)
58 , cached_orig_bytes_(0)
59 // Persistent scratch buffers — allocated on first use, reused thereafter.
60 // Eliminates the blocking cudaMalloc/cudaFree pair that appeared in every
61 // execute() call and was clearly visible as the two cudaMalloc events in nsys.
62 , d_scratch_(nullptr)
63 , d_sizes_dev_(nullptr)
64 , d_clean_dev_(nullptr)
65 , d_dst_off_dev_(nullptr)
66 , d_inv_in_off_(nullptr)
67 , d_inv_comp_sz_(nullptr)
68 , d_inv_out_off_(nullptr)
69 , d_inv_orig_sz_(nullptr)
70 , scratch_capacity_(0) // # chunks the current scratch allocation can hold
71 , inv_capacity_(0) // # chunks the current inverse table allocation holds
72 {}
73
74 ~RZEStage() override;
75
76 // ── Stage control ──────────────────────────────────────────────────────
77 void setInverse(bool inv) override { is_inverse_ = inv; }
78 bool isInverse() const override { return is_inverse_; }
79
93 bool isGraphCompatible() const override { return !is_inverse_; }
94
95 void setChunkSize(size_t bytes) { chunk_size_ = static_cast<uint32_t>(bytes); }
96 void setLevels(int n) { levels_ = static_cast<uint8_t>(n); }
97
98 size_t getChunkSize() const { return chunk_size_; }
99 size_t getRequiredInputAlignment() const override { return chunk_size_; }
100 int getLevels() const { return static_cast<int>(levels_); }
101 uint32_t getCachedOrigBytes() const { return cached_orig_bytes_; }
102
103 // ── Execution ──────────────────────────────────────────────────────────
105 cudaStream_t stream,
106 MemoryPool* pool,
107 const std::vector<void*>& inputs,
108 const std::vector<void*>& outputs,
109 const std::vector<size_t>& sizes
110 ) override;
111 void postStreamSync(cudaStream_t stream) override;
112
113 // ── Metadata ───────────────────────────────────────────────────────────
114 std::string getName() const override { return "RZE"; }
115 size_t getNumInputs() const override { return 1; }
116 size_t getNumOutputs() const override { return 1; }
117
118 std::vector<size_t> estimateOutputSizes(
119 const std::vector<size_t>& input_sizes
120 ) const override {
121 if (is_inverse_) {
122 // The original byte count is stored in the first 4 bytes of the
123 // compressed stream and cached here after the forward pass (or
124 // restored via deserializeHeader for cold decompression).
125 if (cached_orig_bytes_ > 0)
126 return {static_cast<size_t>(cached_orig_bytes_)};
127 // Fallback: should not normally be reached; return compressed
128 // size as a lower bound (will likely trigger a buffer overwrite
129 // warning — indicates a missing forward pass or header restore).
130 return {input_sizes.empty() ? 0 : input_sizes[0]};
131 }
132 // Forward: worst case = original data + stream header.
133 // Header = 4 (orig_bytes) + 4 (num_chunks) + 4*n_chunks.
134 const size_t n_bytes = input_sizes.empty() ? 0 : input_sizes[0];
135 const size_t n_chunks = (n_bytes + chunk_size_ - 1) / chunk_size_;
136 const size_t hdr = 4 + 4 + 4 * n_chunks;
137 return {n_bytes + hdr};
138 }
139
140 std::unordered_map<std::string, size_t>
142 size_t getActualOutputSize(int index) const override;
143
156 const std::vector<size_t>& input_sizes
157 ) const override {
158 if (is_inverse_ || input_sizes.empty()) return 0;
159 const size_t in_bytes = input_sizes[0];
160 const size_t n_chunks = (in_bytes + chunk_size_ - 1) / chunk_size_;
161 return n_chunks * (static_cast<size_t>(chunk_size_) + 3 * sizeof(uint32_t));
162 }
163
164 uint16_t getStageTypeId() const override {
165 return static_cast<uint16_t>(StageType::RZE);
166 }
167
168 uint8_t getOutputDataType(size_t) const override {
169 return static_cast<uint8_t>(DataType::UINT8);
170 }
171
172 // ── Serialization ──────────────────────────────────────────────────────
173 // Header layout (9 bytes):
174 // [0..3] chunk_size (uint32_t LE)
175 // [4] levels (uint8_t)
176 // [5..8] cached_orig_bytes_ (uint32_t LE) — original uncompressed size,
177 // used by the inverse estimateOutputSizes() to
178 // pre-allocate the correct output buffer.
180 size_t output_index, uint8_t* buf, size_t max_size
181 ) const override {
182 (void)output_index;
183 if (max_size < 9) return 0;
184 std::memcpy(buf, &chunk_size_, sizeof(uint32_t));
185 buf[4] = levels_;
186 std::memcpy(buf + 5, &cached_orig_bytes_, sizeof(uint32_t));
187 return 9;
188 }
189
190 void deserializeHeader(const uint8_t* buf, size_t size) override {
191 if (size >= 4) std::memcpy(&chunk_size_, buf, sizeof(uint32_t));
192 if (size >= 5) levels_ = buf[4];
193 if (size >= 9) std::memcpy(&cached_orig_bytes_, buf + 5, sizeof(uint32_t));
194 }
195
196 size_t getMaxHeaderSize(size_t) const override { return 9; }
197
198 void saveState() override {
199 saved_chunk_size_ = chunk_size_;
200 saved_levels_ = levels_;
201 saved_cached_orig_bytes_ = cached_orig_bytes_;
202 }
203
204 void restoreState() override {
205 chunk_size_ = saved_chunk_size_;
206 levels_ = saved_levels_;
207 cached_orig_bytes_ = saved_cached_orig_bytes_;
208 }
209
210private:
211 bool is_inverse_;
212 uint32_t chunk_size_;
213 uint32_t saved_chunk_size_ = 0;
214 uint8_t levels_;
215 uint8_t saved_levels_ = 0;
216 size_t actual_output_size_;
217 // Cached original (uncompressed) byte count. Set by the forward execute()
218 // and persisted in the serialized header so that the inverse
219 // estimateOutputSizes() can return the right buffer size even when the
220 // inverse stage is constructed cold from a file-based pipeline.
221 uint32_t cached_orig_bytes_ = 0;
222 uint32_t saved_cached_orig_bytes_ = 0;
223
224 // ── Persistent scratch buffers ──────────────────────────────────────────
225 // Forward path:
226 // d_scratch_ : per-chunk worst-case output (n_chunks * chunk_size bytes)
227 // d_sizes_dev_ : raw compressed sizes from rzeEncodeKernel (with flag bit)
228 // d_clean_dev_ : compressed sizes with flag stripped (for pack offsets)
229 // d_dst_off_dev_: exclusive-prefix-sum of clean sizes + header offset
230 // Inverse path:
231 // d_inv_{in_off,comp_sz,out_off,orig_sz}_: per-chunk decode tables
232 // All are allocated once (or grown on demand) and freed in the destructor.
233 uint8_t* d_scratch_;
234 uint32_t* d_sizes_dev_;
235 uint32_t* d_clean_dev_;
236 uint32_t* d_dst_off_dev_;
237 uint32_t* d_inv_in_off_;
238 uint32_t* d_inv_comp_sz_;
239 uint32_t* d_inv_out_off_;
240 uint32_t* d_inv_orig_sz_;
241 mutable bool tail_readback_pending_ = false;
242 mutable cudaStream_t tail_readback_stream_ = nullptr;
243 mutable uint32_t tail_last_index_ = 0;
244 // Output pointer saved by forward execute() so postStreamSync() can zero
245 // the trailing alignment padding (0–3 bytes at actual_output_size_ - total_out).
246 mutable uint8_t* tail_output_ptr_ = nullptr;
247 size_t scratch_capacity_; // # chunks forward scratch can hold
248 size_t inv_capacity_; // # chunks inverse tables can hold
249 MemoryPool* scratch_pool_owner_ = nullptr;
250 MemoryPool* inv_pool_owner_ = nullptr;
251 bool scratch_from_pool_ = false;
252 bool inv_from_pool_ = false;
253};
254
255} // namespace fz
Definition mempool.h:82
Definition rze_stage.h:51
void execute(cudaStream_t stream, MemoryPool *pool, const std::vector< void * > &inputs, const std::vector< void * > &outputs, const std::vector< size_t > &sizes) override
uint8_t getOutputDataType(size_t) const override
Definition rze_stage.h:168
void deserializeHeader(const uint8_t *buf, size_t size) override
Definition rze_stage.h:190
bool isGraphCompatible() const override
Definition rze_stage.h:93
void setInverse(bool inv) override
Definition rze_stage.h:77
size_t getMaxHeaderSize(size_t) const override
Definition rze_stage.h:196
size_t serializeHeader(size_t output_index, uint8_t *buf, size_t max_size) const override
Definition rze_stage.h:179
uint16_t getStageTypeId() const override
Definition rze_stage.h:164
std::unordered_map< std::string, size_t > getActualOutputSizesByName() const override
size_t getRequiredInputAlignment() const override
Definition rze_stage.h:99
void saveState() override
Definition rze_stage.h:198
std::string getName() const override
Definition rze_stage.h:114
std::vector< size_t > estimateOutputSizes(const std::vector< size_t > &input_sizes) const override
Definition rze_stage.h:118
size_t getActualOutputSize(int index) const override
size_t estimateScratchBytes(const std::vector< size_t > &input_sizes) const override
Definition rze_stage.h:155
void postStreamSync(cudaStream_t stream) override
Definition stage.h:30
FZM binary file format definitions — structs, enums, and helpers.
Definition fzm_format.h:25
Base class interface for all compression stages.