nicerobot edited this page Nov 7, 2025 · 1 revision

pipe

Pipeline composition for gloo.foo commands

The pipe package connects multiple commands, passing the output of one as input to the next, just like Unix shell pipes. It is a core component of the gloo.foo framework.

Installation

go get github.com/gloo-foo/pipe

Usage

Basic Pipeline

import (
    "github.com/gloo-foo/pipe"
    gloo "github.com/gloo-foo/framework"
)

pipeline := pipe.Pipeline(
    cmd1,
    cmd2,
    cmd3,
)

gloo.Run(pipeline)

Single Command

When given a single command, Pipeline() simply returns that command's executor with no piping overhead:

// No pipe created - just runs the command
pipeline := pipe.Pipeline(command)

Nested Pipelines

Since Pipeline() returns a gloo.Command, you can nest pipelines:

// Create a reusable sub-pipeline
subPipeline := pipe.Pipeline(cmd1, cmd2)

// Use it in a larger pipeline
main := pipe.Pipeline(
    sourceCmd,
    subPipeline,  // Nested pipeline
    outputCmd,
)

Options

PipeFail

Controls whether the pipeline fails on the first error or only if the last command fails:

// Default: fail only if last command fails
pipeline := pipe.Pipeline(cmd1, cmd2, cmd3)

// Fail if ANY command fails
pipeline := pipe.Pipeline(
    cmd1, cmd2, cmd3,
    pipe.PipeFail,
)

// Explicit: fail only on last command
pipeline := pipe.Pipeline(
    cmd1, cmd2, cmd3,
    pipe.NoPipeFail,
)

Behavior:

  • NoPipeFail (default): Only returns error from the last command (like shell |)
  • PipeFail: Returns error from any command that fails (like shell set -o pipefail)

How It Works

Concurrent Execution

Commands run concurrently in goroutines, connected by io.Pipe:

// All three commands run simultaneously
pipeline := pipe.Pipeline(
    sourceCommand,   // Produces data
    filterCommand,   // Processes as data arrives
    outputCommand,   // Writes as results arrive
)

This provides true streaming behavior: downstream commands start processing before upstream commands finish.

Context Cancellation

The pipeline respects context cancellation and propagates it to all commands:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := gloo.RunWithContext(ctx, pipeline)

When cancelled:

  • All commands receive the cancelled context
  • Pipes are closed to unblock any waiting commands
  • Cleanup happens gracefully

Early Termination

When a downstream command finishes early, the pipeline handles it gracefully:

// Works correctly - producer stops when limiter finishes
pipeline := pipe.Pipeline(
    infiniteProducer,  // Produces infinite output
    takeTen,          // Takes only 10 items
)

Upstream commands receive context cancellation and io.ErrClosedPipe, which the pipeline treats as normal completion (not an error).

Examples

Three-Stage Pipeline

pipeline := pipe.Pipeline(
    readCommand,
    processCommand,
    writeCommand,
)

gloo.MustRun(pipeline)

With Error Handling

pipeline := pipe.Pipeline(
    cmd1,
    cmd2,
    cmd3,
    pipe.PipeFail,  // Fail fast on any error
)

err := gloo.Run(pipeline)
if err != nil {
    log.Fatalf("Pipeline failed: %v", err)
}

Reusable Sub-Pipelines

// Define reusable processing steps
func transformPipeline() gloo.Command {
    return pipe.Pipeline(
        filterCommand,
        transformCommand,
        aggregateCommand,
    )
}

// Use in different contexts
process1 := pipe.Pipeline(
    source1,
    transformPipeline(),
    output1,
)

process2 := pipe.Pipeline(
    source2,
    transformPipeline(),
    output2,
)

Building Pipelines Dynamically

commands := []gloo.Command{
    readCommand,
}

if verbose {
    commands = append(commands, logCommand)
}

commands = append(commands,
    processCommand,
    writeCommand,
)

pipeline := pipe.Pipeline(commands...)

Error Handling

Default Behavior (NoPipeFail)

Like Unix shells, only the last command's error matters:

pipeline := pipe.Pipeline(
    cmd1,  // If this fails, error is ignored
    cmd2,  // If this fails, error is ignored
    cmd3,  // Only this command's error is returned
)

This is useful when intermediate commands might fail but you only care about the final result.

PipeFail Behavior

Returns the first error encountered:

pipeline := pipe.Pipeline(
    cmd1,  // Error here stops the pipeline
    cmd2,  // Error here stops the pipeline
    cmd3,  // Error here stops the pipeline
    pipe.PipeFail,
)

Use this when any failure should stop processing.

Technical Details

Goroutine Management

Each command runs in its own goroutine with proper synchronization:

  • sync.WaitGroup ensures all commands complete before returning
  • Pipes are closed in proper order to prevent deadlocks
  • Context cancellation propagates to all commands

Pipe Cleanup

The pipeline handles pipe lifecycle automatically:

  • Pipe readers closed when downstream finishes reading
  • Pipe writers closed when upstream finishes writing
  • Early termination handled gracefully (no broken pipe errors)

Error Collection

Errors are collected with their command index:

  • Thread-safe error collection with sync.Mutex
  • Preserves which command failed
  • Returns meaningful error messages with command index

API Reference

Pipeline(parameters ...any) gloo.Command

Creates a pipeline from commands and options.

Parameters:

  • gloo.Command - Commands to connect in sequence
  • pipe.PipeFail - Option to fail on any error
  • pipe.NoPipeFail - Option to fail only on last command error (default)

Returns: A gloo.Command that can be executed or composed into other pipelines

Example:

p := pipe.Pipeline(cmd1, cmd2, pipe.PipeFail)

Options

PipeFail / NoPipeFail

const PipeFail   pipeFailFlag = true
const NoPipeFail pipeFailFlag = false

Controls error handling behavior in the pipeline.
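Because Pipeline accepts `...any`, commands and options have to be separated at runtime. The sketch below shows how that argument splitting could plausibly work; the `Command` interface and `split` function are assumptions made for illustration:

```go
package main

import "fmt"

// pipeFailFlag matches the option constants shown above.
type pipeFailFlag bool

const (
	PipeFail   pipeFailFlag = true
	NoPipeFail pipeFailFlag = false
)

// Command stands in for gloo.Command (assumed shape).
type Command interface{ Name() string }

// split separates a Pipeline(...any) argument list into commands and
// the effective pipe-fail setting, rejecting anything else.
func split(params ...any) ([]Command, bool, error) {
	var cmds []Command
	pipeFail := bool(NoPipeFail) // default
	for _, p := range params {
		switch v := p.(type) {
		case Command:
			cmds = append(cmds, v)
		case pipeFailFlag:
			pipeFail = bool(v)
		default:
			return nil, false, fmt.Errorf("unsupported parameter %T", p)
		}
	}
	return cmds, pipeFail, nil
}

// named is a trivial Command implementation for the demo.
type named string

func (n named) Name() string { return string(n) }

func main() {
	cmds, pf, err := split(named("a"), named("b"), PipeFail)
	fmt.Println(len(cmds), pf, err) // 2 true <nil>
}
```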

Integration with gloo.foo

Commands are First-Class

Since Pipeline() returns a gloo.Command, pipes work seamlessly with the framework:

// Assign to a variable
myPipeline := pipe.Pipeline(cmd1, cmd2)

// Pass to functions
func process(cmd gloo.Command) {
    gloo.Run(cmd)
}
process(myPipeline)

// Nest in other pipelines
outerPipeline := pipe.Pipeline(cmd0, myPipeline, cmd3)

Works with Any gloo.foo Command

The pipe package works with any command built using the gloo.foo framework:

import "github.com/yourorg/customcommand"

pipeline := pipe.Pipeline(
    customcommand.New(options),
    anotherCommand,
)

Best Practices

✅ Do

  • Use PipeFail for critical pipelines where any error matters
  • Nest pipelines to create reusable processing components
  • Let context cancellation handle timeouts
  • Use single-command pipelines when you want a consistent interface
  • Build pipelines dynamically when needed

❌ Don't

  • Mix error handling expectations (be clear about PipeFail vs NoPipeFail)
  • Forget that commands run concurrently (side effects may happen out of order)
  • Create circular pipeline dependencies

Performance Characteristics

  • Memory: Streaming architecture; data is processed as it flows rather than fully buffered
  • Concurrency: All commands run concurrently in goroutines
  • Latency: Low - downstream starts processing immediately
  • Throughput: High - parallel processing of pipeline stages

License

GNU Affero General Public License v3.0 (AGPL-3.0) - see LICENSE file for details.