FZGPUModules 1.0
GPU-accelerated modular compression pipeline
Loading...
Searching...
No Matches
dag.h
1#pragma once
2
3#include "pipeline/perf.h"
4
5#include <cuda_runtime.h>
6#include <memory>
7#include <string>
8#include <unordered_map>
9#include <unordered_set>
10#include <vector>
11
12namespace fz {
13
14// Forward declarations
15class Stage;
16class MemoryPool;
17
// Buffer allocation policy selected for a DAG.
enum class MemoryStrategy {
    // Allocate on-demand, free at last consumer. Lowest peak memory.
    MINIMAL,
    // Allocate everything upfront at finalize(). Required for graph mode.
    PREALLOCATE,
};
23
// Bookkeeping record for one buffer tracked by the DAG: its sizes, pointer,
// producer/consumer wiring, and ownership flags.
struct BufferInfo {
    size_t size;            // current size (units not shown here; presumably bytes)
    size_t initial_size;
    size_t allocated_size;
    void* d_ptr;            // presumably a device pointer ("d_" prefix) — confirm
    std::string tag;        // label for the buffer

    int remaining_consumers;
    std::vector<int> consumer_stage_ids;
    int producer_stage_id;      // -1 until a producer is recorded
    int producer_output_index;

    bool is_allocated;
    bool is_persistent;  // If true, survives reset() until DAG destruction.
    bool is_external;    // If true, pointer is caller-owned — DAG never allocs or frees.

    // Starts as an empty, unallocated, DAG-owned buffer with no producer.
    // Initializers are listed in member declaration order.
    BufferInfo()
        : size(0), initial_size(0), allocated_size(0), d_ptr(nullptr), tag(""),
          remaining_consumers(0), producer_stage_id(-1), producer_output_index(0),
          is_allocated(false), is_persistent(false), is_external(false) {}
};
46
48struct DAGNode {
49 int id;
50 Stage* stage;
51 std::string name;
52
53 std::vector<int> input_buffer_ids;
54 std::vector<int> output_buffer_ids;
55 std::unordered_map<int, int> output_index_to_buffer_id;
56
57 std::vector<DAGNode*> dependencies;
58 std::vector<DAGNode*> dependents;
59
60 int level;
61 int execution_order;
62 cudaStream_t stream;
63
64 bool is_executed;
65 cudaEvent_t completion_event;
66 cudaEvent_t start_event;
67
68 // Pre-sized vectors for execute() — allocated at finalize(), reused every call
69 // to avoid per-call heap allocations of the input/output/sizes arrays.
70 std::vector<void*> exec_inputs;
71 std::vector<void*> exec_outputs;
72 std::vector<size_t> exec_sizes;
73
74 DAGNode(Stage* s = nullptr)
75 : id(-1), stage(s), level(-1), execution_order(-1),
76 stream(nullptr), is_executed(false), completion_event(nullptr), start_event(nullptr) {}
77};
78
// Public interface of the DAG class declared in this header.
// NOTE(review): the class header (listing line 88) and listing lines 90-91
// are absent from this extract, so the class name and any constructor /
// destructor declarations are not visible here.
89public:
92
93 // ── Construction ──────────────────────────────────────────────────────────
94
// Adds `stage` to the graph under an optional `name`. Returns a DAGNode*,
// presumably the node created for the stage — confirm in the implementation.
96 DAGNode* addStage(Stage* stage, const std::string& name = "");
97
// Links `dependent` to `dependency`. Presumably `output_index` selects which
// output of `dependency` is consumed and the returned int is the id of the
// connecting buffer — TODO confirm; meaning of buffer_size == 0 is not shown.
106 int addDependency(DAGNode* dependent, DAGNode* dependency,
107 size_t buffer_size = 0, int output_index = 0);
108
// Declare an input / output buffer of `size` for `node`; `tag` defaults to
// "input" / "output" respectively.
109 void setInputBuffer(DAGNode* node, size_t size, const std::string& tag = "input");
110 void setOutputBuffer(DAGNode* node, size_t size, const std::string& tag = "output");
111
// Returns an int, presumably the id of the newly registered buffer — confirm.
116 int addUnconnectedOutput(DAGNode* node, size_t size, int output_index, const std::string& tag);
117
// Returns bool; presumably false when `producer` has no existing output at
// `output_index` — confirm against the implementation.
122 bool connectExistingOutput(DAGNode* producer, DAGNode* consumer, int output_index);
123
124 void updateBufferTag(int buffer_id, const std::string& tag);
// Presumably toggles BufferInfo::is_persistent for the buffer — confirm.
125 void setBufferPersistent(int buffer_id, bool persistent);
126
128 void finalize();
129
131 void configureStreams(int num_streams);
132
133 // ── Execution ─────────────────────────────────────────────────────────────
134
135 void execute(cudaStream_t stream);
136
// `stream` defaults to 0 (the default CUDA stream) here and in reset().
141 void preallocateBuffers(cudaStream_t stream = 0);
142
144 void reset(cudaStream_t stream = 0);
145
146 // ── Buffer access ─────────────────────────────────────────────────────────
147
148 void* getBuffer(int buffer_id) const;
149
// Presumably marks the buffer as external (BufferInfo::is_external), i.e.
// caller-owned and never allocated or freed by the DAG — confirm.
154 void setExternalPointer(int buffer_id, void* external_ptr);
155
156 void updateBufferSize(int buffer_id, size_t new_size);
157
158 // ── Query & debug ─────────────────────────────────────────────────────────
159
161 size_t getTotalBufferSize() const;
162
171 size_t computeTopoPoolSize() const;
172
// Inline accessors: each returns the like-named private member unchanged.
173 size_t getPeakMemoryUsage() const { return peak_memory_usage_; }
174 size_t getCurrentMemoryUsage() const { return current_memory_usage_; }
175 size_t getBufferSize(int buffer_id) const;
176 const BufferInfo& getBufferInfo(int buffer_id) const;
177 const std::vector<std::vector<DAGNode*>>& getLevels() const { return levels_; }
178 const std::vector<DAGNode*>& getNodes() const { return nodes_; }
179
181 int getMaxParallelism() const;
182
// Number of entries currently held in streams_.
184 size_t getStreamCount() const { return streams_.size(); }
185
186 void printDAG() const;
187 void printBufferLifetimes() const;
188
// Stores / reports the bounds_check_enabled_ flag; what the check does is
// defined outside this header.
194 void enableBoundsCheck(bool enable) { bounds_check_enabled_ = enable; }
195 bool isBoundsCheckEnabled() const { return bounds_check_enabled_; }
196
// setColoringEnabled() stores the inverse of `enable` in coloring_disabled_.
// NOTE(review): isColoringEnabled() reports coloring_applied_, not
// !coloring_disabled_, so it does not simply echo the last setter call —
// presumably it reflects whether coloring was actually applied; confirm that
// this asymmetry is intended.
202 void setColoringEnabled(bool enable) { coloring_disabled_ = !enable; }
203 bool isColoringEnabled() const { return coloring_applied_; }
// Number of entries in color_region_sizes_.
204 size_t getColorRegionCount() const { return color_region_sizes_.size(); }
205
213 void setCaptureMode(bool capture);
214 bool isCaptureMode() const { return capture_mode_; }
215
216 // ── Profiling ─────────────────────────────────────────────────────────────
217
222 void enableProfiling(bool enable);
223 bool isProfilingEnabled() const { return profiling_enabled_; }
224
// Returns a vector of StageTimingResult (a type not declared in this header,
// presumably from pipeline/perf.h).
226 std::vector<StageTimingResult> collectTimings();
227
228private:
// NOTE(review): raw pointer; ownership of the pool is not visible here.
229 MemoryPool* mem_pool_;
// MINIMAL: allocate on-demand, free at last consumer.
// PREALLOCATE: allocate everything upfront at finalize().
230 MemoryStrategy strategy_;
231
// NOTE(review): raw DAGNode* elements; who deletes them is not visible here.
232 std::vector<DAGNode*> nodes_;
// Buffer records keyed by buffer id.
233 std::unordered_map<int, BufferInfo> buffers_;
234
235 int next_buffer_id_;
236 bool is_finalized_;
237
238 std::vector<cudaStream_t> streams_;
// Presumably true when this object created streams_ and must destroy them —
// confirm in the implementation.
239 bool owns_streams_;
240
// Nodes grouped per level (outer index presumably the level number).
241 std::vector<std::vector<DAGNode*>> levels_;
242 int max_level_;
243
244 size_t current_memory_usage_;
245 size_t peak_memory_usage_;
246
247 bool profiling_enabled_;
248 bool bounds_check_enabled_;
249 bool capture_mode_;
250
251 // Buffer coloring (PREALLOCATE only). Non-overlapping buffers share a color
252 // and are aliased into one pool region. color_region_ptrs_ owns the allocations.
253 bool coloring_disabled_;
254 bool coloring_applied_;
// Presumably buffer id -> color index, with the two vectors below indexed by
// color — confirm.
255 std::unordered_map<int, int> buffer_color_;
256 std::vector<size_t> color_region_sizes_;
257 std::vector<void*> color_region_ptrs_;
258
// Internal helpers; their bodies live outside this header.
259 void assignLevels();
260 void assignStreams();
261 void allocateBuffer(int buffer_id, cudaStream_t stream);
262 void freeBuffer(int buffer_id, cudaStream_t stream);
263 void planPreallocation();
264 void colorBuffers();
265};
266
267} // namespace fz
Definition dag.h:88
void preallocateBuffers(cudaStream_t stream=0)
void setCaptureMode(bool capture)
int addDependency(DAGNode *dependent, DAGNode *dependency, size_t buffer_size=0, int output_index=0)
void setColoringEnabled(bool enable)
Definition dag.h:202
size_t computeTopoPoolSize() const
size_t getTotalBufferSize() const
int addUnconnectedOutput(DAGNode *node, size_t size, int output_index, const std::string &tag)
void setExternalPointer(int buffer_id, void *external_ptr)
size_t getStreamCount() const
Definition dag.h:184
void enableBoundsCheck(bool enable)
Definition dag.h:194
void enableProfiling(bool enable)
std::vector< StageTimingResult > collectTimings()
void configureStreams(int num_streams)
int getMaxParallelism() const
DAGNode * addStage(Stage *stage, const std::string &name="")
bool connectExistingOutput(DAGNode *producer, DAGNode *consumer, int output_index)
void reset(cudaStream_t stream=0)
Definition mempool.h:62
Definition stage.h:28
Definition fzm_format.h:25
MemoryStrategy
Definition dag.h:19
@ MINIMAL
Allocate on-demand, free at last consumer. Lowest peak memory.
@ PREALLOCATE
Allocate everything upfront at finalize(). Required for graph mode.
Pipeline and per-stage profiling result types.
Definition dag.h:25
bool is_external
If true, pointer is caller-owned — DAG never allocs or frees.
Definition dag.h:39
bool is_persistent
If true, survives reset() until DAG destruction.
Definition dag.h:38
Definition dag.h:48
cudaEvent_t start_event
Non-null only when profiling is enabled.
Definition dag.h:66