42 size_t input_data_size = 0,
44 float pool_multiplier = 3.0f
54 explicit Pipeline(
const std::string& config_path);
72 void setDims(
size_t x,
size_t y = 1,
size_t z = 1) { dims_ = {x, y, z}; }
73 void setDims(std::array<size_t, 3> dims) { dims_ = dims; }
74 std::array<size_t, 3> getDims()
const {
return dims_; }
82 template<
typename StageT,
typename... Args>
95 int connect(
Stage* dependent,
const std::vector<Stage*>& producers);
113 bool isWarmupOnFinalizeEnabled()
const {
return warmup_on_finalize_; }
122 bool isPoolManagedDecompOutput()
const {
return pool_managed_decomp_; }
159 return original_input_size_ > 0 ? original_input_size_ : input_size_;
180 cudaStream_t stream = 0
213 size_t output_buf_capacity,
214 size_t* actual_output_size,
215 cudaStream_t stream = 0
237 cudaStream_t stream = 0
264 size_t output_buf_capacity,
265 size_t* actual_output_size,
266 cudaStream_t stream = 0
270 void reset(cudaStream_t stream = 0);
279 bool isProfilingEnabled()
const {
return profiling_enabled_; }
307 bool isBoundsCheckEnabled()
const {
return dag_->isBoundsCheckEnabled(); }
315 bool isColoringEnabled()
const {
return dag_->isColoringEnabled(); }
316 size_t getColorRegionCount()
const {
return dag_->getColorRegionCount(); }
328 bool isGraphModeEnabled()
const {
return graph_mode_enabled_; }
341 bool isGraphCaptured()
const {
return graph_captured_; }
343 size_t getPeakMemoryUsage()
const;
344 size_t getCurrentMemoryUsage()
const;
345 void printPipeline()
const;
352 std::vector<FZMStageInfo> stages;
353 std::vector<FZMBufferEntry> buffers;
357 void writeToFile(
const std::string& filename, cudaStream_t stream = 0);
380 const std::string& filename,
383 cudaStream_t stream = 0,
385 size_t pool_override_bytes = 0
406 const std::string& filename,
409 cudaStream_t stream = 0,
449 ~PoolBuffer() { free(0); }
450 PoolBuffer() =
default;
451 PoolBuffer(
const PoolBuffer&) =
delete;
452 PoolBuffer& operator=(
const PoolBuffer&) =
delete;
// Hands the held allocation back to its pool, passing stream `s` to the pool.
// Does nothing unless both `ptr` and `pool` are non-null. After a release,
// `ptr` is null and `capacity` is 0; `pool` itself is left unchanged.
454 void free(cudaStream_t s) {
455 if (ptr && pool) { pool->
free(ptr, s); ptr =
nullptr; capacity = 0; }
// Requests `bytes` from the pool and records the result in `ptr`.
// `tag` and `persistent` (default false) are passed through to the pool's
// allocate() unchanged. Returns true when the pool returned a non-null pointer.
457 bool allocate(MemoryPool* p,
size_t bytes, cudaStream_t s,
458 const char* tag,
bool persistent =
false) {
// NOTE(review): the call below goes through the member `pool`, not the
// parameter `p`; presumably lines not shown in this view assign `pool = p`
// (and possibly release any previous buffer) first — confirm.
461 ptr = pool->
allocate(bytes, s, tag, persistent);
// capacity is only updated on success, i.e. when ptr is non-null.
462 if (ptr) capacity = bytes;
463 return ptr !=
nullptr;
468 struct PinnedBuffer {
472 ~PinnedBuffer() {
if (ptr) cudaFreeHost(ptr); }
473 PinnedBuffer() =
default;
474 PinnedBuffer(
const PinnedBuffer&) =
delete;
475 PinnedBuffer& operator=(
const PinnedBuffer&) =
delete;
// Ensures the buffer can hold at least `bytes` bytes, reallocating with
// cudaHostAlloc when the current capacity is too small.
// Returns true when capacity already suffices and false when cudaHostAlloc
// fails. NOTE(review): the success path after allocation is not visible in
// this view.
478 bool ensureCapacity(
size_t bytes) {
// Fast path: the existing allocation is already large enough.
479 if (capacity >= bytes)
return true;
// The old allocation is released before the new one is requested, so the
// previous contents are not carried over by the lines shown here.
480 if (ptr) { cudaFreeHost(ptr); ptr =
nullptr; capacity = 0; }
// cudaHostAllocDefault is the flag passed; any status other than
// cudaSuccess is reported as failure.
481 if (cudaHostAlloc(&ptr, bytes, cudaHostAllocDefault) != cudaSuccess)
return false;
488 struct DeviceBuffer {
492 ~DeviceBuffer() {
if (ptr) cudaFree(ptr); }
493 DeviceBuffer() =
default;
494 DeviceBuffer(
const DeviceBuffer&) =
delete;
495 DeviceBuffer& operator=(
const DeviceBuffer&) =
delete;
// Ensures the buffer can hold at least `bytes` bytes, reallocating with
// cudaMalloc when the current capacity is too small.
// Returns true when capacity already suffices and false when cudaMalloc
// fails. NOTE(review): the success path after allocation is not visible in
// this view.
498 bool ensureCapacity(
size_t bytes) {
// Fast path: the existing allocation is already large enough.
499 if (capacity >= bytes)
return true;
// The old allocation is released before the new one is requested, so the
// previous contents are not carried over by the lines shown here.
500 if (ptr) { cudaFree(ptr); ptr =
nullptr; capacity = 0; }
// Any status other than cudaSuccess is reported as failure.
501 if (cudaMalloc(&ptr, bytes) != cudaSuccess)
return false;
509 Stage* addRawStage(Stage* stage);
511 struct OutputBuffer {
514 size_t allocated_size;
518 std::vector<OutputBuffer> getOutputBuffers()
const;
520 static void* loadCompressedData(
521 const std::string& filename,
522 const FZMFileHeader& header,
523 cudaStream_t stream = 0,
524 MemoryPool* pool =
nullptr
528 std::pair<std::vector<Stage*>, std::vector<Stage*>> identifyTopology();
529 void setupInputBuffers(
const std::vector<Stage*>& sources);
530 int autoDetectUnconnectedOutputs();
531 void detectMultiOutputScenario(
int pipeline_outputs);
532 void configureStreamsIfNeeded();
// Parameterless helper declarations; the definitions live outside this
// header view.
// NOTE(review): the names suggest validation, alignment, pool sizing and
// buffer pre-allocation steps — confirm behavior and ordering against the
// implementation.
void typeCheckConnections();
void computeInputAlignment();
void notifyStagesFinalizeHooks();
void refinePoolSize();
void setupGraphModeInput();
void preallocatePadBuffer();
void preallocateConcatBuffers();
545 std::pair<const void*, size_t> prepareInputSource(
546 const void* d_input,
size_t input_size, cudaStream_t stream);
/// `force_from_current_inputs` defaults to false.
/// NOTE(review): how the flag changes the propagation is defined outside
/// this header view — confirm against the implementation.
void propagateBufferSizes(bool force_from_current_inputs = false);
555 std::vector<Stage*> getSourceStages()
const;
556 std::vector<Stage*> getSinkStages()
const;
561 struct FwdStageDesc {
563 std::vector<int> output_buf_ids;
564 std::vector<int> input_buf_ids;
/// Maps an int key to a (pointer, size) pair.
/// NOTE(review): the key is presumably a buffer id and the size presumably
/// in bytes — confirm against the code that fills the map.
using PipelineOutputMap =
    std::unordered_map<int, std::pair<void*, size_t>>;
571 void buildOrReuseInvCache(
572 const PipelineOutputMap& po_map,
575 cudaStream_t stream);
578 static size_t computeFilePoolSize(
const FZMFileHeader& fh,
size_t pool_override_bytes);
579 static std::pair<std::vector<std::unique_ptr<Stage>>, std::vector<FwdStageDesc>>
580 reconstructForwardTopology(
const FZMFileHeader& fh);
581 static std::unordered_map<Stage*, size_t> buildSourceSizesFromHeader(
582 const FZMFileHeader& fh,
const std::vector<FwdStageDesc>& fwd_topology);
589 static std::pair<std::unique_ptr<CompressionDAG>,
590 std::unordered_map<Stage*, int>>
592 const std::vector<FwdStageDesc>& fwd_stages,
593 const PipelineOutputMap& pipeline_outputs,
596 const std::unordered_map<Stage*, size_t>& source_sizes,
597 bool enable_profiling
602 struct OutputBufferInfo {
606 std::string stage_name;
607 std::string output_name;
610 std::vector<OutputBufferInfo> collectOutputBuffers()
const;
613 size_t calculateConcatSize(
const std::vector<OutputBufferInfo>& outputs)
const;
615 size_t writeConcatBuffer(
616 const std::vector<OutputBufferInfo>& outputs,
617 uint8_t* d_concat_bytes,
621 void concatOutputs(
void** d_output,
size_t* output_size, cudaStream_t stream);
625 std::unique_ptr<MemoryPool> mem_pool_;
626 std::unique_ptr<CompressionDAG> dag_;
629 std::vector<std::unique_ptr<Stage>> stages_;
630 std::unordered_map<Stage*, DAGNode*> stage_to_node_;
632 struct ConnectionInfo {
635 std::string output_name;
638 std::vector<ConnectionInfo> connections_;
642 bool warmup_on_finalize_;
643 bool pool_managed_decomp_;
648 bool was_compressed_;
650 bool profiling_enabled_;
651 PipelinePerfResult last_perf_result_;
653 std::vector<DAGNode*> input_nodes_;
654 std::vector<DAGNode*> output_nodes_;
655 std::vector<int> input_buffer_ids_;
656 std::vector<int> output_buffer_ids_;
658 PoolBuffer d_concat_buffer_;
663 std::vector<void*> d_decomp_outputs_;
666 PinnedBuffer h_concat_header_;
668 PinnedBuffer h_copy_descs_;
669 DeviceBuffer d_copy_descs_;
675 std::vector<size_t> source_input_sizes_;
679 size_t input_alignment_bytes_;
680 PoolBuffer d_pad_buf_;
684 size_t original_input_size_;
686 size_t input_size_hint_;
687 float pool_multiplier_;
691 std::array<size_t, 3> dims_;
702 std::unique_ptr<CompressionDAG> inv_dag;
703 std::unordered_map<Stage*, int> inv_result_map;
704 std::unordered_map<int, int> fwd_to_inv_ext_buf;
705 std::unordered_map<Stage*, size_t> source_sizes;
707 std::unique_ptr<InvDAGCache> inv_cache_;
709 struct BufferMetadata {
712 size_t allocated_size;
717 std::vector<BufferMetadata> buffer_metadata_;
719 bool graph_mode_enabled_;
720 bool graph_captured_;
724 PoolBuffer d_graph_input_;
725 size_t d_graph_input_size_;
727 cudaGraph_t captured_graph_;
728 cudaGraphExec_t graph_exec_;