Pipelines contains generic functions that help with concurrent processing
A pipeline can be created from a slice or map
stream := pipelines.StreamSlice(ctx, data)
Or from a generator function
func GenerateData(ctx context.Context) int { return rand.Intn(10) }
stream := pipelines.GenerateStream(ctx, GenerateData)
FanOut
can be used to process data concurrently. Useful for I/O bound processes, but it can be used in any situation where you have a slice or map of data and want to introduce concurrency
const MaxFan int = 3
fanOutChannels := pipelines.FanOut(ctx, stream, ProcessFunc, MaxFan)
FanIn
can be used to merge data into one channel
fanInData := pipelines.FanIn(ctx, fanOutChannels...)
package main
import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"strconv"
"time"
"github.com/nxdir-s/pipelines"
)
const (
MaxFan int = 3
)
func GenerateData(ctx context.Context) int {
return rand.Intn(5)
}
func Process(ctx context.Context, timeout int) string {
select {
case <-ctx.Done():
return "context cancelled"
case <-time.After(time.Second * time.Duration(timeout)):
return "slept for " + strconv.Itoa(timeout) + " seconds!"
}
}
func Read(ctx context.Context, messages <-chan string) {
for msg := range messages {
select {
case <-ctx.Done():
return
default:
fmt.Fprintf(os.Stdout, "%s\n", msg)
}
}
}
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
stream := pipelines.GenerateStream(ctx, GenerateData)
fanOutChannels := pipelines.FanOut(ctx, stream, Process, MaxFan)
messages := pipelines.FanIn(ctx, fanOutChannels...)
go Read(ctx, messages)
select {
case <-ctx.Done():
fmt.Fprint(os.Stdout, "context canceled, exiting...\n")
os.Exit(0)
}
}