这是indexloc提供的服务,不要输入任何密码
Skip to content

nxdir-s/pipelines

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

53 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Go Reference Go Report Card Go Coverage

pipelines

Pipelines contains generic functions that help with concurrent processing.

Usage

A pipeline can be created from a slice or map

stream := pipelines.StreamSlice(ctx, data)

Or from a generator function

// GenerateData is a sample generator function: each call yields a
// pseudo-random integer in the half-open range [0, 10). The context
// parameter is accepted but not consulted.
func GenerateData(ctx context.Context) int {
    const upperBound = 10

    value := rand.Intn(upperBound)
    return value
}

stream := pipelines.GenerateStream(ctx, GenerateData)

FanOut

FanOut can be used to process data concurrently. It is especially useful for I/O-bound work, but it applies to any situation where you have a slice or map of data and want to introduce concurrency.

const MaxFan int = 3

fanOutChannels := pipelines.FanOut(ctx, stream, ProcessFunc, MaxFan)

FanIn

FanIn can be used to merge data into one channel

fanInData := pipelines.FanIn(ctx, fanOutChannels...)

Example

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "strconv"
    "time"

    "github.com/nxdir-s/pipelines"
)

// MaxFan is the value handed to pipelines.FanOut as its final argument in
// this example.
const MaxFan int = 3

// GenerateData produces one element for the stream: a pseudo-random
// integer in the half-open range [0, 5). The context parameter is accepted
// but not consulted.
func GenerateData(ctx context.Context) int {
    const upperBound = 5

    value := rand.Intn(upperBound)
    return value
}

// Process waits for timeout seconds, or until ctx is done, and reports which
// of the two happened.
//
// It returns "context cancelled" when ctx is done (including when it is
// already done on entry) and "slept for N seconds!" once the wait elapses.
// A zero or negative timeout elapses immediately.
func Process(ctx context.Context, timeout int) string {
    // Check cancellation up front: with an already-done context and a
    // non-positive timeout both select cases could be ready, and select
    // would pick between them at random.
    if ctx.Err() != nil {
        return "context cancelled"
    }

    // Use an explicit timer instead of time.After so it can be stopped as
    // soon as the context wins, rather than lingering until it fires.
    timer := time.NewTimer(time.Second * time.Duration(timeout))
    defer timer.Stop()

    select {
    case <-ctx.Done():
        return "context cancelled"
    case <-timer.C:
        return "slept for " + strconv.Itoa(timeout) + " seconds!"
    }
}

// Read prints every message received from messages to standard output, one
// per line, until the channel is closed or ctx is done.
//
// Cancellation is observed while waiting for the next message, so Read
// returns promptly after ctx is cancelled even if messages stays open and
// idle. No message is printed once ctx is done.
func Read(ctx context.Context, messages <-chan string) {
    for {
        select {
        case <-ctx.Done():
            return
        case msg, ok := <-messages:
            if !ok {
                // The sender closed the channel; nothing more to read.
                return
            }
            // Both cases may have been ready at once; re-check so a
            // message is never printed after cancellation.
            if ctx.Err() != nil {
                return
            }
            fmt.Fprintf(os.Stdout, "%s\n", msg)
        }
    }
}

// main builds the example pipeline: a generated stream is fanned out to
// MaxFan Process workers, the worker outputs are merged with FanIn, and the
// merged messages are printed by Read in a background goroutine. The program
// runs until an interrupt signal cancels the context.
func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()

    stream := pipelines.GenerateStream(ctx, GenerateData)
    fanOutChannels := pipelines.FanOut(ctx, stream, Process, MaxFan)
    messages := pipelines.FanIn(ctx, fanOutChannels...)

    go Read(ctx, messages)

    // Block until the interrupt arrives; a plain receive is equivalent to a
    // single-case select.
    <-ctx.Done()
    fmt.Fprint(os.Stdout, "context canceled, exiting...\n")

    // Return normally (exit status 0) rather than calling os.Exit, which
    // would skip the deferred cancel and leave the signal handler registered.
}

About

pipelines contains generic functions for concurrent processing

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages