FZGPUModules 1.0
GPU-accelerated modular compression pipeline
Loading...
Searching...
No Matches
rze_stage.h
Go to the documentation of this file.
1#pragma once
2
25#include "stage/stage.h"
26#include "fzm_format.h"
27#include <cuda_runtime.h>
28#include <cstdint>
29#include <cstring>
30#include <stdexcept>
31#include <string>
32#include <unordered_map>
33#include <vector>
34
35namespace fz {
36
47class RZEStage : public Stage {
48public:
49 RZEStage()
50 : is_inverse_(false)
51 , chunk_size_(16384)
52 , levels_(4)
53 , actual_output_size_(0)
54 , cached_orig_bytes_(0)
55 // Persistent scratch buffers — allocated on first use, reused thereafter.
56 // Eliminates the blocking cudaMalloc/cudaFree pair that appeared in every
57 // execute() call and was clearly visible as the two cudaMalloc events in nsys.
58 , d_scratch_(nullptr)
59 , d_sizes_dev_(nullptr)
60 , d_clean_dev_(nullptr)
61 , d_dst_off_dev_(nullptr)
62 , d_inv_in_off_(nullptr)
63 , d_inv_comp_sz_(nullptr)
64 , d_inv_out_off_(nullptr)
65 , d_inv_orig_sz_(nullptr)
66 , scratch_capacity_(0) // # chunks the current scratch allocation can hold
67 , inv_capacity_(0) // # chunks the current inverse table allocation holds
68 {}
69
70 ~RZEStage() override;
71
72 // ── Stage control ──────────────────────────────────────────────────────
73 void setInverse(bool inv) override { is_inverse_ = inv; }
74 bool isInverse() const override { return is_inverse_; }
75
89 bool isGraphCompatible() const override { return !is_inverse_; }
90
91 void setChunkSize(size_t bytes) { chunk_size_ = static_cast<uint32_t>(bytes); }
92 void setLevels(int n) { levels_ = static_cast<uint8_t>(n); }
93
94 size_t getChunkSize() const { return chunk_size_; }
95 size_t getRequiredInputAlignment() const override { return chunk_size_; }
96 int getLevels() const { return static_cast<int>(levels_); }
97 uint32_t getCachedOrigBytes() const { return cached_orig_bytes_; }
98
99 // ── Execution ──────────────────────────────────────────────────────────
101 cudaStream_t stream,
102 MemoryPool* pool,
103 const std::vector<void*>& inputs,
104 const std::vector<void*>& outputs,
105 const std::vector<size_t>& sizes
106 ) override;
107 void postStreamSync(cudaStream_t stream) override;
108
109 // ── Metadata ───────────────────────────────────────────────────────────
110 std::string getName() const override { return "RZE"; }
111 size_t getNumInputs() const override { return 1; }
112 size_t getNumOutputs() const override { return 1; }
113
114 std::vector<size_t> estimateOutputSizes(
115 const std::vector<size_t>& input_sizes
116 ) const override {
117 if (is_inverse_) {
118 // The original byte count is stored in the first 4 bytes of the
119 // compressed stream and cached here after the forward pass (or
120 // restored via deserializeHeader for cold decompression).
121 if (cached_orig_bytes_ > 0)
122 return {static_cast<size_t>(cached_orig_bytes_)};
123 // Fallback: should not normally be reached; return compressed
124 // size as a lower bound (will likely trigger a buffer overwrite
125 // warning — indicates a missing forward pass or header restore).
126 return {input_sizes.empty() ? 0 : input_sizes[0]};
127 }
128 // Forward: worst case = original data + stream header.
129 // Header = 4 (orig_bytes) + 4 (num_chunks) + 4*n_chunks.
130 const size_t n_bytes = input_sizes.empty() ? 0 : input_sizes[0];
131 const size_t n_chunks = (n_bytes + chunk_size_ - 1) / chunk_size_;
132 const size_t hdr = 4 + 4 + 4 * n_chunks;
133 return {n_bytes + hdr};
134 }
135
136 std::unordered_map<std::string, size_t>
138 size_t getActualOutputSize(int index) const override;
139
152 const std::vector<size_t>& input_sizes
153 ) const override {
154 if (is_inverse_ || input_sizes.empty()) return 0;
155 const size_t in_bytes = input_sizes[0];
156 const size_t n_chunks = (in_bytes + chunk_size_ - 1) / chunk_size_;
157 return n_chunks * (static_cast<size_t>(chunk_size_) + 3 * sizeof(uint32_t));
158 }
159
160 uint16_t getStageTypeId() const override {
161 return static_cast<uint16_t>(StageType::RZE);
162 }
163
164 uint8_t getOutputDataType(size_t) const override {
165 return static_cast<uint8_t>(DataType::UINT8);
166 }
167
168 // ── Serialization ──────────────────────────────────────────────────────
169 // Header layout (9 bytes):
170 // [0..3] chunk_size (uint32_t LE)
171 // [4] levels (uint8_t)
172 // [5..8] cached_orig_bytes_ (uint32_t LE) — original uncompressed size,
173 // used by the inverse estimateOutputSizes() to
174 // pre-allocate the correct output buffer.
176 size_t output_index, uint8_t* buf, size_t max_size
177 ) const override {
178 (void)output_index;
179 if (max_size < 9) return 0;
180 std::memcpy(buf, &chunk_size_, sizeof(uint32_t));
181 buf[4] = levels_;
182 std::memcpy(buf + 5, &cached_orig_bytes_, sizeof(uint32_t));
183 return 9;
184 }
185
186 void deserializeHeader(const uint8_t* buf, size_t size) override {
187 if (size >= 4) std::memcpy(&chunk_size_, buf, sizeof(uint32_t));
188 if (size >= 5) levels_ = buf[4];
189 if (size >= 9) std::memcpy(&cached_orig_bytes_, buf + 5, sizeof(uint32_t));
190 }
191
192 size_t getMaxHeaderSize(size_t) const override { return 9; }
193
194 void saveState() override {
195 saved_chunk_size_ = chunk_size_;
196 saved_levels_ = levels_;
197 saved_cached_orig_bytes_ = cached_orig_bytes_;
198 }
199
200 void restoreState() override {
201 chunk_size_ = saved_chunk_size_;
202 levels_ = saved_levels_;
203 cached_orig_bytes_ = saved_cached_orig_bytes_;
204 }
205
206private:
207 bool is_inverse_;
208 uint32_t chunk_size_;
209 uint32_t saved_chunk_size_ = 0;
210 uint8_t levels_;
211 uint8_t saved_levels_ = 0;
212 size_t actual_output_size_;
213 // Cached original (uncompressed) byte count. Set by the forward execute()
214 // and persisted in the serialized header so that the inverse
215 // estimateOutputSizes() can return the right buffer size even when the
216 // inverse stage is constructed cold from a file-based pipeline.
217 uint32_t cached_orig_bytes_ = 0;
218 uint32_t saved_cached_orig_bytes_ = 0;
219
220 // ── Persistent scratch buffers ──────────────────────────────────────────
221 // Forward path:
222 // d_scratch_ : per-chunk worst-case output (n_chunks * chunk_size bytes)
223 // d_sizes_dev_ : raw compressed sizes from rzeEncodeKernel (with flag bit)
224 // d_clean_dev_ : compressed sizes with flag stripped (for pack offsets)
225 // d_dst_off_dev_: exclusive-prefix-sum of clean sizes + header offset
226 // Inverse path:
227 // d_inv_{in_off,comp_sz,out_off,orig_sz}_: per-chunk decode tables
228 // All are allocated once (or grown on demand) and freed in the destructor.
229 uint8_t* d_scratch_;
230 uint32_t* d_sizes_dev_;
231 uint32_t* d_clean_dev_;
232 uint32_t* d_dst_off_dev_;
233 uint32_t* d_inv_in_off_;
234 uint32_t* d_inv_comp_sz_;
235 uint32_t* d_inv_out_off_;
236 uint32_t* d_inv_orig_sz_;
237 mutable bool tail_readback_pending_ = false;
238 mutable cudaStream_t tail_readback_stream_ = nullptr;
239 mutable uint32_t tail_last_index_ = 0;
240 // Output pointer saved by forward execute() so postStreamSync() can zero
241 // the trailing alignment padding (0–3 bytes at actual_output_size_ - total_out).
242 mutable uint8_t* tail_output_ptr_ = nullptr;
243 size_t scratch_capacity_; // # chunks forward scratch can hold
244 size_t inv_capacity_; // # chunks inverse tables can hold
245 MemoryPool* scratch_pool_owner_ = nullptr;
246 MemoryPool* inv_pool_owner_ = nullptr;
247 bool scratch_from_pool_ = false;
248 bool inv_from_pool_ = false;
249};
250
251} // namespace fz
Definition mempool.h:62
Definition rze_stage.h:47
void execute(cudaStream_t stream, MemoryPool *pool, const std::vector< void * > &inputs, const std::vector< void * > &outputs, const std::vector< size_t > &sizes) override
uint8_t getOutputDataType(size_t) const override
Definition rze_stage.h:164
void deserializeHeader(const uint8_t *buf, size_t size) override
Definition rze_stage.h:186
bool isGraphCompatible() const override
Definition rze_stage.h:89
void setInverse(bool inv) override
Definition rze_stage.h:73
size_t getMaxHeaderSize(size_t) const override
Definition rze_stage.h:192
size_t serializeHeader(size_t output_index, uint8_t *buf, size_t max_size) const override
Definition rze_stage.h:175
uint16_t getStageTypeId() const override
Definition rze_stage.h:160
std::unordered_map< std::string, size_t > getActualOutputSizesByName() const override
size_t getRequiredInputAlignment() const override
Definition rze_stage.h:95
void saveState() override
Definition rze_stage.h:194
std::string getName() const override
Definition rze_stage.h:110
std::vector< size_t > estimateOutputSizes(const std::vector< size_t > &input_sizes) const override
Definition rze_stage.h:114
size_t getActualOutputSize(int index) const override
size_t estimateScratchBytes(const std::vector< size_t > &input_sizes) const override
Definition rze_stage.h:151
void postStreamSync(cudaStream_t stream) override
Definition stage.h:28
FZM binary file format definitions — structs, enums, and helpers.
Definition fzm_format.h:25
@ RZE
RZEStage — recursive zero-byte elimination.