// NOTE(review): fragmented chunk — the two lines below are trailing default
// parameters of constructor signature(s) that start outside this view.
42 size_t input_data_size = 0,
44 float pool_multiplier = 3.0f
// Constructs a pipeline from an on-disk configuration file.
// `explicit` so a bare std::string does not silently convert to a Pipeline.
54 explicit Pipeline(
const std::string& config_path);
72 void setDims(
size_t x,
size_t y = 1,
size_t z = 1) { dims_ = {x, y, z}; }
73 void setDims(std::array<size_t, 3> dims) { dims_ = dims; }
74 std::array<size_t, 3> getDims()
const {
return dims_; }
// Template head of a stage-creation helper whose body lies outside this
// chunk (presumably a perfect-forwarding addStage<StageT>(Args...) — confirm).
82 template<
typename StageT,
typename... Args>
// Declares that `dependent` consumes the outputs of every stage in
// `producers`. Returns an int status code; success value not visible here.
95 int connect(
Stage* dependent,
const std::vector<Stage*>& producers);
113 bool isWarmupOnFinalizeEnabled()
const {
return warmup_on_finalize_; }
122 bool isPoolManagedDecompOutput()
const {
return pool_managed_decomp_; }
// Prefers the recorded pre-compression size when present, falling back to
// the current input size. (The enclosing function header is outside this
// chunk.)
159 return original_input_size_ > 0 ? original_input_size_ : input_size_;
// The declaration fragments below all default `stream` to 0 — the CUDA
// legacy default stream — so callers may omit an explicit stream.
180 cudaStream_t stream = 0
213 size_t output_buf_capacity,
214 size_t* actual_output_size,
215 cudaStream_t stream = 0
237 cudaStream_t stream = 0
264 size_t output_buf_capacity,
265 size_t* actual_output_size,
266 cudaStream_t stream = 0
// Resets pipeline state; work is issued on `stream` (semantics of the
// reset are defined in the out-of-view implementation).
270 void reset(cudaStream_t stream = 0);
279 bool isProfilingEnabled()
const {
return profiling_enabled_; }
307 bool isBoundsCheckEnabled()
const {
return dag_->isBoundsCheckEnabled(); }
315 bool isColoringEnabled()
const {
return dag_->isColoringEnabled(); }
316 size_t getColorRegionCount()
const {
return dag_->getColorRegionCount(); }
328 bool isGraphModeEnabled()
const {
return graph_mode_enabled_; }
341 bool isGraphCaptured()
const {
return graph_captured_; }
// Memory introspection and debug-print declarations (bodies out of view).
343 size_t getPeakMemoryUsage()
const;
344 size_t getCurrentMemoryUsage()
const;
345 void printPipeline()
const;
// Members of a serialization aggregate whose header is outside this chunk.
352 std::vector<FZMStageInfo> stages;
353 std::vector<FZMBufferEntry> buffers;
// Serializes pipeline output to `filename`; work issued on `stream`.
357 void writeToFile(
const std::string& filename, cudaStream_t stream = 0);
// NOTE(review): the lines below are parameter fragments of file-loading
// declarations whose names are not visible in this chunk.
380 const std::string& filename,
383 cudaStream_t stream = 0,
385 size_t pool_override_bytes = 0
406 const std::string& filename,
409 cudaStream_t stream = 0,
449 ~PoolBuffer() { free(0); }
// PoolBuffer is non-copyable: a copy would double-free the pooled pointer.
// (Move operations, if any exist, are outside this chunk.)
450 PoolBuffer() =
default;
451 PoolBuffer(
const PoolBuffer&) =
delete;
452 PoolBuffer& operator=(
const PoolBuffer&) =
delete;
// Returns `ptr` to the pool on stream `s` and clears the handle.
// No-op when ptr or pool is null, so repeated calls are safe.
454 void free(cudaStream_t s) {
455 if (ptr && pool) { pool->
free(ptr, s); ptr =
nullptr; capacity = 0; }
// Acquires `bytes` from pool `p` on stream `s`; records `capacity` only on
// success and reports success via the return value.
// NOTE(review): lines 459-460 are missing from this chunk (presumably
// `pool = p;` and a prior free — confirm against the full header).
457 bool allocate(MemoryPool* p,
size_t bytes, cudaStream_t s,
458 const char* tag,
bool persistent =
false) {
461 ptr = pool->
allocate(bytes, s, tag, persistent);
462 if (ptr) capacity = bytes;
463 return ptr !=
nullptr;
// RAII wrapper for page-locked (pinned) host memory.
468 struct PinnedBuffer {
472 ~PinnedBuffer() {
if (ptr) cudaFreeHost(ptr); }
// Non-copyable: copying would double-free the pinned allocation.
473 PinnedBuffer() =
default;
474 PinnedBuffer(
const PinnedBuffer&) =
delete;
475 PinnedBuffer& operator=(
const PinnedBuffer&) =
delete;
// Grows the pinned allocation to at least `bytes`; keeps the existing
// buffer when already large enough. Not shrink-to-fit.
478 bool ensureCapacity(
size_t bytes) {
479 if (capacity >= bytes)
return true;
// Free-then-allocate: contents are NOT preserved across a regrow.
480 if (ptr) { cudaFreeHost(ptr); ptr =
nullptr; capacity = 0; }
481 if (cudaHostAlloc(&ptr, bytes, cudaHostAllocDefault) != cudaSuccess)
return false;
// NOTE(review): lines 482-487 (presumably `capacity = bytes; return true;`
// and the closing braces) are missing from this chunk.
// RAII wrapper for a cudaMalloc'd device allocation.
488 struct DeviceBuffer {
492 ~DeviceBuffer() {
if (ptr) cudaFree(ptr); }
// Non-copyable: copying would double-free the device allocation.
493 DeviceBuffer() =
default;
494 DeviceBuffer(
const DeviceBuffer&) =
delete;
495 DeviceBuffer& operator=(
const DeviceBuffer&) =
delete;
// Grows the device allocation to at least `bytes`; keeps the existing
// buffer when already large enough. Not shrink-to-fit.
498 bool ensureCapacity(
size_t bytes) {
499 if (capacity >= bytes)
return true;
// Free-then-allocate: contents are NOT preserved across a regrow.
500 if (ptr) { cudaFree(ptr); ptr =
nullptr; capacity = 0; }
501 if (cudaMalloc(&ptr, bytes) != cudaSuccess)
return false;
// NOTE(review): the success path (capacity update / return true) lies in
// lines not included in this chunk.
// Registers an externally-created stage; ownership semantics not visible here.
509 Stage* addRawStage(Stage* stage);
// Aggregate describing one pipeline output buffer (fields partially out of view).
511 struct OutputBuffer {
514 size_t allocated_size;
518 std::vector<OutputBuffer> getOutputBuffers()
const;
// Loads compressed bytes for `filename` as described by `header`; optional
// pool-backed allocation when `pool` is non-null. Declaration fragment —
// the closing of this parameter list is outside this chunk.
520 static void* loadCompressedData(
521 const std::string& filename,
522 const FZMFileHeader& header,
523 cudaStream_t stream = 0,
524 MemoryPool* pool =
nullptr
// Returns the (source, sink) stage partition of the DAG.
528 std::pair<std::vector<Stage*>, std::vector<Stage*>> identifyTopology();
529 void setupInputBuffers(
const std::vector<Stage*>& sources);
530 int autoDetectUnconnectedOutputs();
531 void detectMultiOutputScenario(
int pipeline_outputs);
532 void configureStreamsIfNeeded();
// Finalize-time preparation steps (bodies out of view).
535 void typeCheckConnections();
536 void computeInputAlignment();
537 void refinePoolSize();
538 void setupGraphModeInput();
539 void preallocatePadBuffer();
540 void preallocateConcatBuffers();
// Maps a caller-provided device input to the pointer/size the DAG consumes.
544 std::pair<const void*, size_t> prepareInputSource(
545 const void* d_input,
size_t input_size, cudaStream_t stream);
552 void propagateBufferSizes(
bool force_from_current_inputs =
false);
554 std::vector<Stage*> getSourceStages()
const;
555 std::vector<Stage*> getSinkStages()
const;
567 using PipelineOutputMap = std::unordered_map<int, std::pair<void*, size_t>>;
570 void buildOrReuseInvCache(
571 const PipelineOutputMap& po_map,
574 cudaStream_t stream);
577 static size_t computeFilePoolSize(
const FZMFileHeader& fh,
size_t pool_override_bytes);
578 static std::pair<std::vector<std::unique_ptr<Stage>>, std::vector<FwdStageDesc>>
579 reconstructForwardTopology(
const FZMFileHeader& fh);
580 static std::unordered_map<Stage*, size_t> buildSourceSizesFromHeader(
581 const FZMFileHeader& fh,
const std::vector<FwdStageDesc>& fwd_topology);
588 static std::pair<std::unique_ptr<CompressionDAG>,
589 std::unordered_map<Stage*, int>>
591 const std::vector<FwdStageDesc>& fwd_stages,
592 const PipelineOutputMap& pipeline_outputs,
594 MemoryStrategy strategy,
595 const std::unordered_map<Stage*, size_t>& source_sizes,
596 bool enable_profiling
601 struct OutputBufferInfo {
605 std::string stage_name;
606 std::string output_name;
609 std::vector<OutputBufferInfo> collectOutputBuffers()
const;
612 size_t calculateConcatSize(
const std::vector<OutputBufferInfo>& outputs)
const;
614 size_t writeConcatBuffer(
615 const std::vector<OutputBufferInfo>& outputs,
616 uint8_t* d_concat_bytes,
620 void concatOutputs(
void** d_output,
size_t* output_size, cudaStream_t stream);
// --- Core ownership: the pool and DAG the pipeline drives. ---
624 std::unique_ptr<MemoryPool> mem_pool_;
625 std::unique_ptr<CompressionDAG> dag_;
// Stages are owned here; stage_to_node_ is a non-owning index into dag_.
628 std::vector<std::unique_ptr<Stage>> stages_;
629 std::unordered_map<Stage*, DAGNode*> stage_to_node_;
// Recorded connect() calls (struct fields partially out of view).
631 struct ConnectionInfo {
634 std::string output_name;
637 std::vector<ConnectionInfo> connections_;
// NOTE(review): these flags carry no in-class initializers on the visible
// lines — confirm they are set in every constructor.
641 bool warmup_on_finalize_;
642 bool pool_managed_decomp_;
647 bool was_compressed_;
649 bool profiling_enabled_;
650 PipelinePerfResult last_perf_result_;
// DAG endpoints and their external buffer ids.
652 std::vector<DAGNode*> input_nodes_;
653 std::vector<DAGNode*> output_nodes_;
654 std::vector<int> input_buffer_ids_;
655 std::vector<int> output_buffer_ids_;
// Scratch buffers for output concatenation (device, pinned host, and the
// copy-descriptor staging pair).
657 PoolBuffer d_concat_buffer_;
662 std::vector<void*> d_decomp_outputs_;
665 PinnedBuffer h_concat_header_;
667 PinnedBuffer h_copy_descs_;
668 DeviceBuffer d_copy_descs_;
674 std::vector<size_t> source_input_sizes_;
// Input alignment/padding support.
678 size_t input_alignment_bytes_;
679 PoolBuffer d_pad_buf_;
683 size_t original_input_size_;
685 size_t input_size_hint_;
686 float pool_multiplier_;
690 std::array<size_t, 3> dims_;
// Cached inverse-DAG state (members of InvDAGCache; struct header is
// outside this chunk) plus the cache handle itself.
701 std::unique_ptr<CompressionDAG> inv_dag;
702 std::unordered_map<Stage*, int> inv_result_map;
703 std::unordered_map<int, int> fwd_to_inv_ext_buf;
704 std::unordered_map<Stage*, size_t> source_sizes;
706 std::unique_ptr<InvDAGCache> inv_cache_;
// Per-buffer bookkeeping exposed to introspection.
708 struct BufferMetadata {
711 size_t allocated_size;
716 std::vector<BufferMetadata> buffer_metadata_;
// CUDA-graph capture state. captured_graph_/graph_exec_ are raw CUDA
// handles; their destruction is handled outside the visible lines — confirm.
718 bool graph_mode_enabled_;
719 bool graph_captured_;
723 PoolBuffer d_graph_input_;
724 size_t d_graph_input_size_;
726 cudaGraph_t captured_graph_;
727 cudaGraphExec_t graph_exec_;