39 size_t input_data_size = 0,
41 float pool_multiplier = 3.0f
/// Stores the three extents in `dims_`.
/// `y` and `z` default to 1 when the caller omits them.
59 void setDims(size_t x, size_t y = 1, size_t z = 1)
{
    const std::array<size_t, 3> extents{x, y, z};
    dims_ = extents;
}
/// Overwrites `dims_` with a copy of the caller-supplied three-element array.
60 void setDims(std::array<size_t, 3> dims)
{
    dims_ = dims;
}
/// Returns a copy of the three-element array held in `dims_`.
61 std::array<size_t, 3> getDims() const
{
    return dims_;
}
69 template<
typename StageT,
typename... Args>
// Declaration only — the definition is not part of this excerpt.
// Takes one `dependent` stage pointer and a list of `producers` stage pointers.
// NOTE(review): the meaning of the int return (status code vs. identifier) is
// not visible here — confirm against the definition.
82 int connect(
Stage* dependent,
const std::vector<Stage*>& producers);
// Declaration only — the definition is not part of this excerpt.
// `stream` defaults to 0 when the caller does not pass one.
96 void warmup(cudaStream_t stream = 0);
/// Reports the current value of the `warmup_on_finalize_` flag.
100 bool isWarmupOnFinalizeEnabled() const
{
    return warmup_on_finalize_;
}
/// Reports the current value of the `pool_managed_decomp_` flag.
109 bool isPoolManagedDecompOutput() const
{
    return pool_managed_decomp_;
}
134 cudaStream_t stream = 0
142 const std::vector<InputSpec>& inputs,
145 cudaStream_t stream = 0
153 per_source_hints_[source] = size;
177 cudaStream_t stream = 0
186 const void* d_input =
nullptr,
187 size_t input_size = 0,
188 cudaStream_t stream = 0
// Declaration only — the definition is not part of this excerpt.
// `stream` defaults to 0 when the caller does not pass one.
// NOTE(review): which state is reset is not visible here — confirm against the definition.
192 void reset(cudaStream_t stream = 0);
/// Reports the current value of the `profiling_enabled_` flag.
201 bool isProfilingEnabled() const
{
    return profiling_enabled_;
}
/// Forwards the query to the DAG object held in `dag_`.
/// `dag_` is dereferenced without a null check.
218 bool isBoundsCheckEnabled() const
{
    auto& dag = *dag_;
    return dag.isBoundsCheckEnabled();
}
/// Forwards the query to the DAG object held in `dag_`.
/// `dag_` is dereferenced without a null check.
226 bool isColoringEnabled() const
{
    auto& dag = *dag_;
    return dag.isColoringEnabled();
}
/// Returns whatever `dag_->getColorRegionCount()` reports.
/// `dag_` is dereferenced without a null check.
227 size_t getColorRegionCount() const
{
    auto& dag = *dag_;
    return dag.getColorRegionCount();
}
/// Reports the current value of the `graph_mode_enabled_` flag.
239 bool isGraphModeEnabled() const
{
    return graph_mode_enabled_;
}
/// Reports the current value of the `graph_captured_` flag.
252 bool isGraphCaptured() const
{
    return graph_captured_;
}
// Declaration only (const member); the definition is not part of this excerpt.
// NOTE(review): units and what is being measured are not visible here — confirm against the definition.
254 size_t getPeakMemoryUsage()
const;
// Declaration only (const member); the definition is not part of this excerpt.
// NOTE(review): units and what is being measured are not visible here — confirm against the definition.
255 size_t getCurrentMemoryUsage()
const;
// Declaration only (const member); the definition is not part of this excerpt.
// NOTE(review): output destination and format are not visible here — confirm against the definition.
256 void printPipeline()
const;
263 std::vector<FZMStageInfo> stages;
264 std::vector<FZMBufferEntry> buffers;
// Declaration only — the definition is not part of this excerpt.
// Takes the destination `filename`; `stream` defaults to 0 when omitted.
// NOTE(review): on-disk format and error reporting are not visible here — confirm against the definition.
268 void writeToFile(
const std::string& filename, cudaStream_t stream = 0);
289 const std::string& filename,
292 cudaStream_t stream = 0,
294 size_t pool_override_bytes = 0
302 struct OutputBuffer {
305 size_t allocated_size;
// Declaration only (const member); returns a vector of OutputBuffer by value.
// NOTE(review): whether the returned entries alias internally managed memory is not visible here — confirm.
309 std::vector<OutputBuffer> getOutputBuffers()
const;
311 static void* loadCompressedData(
312 const std::string& filename,
313 const FZMFileHeader& header,
314 cudaStream_t stream = 0,
// Declaration only. Returns a pair of Stage* vectors.
// NOTE(review): presumably (sources, sinks) given the sibling getSourceStages()/getSinkStages()
// declarations — the order of the pair must be confirmed against the definition.
319 std::pair<std::vector<Stage*>, std::vector<Stage*>> identifyTopology();
// Declaration only. Takes the list of source stages by const reference.
// NOTE(review): what is allocated or registered per source is not visible here — confirm.
320 void setupInputBuffers(
const std::vector<Stage*>& sources);
// Declaration only.
// NOTE(review): the int return is presumably a count of detected outputs — confirm against the definition.
321 int autoDetectUnconnectedOutputs();
// Declaration only. Takes an integer `pipeline_outputs`.
// NOTE(review): presumably the number of pipeline outputs; what state it updates is not visible here — confirm.
322 void detectMultiOutputScenario(
int pipeline_outputs);
// Declaration only.
// NOTE(review): the condition under which streams are configured is not visible here — confirm.
323 void configureStreamsIfNeeded();
// Declaration only. `force_from_current_inputs` defaults to false.
// NOTE(review): the effect of passing true is not visible here — confirm against the definition.
330 void propagateBufferSizes(
bool force_from_current_inputs =
false);
// Declaration only (const member); returns a vector of non-owning Stage pointers by value.
// NOTE(review): the criterion that makes a stage a "source" is not visible here — confirm.
332 std::vector<Stage*> getSourceStages()
const;
// Declaration only (const member); returns a vector of non-owning Stage pointers by value.
// NOTE(review): the criterion that makes a stage a "sink" is not visible here — confirm.
333 std::vector<Stage*> getSinkStages()
const;
338 struct FwdStageDesc {
340 std::vector<int> output_buf_ids;
341 std::vector<int> input_buf_ids;
// Alias for a map from an int key to a (void*, size_t) pair.
// NOTE(review): presumably buffer id -> (pointer, byte size) — confirm key and size semantics at the use sites.
345 using PipelineOutputMap = std::unordered_map<int, std::pair<void*, size_t>>;
352 static std::pair<std::unique_ptr<CompressionDAG>,
353 std::unordered_map<Stage*, int>>
355 const std::vector<FwdStageDesc>& fwd_stages,
356 const PipelineOutputMap& pipeline_outputs,
359 const std::unordered_map<Stage*, size_t>& source_sizes,
360 bool enable_profiling
365 struct OutputBufferInfo {
369 std::string stage_name;
370 std::string output_name;
// Declaration only (const member); returns a vector of OutputBufferInfo by value.
// NOTE(review): ordering of the returned entries is not visible here — confirm.
373 std::vector<OutputBufferInfo> collectOutputBuffers()
const;
// Declaration only (const member). Takes a list of OutputBufferInfo and returns a size_t.
// NOTE(review): presumably a byte count for the concatenated buffer, including any header/padding — confirm.
376 size_t calculateConcatSize(
const std::vector<OutputBufferInfo>& outputs)
const;
378 size_t writeConcatBuffer(
379 const std::vector<OutputBufferInfo>& outputs,
380 uint8_t* d_concat_bytes,
// Declaration only. `d_output` and `output_size` are pointer parameters.
// NOTE(review): presumably out-parameters receiving the concatenated buffer and its size;
// ownership of *d_output is not visible here — confirm against the definition.
384 void concatOutputs(
void** d_output,
size_t* output_size, cudaStream_t stream);
// Uniquely owned memory pool.
388 std::unique_ptr<MemoryPool> mem_pool_;
// Uniquely owned DAG. isBoundsCheckEnabled(), isColoringEnabled() and
// getColorRegionCount() forward to it, and stages are registered via dag_->addStage().
389 std::unique_ptr<CompressionDAG> dag_;
// Owning storage for stages; each stage's unique_ptr is moved in here after registration.
392 std::vector<std::unique_ptr<Stage>> stages_;
// Maps a raw (non-owning) stage pointer to the DAGNode* returned by dag_->addStage().
393 std::unordered_map<Stage*, DAGNode*> stage_to_node_;
395 struct ConnectionInfo {
398 std::string output_name;
// Data members. None of the declarations below carries an in-class initializer.
// NOTE(review): how entries are appended is not visible in this excerpt.
401 std::vector<ConnectionInfo> connections_;
// Value reported by isWarmupOnFinalizeEnabled().
405 bool warmup_on_finalize_;
// Value reported by isPoolManagedDecompOutput().
406 bool pool_managed_decomp_;
// NOTE(review): presumably records whether a compression pass has run — confirm.
411 bool was_compressed_;
// Value reported by isProfilingEnabled().
413 bool profiling_enabled_;
// NOTE(review): presumably the result of the most recent profiled run — confirm.
414 PipelinePerfResult last_perf_result_;
// Non-owning node pointers and integer buffer ids for inputs and outputs.
// NOTE(review): where these are populated is not visible in this excerpt.
416 std::vector<DAGNode*> input_nodes_;
417 std::vector<DAGNode*> output_nodes_;
418 std::vector<int> input_buffer_ids_;
419 std::vector<int> output_buffer_ids_;
// Raw pointer plus capacity.
// NOTE(review): the d_ prefix suggests device memory; allocation/free sites are not visible — confirm.
421 void* d_concat_buffer_;
422 size_t concat_buffer_capacity_;
// NOTE(review): presumably device pointers for decompression outputs; ownership not visible — confirm.
427 std::vector<void*> d_decomp_outputs_;
// Raw pointer plus capacity.
// NOTE(review): the h_ prefix suggests host memory; allocation/free sites are not visible — confirm.
431 void* h_concat_header_;
432 size_t h_concat_header_capacity_;
// NOTE(review): the buffer this capacity belongs to is declared on a line not shown here.
438 size_t copy_descs_capacity_;
// NOTE(review): presumably one size per source stage — confirm the indexing order.
444 std::vector<size_t> source_input_sizes_;
// NOTE(review): alignment in bytes according to the name; the default value is not visible here.
448 size_t input_alignment_bytes_;
// NOTE(review): the buffer this size belongs to is declared on a line not shown here.
450 size_t d_pad_buf_size_;
454 size_t original_input_size_;
456 size_t input_size_hint_;
457 float pool_multiplier_;
// Per-source size hints keyed by raw stage pointer (written as per_source_hints_[source] = size).
461 std::unordered_map<Stage*, size_t> per_source_hints_;
// Three extents written by both setDims() overloads and returned by getDims().
465 std::array<size_t, 3> dims_;
476 std::unique_ptr<CompressionDAG> inv_dag;
477 std::unordered_map<Stage*, int> inv_result_map;
478 std::unordered_map<int, int> fwd_to_inv_ext_buf;
479 std::unordered_map<Stage*, size_t> source_sizes;
// Uniquely owned InvDAGCache.
// NOTE(review): when it is created and invalidated is not visible in this excerpt — confirm.
481 std::unique_ptr<InvDAGCache> inv_cache_;
483 struct BufferMetadata {
486 size_t allocated_size;
// NOTE(review): where BufferMetadata entries are filled in is not visible in this excerpt.
491 std::vector<BufferMetadata> buffer_metadata_;
// Value reported by isGraphModeEnabled().
493 bool graph_mode_enabled_;
// Value reported by isGraphCaptured().
494 bool graph_captured_;
// Raw pointer plus size.
// NOTE(review): the d_ prefix suggests device memory; ownership is not visible — confirm.
498 void* d_graph_input_;
499 size_t d_graph_input_size_;
// CUDA graph handle and its executable instance.
// NOTE(review): creation and destruction sites are not visible in this excerpt — confirm they are released.
501 cudaGraph_t captured_graph_;
502 cudaGraphExec_t graph_exec_;
510 throw std::runtime_error(
"Cannot add stages after finalization");
513 auto stage_ptr = std::make_unique<StageT>(std::forward<Args>(args)...);
514 StageT* stage = stage_ptr.get();
516 DAGNode* node = dag_->addStage(stage, stage->getName());
517 size_t num_outputs = stage->getNumOutputs();
518 auto output_names = stage->getOutputNames();
522 for (
size_t i = 0; i < num_outputs; i++) {
523 std::string out_name = i < output_names.size() ? output_names[i] : std::to_string(i);
524 dag_->addUnconnectedOutput(node, 1, i, stage->getName() +
"." + out_name +
"_unconnected");
527 stage_to_node_[stage] = node;
528 stages_.push_back(std::move(stage_ptr));