FZGPUModules 1.0
GPU-accelerated modular compression pipeline
Loading...
Searching...
No Matches
rle.h
Go to the documentation of this file.
1#pragma once
2
8#include "stage/stage.h"
9#include "fzm_format.h"
10#include "log.h"
11#include <cuda_runtime.h>
12#include <cstdint>
13#include <cstring>
14#include <type_traits>
15
16namespace fz {
17
30template<typename T = uint16_t>
31class RLEStage : public Stage {
32public:
RLEStage() : is_inverse_{false} {}
// Defined out of line (not in this header); presumably releases the persistent
// forward scratch and the pinned host buffer — confirm in the .cu file.
~RLEStage() override;

/// Chooses the direction for subsequent calls: true = decode (inverse),
/// false = encode (forward, the default after construction).
void setInverse(bool inverse) override {
    is_inverse_ = inverse;
}

/// Returns the direction last chosen via setInverse().
bool isInverse() const override {
    return is_inverse_;
}
38
/// Runs this stage on `stream` using buffers from `pool`.
/// Implemented out of line (not visible in this header); the direction is
/// presumably chosen by setInverse() — confirm against the .cu implementation.
void execute(cudaStream_t stream,
             MemoryPool* pool,
             const std::vector<void*>& inputs,
             const std::vector<void*>& outputs,
             const std::vector<size_t>& sizes) override;

/// Pipeline hook associated with `stream` after execution; implemented out of
/// line. NOTE(review): presumably finishes the pending num_runs readback
/// (see completePendingSync) — confirm in the .cu file.
void postStreamSync(cudaStream_t stream) override;
53
/// Human-readable stage name.
std::string getName() const override {
    return "RLE";
}
/// RLE consumes exactly one buffer ...
size_t getNumInputs() const override {
    return 1;
}
/// ... and produces exactly one buffer.
size_t getNumOutputs() const override {
    return 1;
}
57
69 const std::vector<size_t>& input_sizes
70 ) const override {
71 if (is_inverse_ || input_sizes.empty()) return 0;
72 const size_t n = input_sizes[0] / sizeof(T);
73 // is_boundary(1B) + boundary_scan(4B) + boundary_positions(4B)
74 // + values_scratch(sizeof(T)) + lengths_scratch(4B)
75 return n * (1 + 4 + 4 + sizeof(T) + 4);
76 }
77
/// Estimates the byte size of the single output buffer.
///
/// @param input_sizes  Byte sizes of the inputs; only input_sizes[0] is used.
///                     An empty vector is treated as a zero-byte input.
/// @return One-element vector with the estimated output size in bytes.
std::vector<size_t> estimateOutputSizes(
    const std::vector<size_t>& input_sizes
) const override {
    // Guard against an empty size list instead of indexing it (UB).
    const size_t in_bytes = input_sizes.empty() ? 0 : input_sizes[0];

    if (is_inverse_) {
        // Decompression: the element count cached by a forward pass or read
        // back by deserializeHeader() gives an exact size.
        if (cached_num_elements_ > 0)
            return {static_cast<size_t>(cached_num_elements_) * sizeof(T)};
        // No count available: fall back to 2x the encoded size. This is a
        // heuristic, not a bound — a single u32 run length can expand far
        // beyond 2x.
        return {in_bytes * 2};
    }

    // Compression: worst case is every element being its own run.
    // Wire format: [num_runs:u32][values:T x n, 4B-aligned][lengths:u32 x n]
    // The values section is padded to a 4-byte boundary (matching
    // rle_pack_kernel), so the estimate must include that padding or the
    // allocated buffer will be too small and the lengths write goes OOB.
    const size_t n = in_bytes / sizeof(T);
    const size_t values_bytes = n * sizeof(T);
    // The mask must be size_t-wide: a 32-bit `~3u` zero-extends and would
    // clear the upper bits of values_bytes once the section reaches 4 GiB.
    const size_t values_aligned = (values_bytes + 3) & ~static_cast<size_t>(3);
    return {sizeof(uint32_t) + values_aligned + n * sizeof(uint32_t)};
}
100
/// Actual encoded size keyed by output name ("output"); 0 when unknown.
/// Finishes any outstanding forward readback first.
std::unordered_map<std::string, size_t> getActualOutputSizesByName() const override {
    completePendingSync();
    const size_t bytes = actual_output_sizes_.empty() ? 0 : actual_output_sizes_.front();
    return {{"output", bytes}};
}
/// Actual size of output `index`; only index 0 exists, anything else yields 0.
size_t getActualOutputSize(int index) const override {
    completePendingSync();
    if (index != 0 || actual_output_sizes_.empty()) {
        return 0;
    }
    return actual_output_sizes_.front();
}
109
/// Stage type tag written into the container format.
uint16_t getStageTypeId() const override {
    const StageType kind = StageType::RLE;
    return static_cast<uint16_t>(kind);
}

/// Output element type: the same T the stage was instantiated with.
uint8_t getOutputDataType(size_t /*output_index*/) const override {
    const DataType dtype = getDataTypeEnum();
    return static_cast<uint8_t>(dtype);
}

/// Input element type: also T.
uint8_t getInputDataType(size_t input_index) const override {
    (void)input_index;
    const DataType dtype = getDataTypeEnum();
    return static_cast<uint8_t>(dtype);
}
122
/// Writes the stage header: [DataType][num_elements:u32], unpadded.
/// @return Bytes written, or 0 when max_size cannot hold the header.
size_t serializeHeader(size_t output_index, uint8_t* header_buffer, size_t max_size) const override {
    (void)output_index;
    constexpr size_t kDtypeBytes = sizeof(DataType);
    constexpr size_t kTotalBytes = kDtypeBytes + sizeof(uint32_t);
    if (kTotalBytes > max_size) {
        return 0;
    }
    const DataType dtype = getDataTypeEnum();
    std::memcpy(header_buffer, &dtype, kDtypeBytes);
    std::memcpy(header_buffer + kDtypeBytes, &cached_num_elements_, sizeof(uint32_t));
    return kTotalBytes;
}

/// Reads the element count back from a header produced by serializeHeader().
/// A header shorter than [DataType][u32] is ignored (the cached count is kept),
/// and the stored DataType is not compared against T.
void deserializeHeader(const uint8_t* header_buffer, size_t size) override {
    constexpr size_t kDtypeBytes = sizeof(DataType);
    if (size < kDtypeBytes + sizeof(uint32_t)) {
        return;
    }
    std::memcpy(&cached_num_elements_, header_buffer + kDtypeBytes, sizeof(uint32_t));
}

/// Fixed header size, independent of the output index.
size_t getMaxHeaderSize(size_t /*output_index*/) const override {
    return sizeof(uint32_t) + sizeof(DataType);
}
142
private:
// Direction flag: true = decode (inverse), false = encode. Set by setInverse().
bool is_inverse_;

// Uncompressed element count (elements of T). Stored in / restored from the
// stage header and used by the inverse estimateOutputSizes() for an exact
// size; 0 means "unknown".
uint32_t cached_num_elements_ = 0;

// ---- Persistent forward-path scratch ----
// Allocated lazily on the first forward execute(); grown if n increases.
// Per-element footprint matches the breakdown in estimateScratchBytes().
uint8_t*  d_is_boundary_        = nullptr;
uint32_t* d_boundary_scan_      = nullptr;
uint32_t* d_boundary_positions_ = nullptr;
T*        d_values_scratch_     = nullptr;
uint32_t* d_lengths_scratch_    = nullptr;
size_t      fwd_scratch_n_    = 0;        // presumably the element capacity of the scratch above — confirm
MemoryPool* fwd_scratch_pool_ = nullptr;  // presumably the pool the scratch came from — confirm
bool        fwd_from_pool_    = false;    // presumably true when the scratch is pool-owned — confirm

// Pinned host buffer receiving the async D2H copy of num_runs. The readback
// state is mutable so const accessors (e.g. getActualOutputSizesByName())
// can finish it through completePendingSync().
mutable uint32_t*           h_num_runs_       = nullptr;
mutable bool                fwd_sync_pending_ = false;    // a num_runs readback is outstanding
mutable cudaStream_t        fwd_last_stream_  = nullptr;  // stream completePendingSync() waits on
mutable std::vector<size_t> actual_output_sizes_;         // encoded size from the last completed forward pass
169
// Finish an outstanding forward-path readback, if any.
// Steps:
//   1. Synchronize fwd_last_stream_.
//   2. Read num_runs from the pinned host buffer.
//   3. Derive actual_output_sizes_ from the wire format
//      [num_runs:u32][values:T x num_runs, 4B-aligned][lengths:u32 x num_runs].
// Safe to call from const methods: every member it writes is mutable.
// If the synchronize fails, the host copy of num_runs cannot be trusted, so the
// actual size is reported as 0 instead of being derived from a stale value.
void completePendingSync() const {
    if (!fwd_sync_pending_) return;

    const cudaError_t sync_err = cudaStreamSynchronize(fwd_last_stream_);
    fwd_sync_pending_ = false;
    if (sync_err != cudaSuccess) {
        actual_output_sizes_ = {0};
        // NOTE(review): DEBUG is the only log level visible from this header;
        // escalate to an error-level log if the logger provides one.
        FZ_LOG(DEBUG, "RLE encode: cudaStreamSynchronize failed: %s",
               cudaGetErrorString(sync_err));
        return;
    }

    const uint32_t num_runs = *h_num_runs_;
    const size_t values_bytes = num_runs * sizeof(T);
    // size_t-wide mask so the padding never truncates large sections.
    const size_t values_aligned = (values_bytes + 3) & ~static_cast<size_t>(3);
    actual_output_sizes_ = {
        sizeof(uint32_t) + values_aligned + num_runs * sizeof(uint32_t)
    };

    // Log run count and effective compression ratio.
    const size_t in_bytes = static_cast<size_t>(cached_num_elements_) * sizeof(T);
    const size_t out_bytes = actual_output_sizes_[0];
    const float ratio = in_bytes > 0
        ? static_cast<float>(in_bytes) / static_cast<float>(out_bytes) : 0.0f;
    FZ_LOG(DEBUG, "RLE encode: %u runs / %u elems %.1f KB -> %.1f KB ratio %.2fx",
           num_runs, cached_num_elements_,
           in_bytes / 1024.0f, out_bytes / 1024.0f, ratio);
}
192
// Maps the element type T to its DataType tag (used by serializeHeader() and
// the input/output dtype queries). Types without a dedicated tag fall back to
// DataType::UINT8.
DataType getDataTypeEnum() const {
    using std::is_same;
    // Unsigned integers
    if (is_same<T, uint8_t>::value)  { return DataType::UINT8; }
    if (is_same<T, uint16_t>::value) { return DataType::UINT16; }
    if (is_same<T, uint32_t>::value) { return DataType::UINT32; }
    if (is_same<T, uint64_t>::value) { return DataType::UINT64; }
    // Signed integers
    if (is_same<T, int8_t>::value)   { return DataType::INT8; }
    if (is_same<T, int16_t>::value)  { return DataType::INT16; }
    if (is_same<T, int32_t>::value)  { return DataType::INT32; }
    if (is_same<T, int64_t>::value)  { return DataType::INT64; }
    // Floating point
    if (is_same<T, float>::value)    { return DataType::FLOAT32; }
    if (is_same<T, double>::value)   { return DataType::FLOAT64; }
    // Anything else (e.g. plain char, bool) is reported as UINT8.
    return DataType::UINT8;
}
207};
208
// Explicit instantiations are provided in another translation unit
// (presumably the stage's .cu file). Only these element types are declared
// here; using any other T also requires definitions of the out-of-line
// members (~RLEStage, execute, postStreamSync), which this header lacks.
extern template class RLEStage<uint8_t>;
extern template class RLEStage<uint16_t>;
extern template class RLEStage<uint32_t>;
extern template class RLEStage<int32_t>;
213
214} // namespace fz
Definition mempool.h:62
Definition rle.h:31
void setInverse(bool inverse) override
Definition rle.h:36
void postStreamSync(cudaStream_t stream) override
void deserializeHeader(const uint8_t *header_buffer, size_t size) override
Definition rle.h:133
std::unordered_map< std::string, size_t > getActualOutputSizesByName() const override
Definition rle.h:101
std::string getName() const override
Definition rle.h:54
size_t serializeHeader(size_t output_index, uint8_t *header_buffer, size_t max_size) const override
Definition rle.h:123
uint16_t getStageTypeId() const override
Definition rle.h:110
size_t getMaxHeaderSize(size_t output_index) const override
Definition rle.h:138
uint8_t getOutputDataType(size_t output_index) const override
Definition rle.h:114
std::vector< size_t > estimateOutputSizes(const std::vector< size_t > &input_sizes) const override
Definition rle.h:78
void execute(cudaStream_t stream, MemoryPool *pool, const std::vector< void * > &inputs, const std::vector< void * > &outputs, const std::vector< size_t > &sizes) override
uint8_t getInputDataType(size_t) const override
Definition rle.h:119
size_t getActualOutputSize(int index) const override
Definition rle.h:105
size_t estimateScratchBytes(const std::vector< size_t > &input_sizes) const override
Definition rle.h:68
Definition stage.h:28
FZM binary file format definitions — structs, enums, and helpers.
Definition fzm_format.h:25
@ RLE
RLEStage — run-length encoding.
DataType
Element data type identifiers used in buffer and stage descriptors.
Definition fzm_format.h:103
@ DEBUG
Pipeline construction, buffer allocation, data stats.