FZGPUModules 2.0
GPU-accelerated modular compression pipelines
Loading...
Searching...
No Matches
dag.h
Go to the documentation of this file.
1
5#pragma once
6
7#include "pipeline/perf.h"
8
9#include <cuda_runtime.h>
10#include <memory>
11#include <string>
12#include <unordered_map>
13#include <unordered_set>
14#include <vector>
15
16namespace fz {
17
18// Forward declarations
19class Stage;
20class MemoryPool;
21
/// Selects when the DAG allocates and releases the buffers it manages.
enum class MemoryStrategy {
    MINIMAL,      ///< Allocate on-demand, free at last consumer. Lowest peak memory.
    PREALLOCATE,  ///< Allocate everything upfront at finalize(). Required for graph mode.
};
27
/// Bookkeeping record for a single buffer tracked by the DAG.
struct BufferInfo {
    // NOTE(review): the three size fields are presumably byte counts
    // (current / as first declared / actually reserved) — confirm against dag.cu.
    size_t size;
    size_t initial_size;
    size_t allocated_size;
    void* d_ptr;      // nullptr until a pointer is attached (presumably device memory — confirm)
    std::string tag;  // Free-form label for the buffer.

    int remaining_consumers;
    std::vector<int> consumer_stage_ids;
    int producer_stage_id;      // -1 while no producer has been recorded.
    int producer_output_index;

    bool is_allocated;
    bool is_persistent;  ///< If true, survives reset() until DAG destruction.
    bool is_external;    ///< If true, pointer is caller-owned — DAG never allocs or frees.

    /// Creates an empty, unallocated record with no producer and no consumers.
    BufferInfo()
        : size(0), initial_size(0), allocated_size(0), d_ptr(nullptr), tag(""),
          remaining_consumers(0), producer_stage_id(-1), producer_output_index(0),
          is_allocated(false), is_persistent(false), is_external(false) {}
};
50
52struct DAGNode {
53 int id;
54 Stage* stage;
55 std::string name;
56
57 std::vector<int> input_buffer_ids;
58 std::vector<int> output_buffer_ids;
59 std::unordered_map<int, int> output_index_to_buffer_id;
60
61 std::vector<DAGNode*> dependencies;
62 std::vector<DAGNode*> dependents;
63
64 int level;
65 int execution_order;
66 cudaStream_t stream;
67
68 bool is_executed;
69 cudaEvent_t completion_event;
70 cudaEvent_t start_event;
71
72 // Pre-sized vectors for execute() — allocated at finalize(), reused every call
73 // to avoid per-call heap allocations of the input/output/sizes arrays.
74 std::vector<void*> exec_inputs;
75 std::vector<void*> exec_outputs;
76 std::vector<size_t> exec_sizes;
77
78 DAGNode(Stage* s = nullptr)
79 : id(-1), stage(s), level(-1), execution_order(-1),
80 stream(nullptr), is_executed(false), completion_event(nullptr), start_event(nullptr) {}
81};
82
93public:
94 CompressionDAG(MemoryPool* mem_pool, MemoryStrategy strategy = MemoryStrategy::MINIMAL);
96
97 // ── Construction ──────────────────────────────────────────────────────────
98
100 DAGNode* addStage(Stage* stage, std::string name = "");
101
110 int addDependency(DAGNode* dependent, DAGNode* dependency,
111 size_t buffer_size = 0, int output_index = 0);
112
113 void setInputBuffer(DAGNode* node, size_t size, const std::string& tag = "input");
114 void setOutputBuffer(DAGNode* node, size_t size, const std::string& tag = "output");
115
120 int addUnconnectedOutput(DAGNode* node, size_t size, int output_index, const std::string& tag);
121
126 bool connectExistingOutput(DAGNode* producer, DAGNode* consumer, int output_index);
127
128 void updateBufferTag(int buffer_id, const std::string& tag);
129 void setBufferPersistent(int buffer_id, bool persistent);
130
132 void finalize();
133
135 void configureStreams(int num_streams);
136
137 // ── Execution ─────────────────────────────────────────────────────────────
138
139 void execute(cudaStream_t stream);
140
145 void preallocateBuffers(cudaStream_t stream = 0);
146
148 void reset(cudaStream_t stream = 0);
149
150 // ── Buffer access ─────────────────────────────────────────────────────────
151
152 void* getBuffer(int buffer_id) const;
153
158 void setExternalPointer(int buffer_id, void* external_ptr);
159
160 void updateBufferSize(int buffer_id, size_t new_size);
161
162 // ── Query & debug ─────────────────────────────────────────────────────────
163
165 size_t getTotalBufferSize() const;
166
175 size_t computeTopoPoolSize() const;
176
177 size_t getPeakMemoryUsage() const { return peak_memory_usage_; }
178 size_t getCurrentMemoryUsage() const { return current_memory_usage_; }
179 size_t getBufferSize(int buffer_id) const;
180 const BufferInfo& getBufferInfo(int buffer_id) const;
181 const std::vector<std::vector<DAGNode*>>& getLevels() const { return levels_; }
182 const std::vector<DAGNode*>& getNodes() const { return nodes_; }
183
185 int getMaxParallelism() const;
186
188 size_t getStreamCount() const { return streams_.size(); }
189
190 void printDAG() const;
191 void printBufferLifetimes() const;
192
198 void enableBoundsCheck(bool enable) { bounds_check_enabled_ = enable; }
199 bool isBoundsCheckEnabled() const { return bounds_check_enabled_; }
200
206 void setColoringEnabled(bool enable) { coloring_disabled_ = !enable; }
207 bool isColoringEnabled() const { return coloring_applied_; }
208 size_t getColorRegionCount() const { return color_region_sizes_.size(); }
209
217 void setCaptureMode(bool capture);
218 bool isCaptureMode() const { return capture_mode_; }
219
220 // ── Profiling ─────────────────────────────────────────────────────────────
221
226 void enableProfiling(bool enable);
227 bool isProfilingEnabled() const { return profiling_enabled_; }
228
230 std::vector<StageTimingResult> collectTimings();
231
232private:
233 MemoryPool* mem_pool_;
234 MemoryStrategy strategy_;
235
236 std::vector<DAGNode*> nodes_;
237 std::unordered_map<int, BufferInfo> buffers_;
238
239 int next_buffer_id_;
240 bool is_finalized_;
241
242 std::vector<cudaStream_t> streams_;
243 bool owns_streams_;
244
245 std::vector<std::vector<DAGNode*>> levels_;
246 int max_level_;
247
248 size_t current_memory_usage_;
249 size_t peak_memory_usage_;
250
251 bool profiling_enabled_;
252 bool bounds_check_enabled_;
253 bool capture_mode_;
254
255 // Buffer coloring (PREALLOCATE only). Non-overlapping buffers share a color
256 // and are aliased into one pool region. color_region_ptrs_ owns the allocations.
257 bool coloring_disabled_;
258 bool coloring_applied_;
259 std::unordered_map<int, int> buffer_color_;
260 std::vector<size_t> color_region_sizes_;
261 std::vector<void*> color_region_ptrs_;
262
263 void assignLevels();
264 void assignStreams();
265 void allocateBuffer(int buffer_id, cudaStream_t stream);
266 void freeBuffer(int buffer_id, cudaStream_t stream);
267 void planPreallocation();
268 void colorBuffers();
269};
270
271} // namespace fz
Definition dag.h:92
void preallocateBuffers(cudaStream_t stream=0)
DAGNode * addStage(Stage *stage, std::string name="")
void setCaptureMode(bool capture)
int addDependency(DAGNode *dependent, DAGNode *dependency, size_t buffer_size=0, int output_index=0)
void setColoringEnabled(bool enable)
Definition dag.h:206
size_t computeTopoPoolSize() const
size_t getTotalBufferSize() const
int addUnconnectedOutput(DAGNode *node, size_t size, int output_index, const std::string &tag)
void setExternalPointer(int buffer_id, void *external_ptr)
size_t getStreamCount() const
Definition dag.h:188
void enableBoundsCheck(bool enable)
Definition dag.h:198
void enableProfiling(bool enable)
std::vector< StageTimingResult > collectTimings()
void configureStreams(int num_streams)
int getMaxParallelism() const
bool connectExistingOutput(DAGNode *producer, DAGNode *consumer, int output_index)
void reset(cudaStream_t stream=0)
Definition mempool.h:66
Definition stage.h:30
MemoryStrategy
Definition dag.h:23
@ MINIMAL
Allocate on-demand, free at last consumer. Lowest peak memory.
@ PREALLOCATE
Allocate everything upfront at finalize(). Required for graph mode.
Pipeline and per-stage profiling result types.
Definition dag.h:29
bool is_external
If true, pointer is caller-owned — DAG never allocs or frees.
Definition dag.h:43
bool is_persistent
If true, survives reset() until DAG destruction.
Definition dag.h:42
Definition dag.h:52
cudaEvent_t start_event
Non-null only when profiling is enabled.
Definition dag.h:70