这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 27 additions & 231 deletions cmd/humioctl/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ package main

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"golang.org/x/sync/errgroup"
"github.com/humio/cli/shipper"
"io"
"io/ioutil"
"log"
"math"
"net/http"
"os"
"os/signal"
"regexp"
Expand All @@ -24,53 +20,7 @@ import (
"github.com/spf13/cobra"
)

// eventList is the JSON shape of one entry in the array that
// logSender.sendBatch posts to the sender's url: a batch of raw messages
// together with the parser name and the fields that accompany them.
type eventList struct {
// Type is filled from the sender's parserName.
Type string `json:"type"`
// Fields is filled from the sender's fields map.
Fields map[string]string `json:"fields"`
// Messages holds the raw lines that make up the batch.
Messages []string `json:"messages"`
}

// lineHandler consumes log lines one at a time.
type lineHandler interface {
	handleLine(line string)
}

// multiLineHandlerMode selects how multiLineHandler decides where one
// multi-line event ends and the next one begins.
type multiLineHandlerMode int

const (
	// multiLineHandlerModeBeginsWith: a line matching the regex starts a new
	// event; every other line is appended to the current event.
	multiLineHandlerModeBeginsWith multiLineHandlerMode = iota
	// multiLineHandlerModeContinuesWith: a line matching the regex is
	// appended to the current event; every other line starts a new event.
	multiLineHandlerModeContinuesWith
)

// multiLineHandler joins consecutive lines into multi-line events and forwards
// each completed event to the wrapped lineHandler.
type multiLineHandler struct {
	lineHandler lineHandler    // receives every completed event
	regex       *regexp.Regexp // matched against each incoming line
	mode        multiLineHandlerMode
	buf         bytes.Buffer // lines of the event currently being assembled
}

// handleLine adds line to the event being assembled. When line starts a new
// event, the previously buffered event (each of its lines terminated by "\n")
// is forwarded downstream first.
//
// Nothing is forwarded while the buffer is empty, so the very first line of the
// input no longer produces a spurious empty event.
//
// NOTE: an event is only forwarded once a later line starts the next event, so
// the last buffered event stays in the buffer when the input ends.
func (h *multiLineHandler) handleLine(line string) {
	matched := h.regex.MatchString(line)

	// An unknown mode never starts a new event; lines are just accumulated.
	var startsNewEvent bool
	switch h.mode {
	case multiLineHandlerModeBeginsWith:
		startsNewEvent = matched
	case multiLineHandlerModeContinuesWith:
		startsNewEvent = !matched
	}

	if startsNewEvent && h.buf.Len() > 0 {
		event := h.buf.String()
		h.buf.Reset()
		h.lineHandler.handleLine(event)
	}

	h.buf.WriteString(line)
	h.buf.WriteString("\n")
}

func tailFile(filepath string, quiet bool, seekToEnd bool, handler lineHandler) error {
func tailFile(filepath string, quiet bool, seekToEnd bool, handler shipper.LineHandler) error {
tailConfig := tail.Config{Follow: true}
if seekToEnd {
tailConfig.Location = &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}
Expand All @@ -83,7 +33,7 @@ func tailFile(filepath string, quiet bool, seekToEnd bool, handler lineHandler)
}

for line := range t.Lines {
handler.handleLine(line.Text)
handler.HandleLine(line.Text)
if !quiet {
fmt.Println(line.Text)
}
Expand All @@ -96,7 +46,7 @@ func tailFile(filepath string, quiet bool, seekToEnd bool, handler lineHandler)
return err
}

func streamStdin(repo string, quiet bool, handler lineHandler) error {
func streamStdin(repo string, quiet bool, handler shipper.LineHandler) error {
log.Println("Humio Attached to StdIn, Forwarding to '" + repo + "'")

var reader io.Reader = os.Stdin
Expand All @@ -106,7 +56,7 @@ func streamStdin(repo string, quiet bool, handler lineHandler) error {

scanner := bufio.NewScanner(reader)
for scanner.Scan() {
handler.handleLine(scanner.Text())
handler.HandleLine(scanner.Text())
}

return scanner.Err()
Expand Down Expand Up @@ -140,159 +90,6 @@ func waitForInterrupt() {
<-done
}

// logSenderErrorBehaviour selects what logSender.sendBatch does with a batch
// once every send attempt for it has failed.
type logSenderErrorBehaviour int

const (
// logSenderErrorBehaviourDrop logs the error and drops the batch. Being the
// first iota value it is also the zero value of the type.
logSenderErrorBehaviourDrop logSenderErrorBehaviour = iota
// logSenderErrorBehaviourCrash terminates the process through log.Fatalf.
logSenderErrorBehaviourCrash
)

// logSender queues incoming lines on a channel, groups them into batches in a
// background goroutine (see start) and posts each batch as JSON (see sendBatch).
type logSender struct {
// apiClient issues the HTTP POST for every batch.
apiClient *api.Client
// url is the request target passed to apiClient.HTTPRequest.
url string
// fields is sent as the "fields" value of every batch.
fields map[string]string
// parserName is sent as the "type" value of every batch.
parserName string
// events carries lines from handleLine to the batching goroutine; finish closes it.
events chan string
// finishedSending is closed by the batching goroutine when it exits.
finishedSending chan struct{}
// maxAttemptsPerBatch bounds the number of send attempts made per batch.
maxAttemptsPerBatch int
// errorBehaviour decides what happens when all attempts for a batch fail.
errorBehaviour logSenderErrorBehaviour
// batchSizeLines is the line count at which a batch is sent.
batchSizeLines int
// batchSizeBytes, when > 0, is the summed line length above which a batch is sent.
batchSizeBytes int
// batchTimeout is how long a batch may wait, counted from its first line, before it is sent.
batchTimeout time.Duration
}

// handleLine queues line for the batching goroutine. The send blocks while the
// events channel has no free capacity.
func (s *logSender) handleLine(line string) {
	queue := s.events
	queue <- line
}

// finish signals that no more lines will arrive by closing the events channel,
// then blocks until the batching goroutine has closed finishedSending.
func (s *logSender) finish() {
	done := s.finishedSending

	close(s.events)
	<-done
}

// start launches the batching goroutine. The goroutine reads lines from
// s.events, groups them into batches and hands every batch to sendBatch. It
// exits, closing s.finishedSending, once s.events has been closed and drained.
//
// A batch is sent as soon as one of the following holds:
//   - it contains at least batchSizeLines lines,
//   - batchSizeBytes > 0 and the summed length of its lines exceeds it,
//   - batchTimeout has elapsed since the first line of the batch arrived,
//   - s.events has been closed.
//
// Both size limits are checked after every line, including the first one, so a
// batchSizeLines of 1 yields single-line batches instead of waiting for a
// second line or for the timeout.
func (s *logSender) start() {
	go func() {
		defer close(s.finishedSending)

		batch := make([]string, 0, s.batchSizeLines)

		// full reports whether the batch, holding size bytes of line data,
		// has reached either of the configured limits.
		full := func(size int) bool {
			return len(batch) >= s.batchSizeLines ||
				(s.batchSizeBytes > 0 && size > s.batchSizeBytes)
		}

		// Each outer iteration builds and ships exactly one batch; the loop
		// ends when the channel is closed and empty.
		for first := range s.events {
			batch = append(batch, first)
			size := len(first)

			// The timeout is measured from the arrival of the first line.
			timeout := time.After(s.batchTimeout)

		fill:
			for !full(size) {
				select {
				case e, more := <-s.events:
					if !more {
						break fill
					}
					batch = append(batch, e)
					size += len(e)
				case <-timeout:
					break fill
				}
			}

			s.sendBatch(batch)

			// Keep the backing array for the next batch.
			batch = batch[:0]
		}
	}()
}

// sendBatch posts messages to s.url as a single JSON request and retries on
// failure, sleeping 0.5s, 1s, 2s, ... between attempts. At least one attempt is
// always made, and at most maxAttemptsPerBatch.
//
// Any response with a status code of 400 or above counts as a failure. When
// every attempt has failed, s.errorBehaviour decides whether the batch is
// dropped with a log message or the process is terminated via log.Fatalf.
func (s *logSender) sendBatch(messages []string) {
	ship := func() error {
		var eg errgroup.Group

		// The body is streamed through a pipe so the encoded batch never has
		// to be held in memory as a whole.
		pr, pw := io.Pipe()

		jsonBody := []eventList{{
			Type:     s.parserName,
			Fields:   s.fields,
			Messages: messages,
		}}

		eg.Go(func() error {
			err := json.NewEncoder(pw).Encode(jsonBody)
			// Hand an encoding failure to the reader so a truncated body is
			// not mistaken for a complete one; a nil error yields a plain EOF.
			_ = pw.CloseWithError(err)
			return err
		})

		var resp *http.Response

		eg.Go(func() error {
			var err error
			resp, err = s.apiClient.HTTPRequest(http.MethodPost, s.url, pr)
			if err != nil {
				// The request may have failed before draining the pipe;
				// unblock the encoder so eg.Wait cannot hang.
				_ = pr.CloseWithError(err)
			}
			return err
		})

		if err := eg.Wait(); err != nil {
			return err
		}
		defer resp.Body.Close()

		if resp.StatusCode >= 400 {
			responseData, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				return fmt.Errorf("error reading http response body: %w", err)
			}

			return fmt.Errorf("bad response while sending events (status='%s'): %s", resp.Status, responseData)
		}

		// discard the response in order to re-use the connection
		_, _ = io.Copy(ioutil.Discard, resp.Body)

		return nil
	}

	// A non-positive limit would otherwise drop the batch without any attempt
	// and without any log output.
	attempts := s.maxAttemptsPerBatch
	if attempts < 1 {
		attempts = 1
	}

	var err error
	for i := 0; i < attempts; i++ {
		if i > 0 {
			backOff := time.Duration(0.5*math.Pow(2, float64(i-1))*1000) * time.Millisecond
			log.Printf("Backoff for %v...", backOff)
			time.Sleep(backOff)
		}
		err = ship()
		if err == nil {
			return
		}
		log.Printf("Error while sending logs to Humio. Retrying %d more times. Error message: %v", attempts-i-1, err)
	}

	switch s.errorBehaviour {
	case logSenderErrorBehaviourCrash:
		log.Fatalf("Error sending logs to Humio: %v", err)
	case logSenderErrorBehaviourDrop:
		log.Printf("Error sending logs to Humio, dropping %d events: %v", len(messages), err)
	}
}

func newIngestCmd() *cobra.Command {
var parserName, filepath, label, ingestToken, multiLineBeginsWith, multiLineContinuesWith, fieldsJson string
var openBrowser, noSession, quiet, failOnError, tailSeekToEnd bool
Expand Down Expand Up @@ -385,41 +182,40 @@ has the same effect.`,
url = "api/v1/repositories/" + repo + "/ingest-messages"
}

sender := logSender{
apiClient: client,
url: url,
fields: fields,
parserName: parserName,
maxAttemptsPerBatch: retries + 1,
events: make(chan string, batchSizeLines),
finishedSending: make(chan struct{}),
batchSizeLines: batchSizeLines,
batchSizeBytes: batchSizeBytes,
batchTimeout: time.Duration(batchTimeoutMs) * time.Millisecond,
sender := shipper.LogShipper{
APIClient: client,
URL: url,
Fields: fields,
ParserName: parserName,
MaxAttemptsPerBatch: retries + 1,
BatchSizeLines: batchSizeLines,
BatchSizeBytes: batchSizeBytes,
BatchTimeout: time.Duration(batchTimeoutMs) * time.Millisecond,
Logger: log.Printf,
}

if failOnError {
sender.errorBehaviour = logSenderErrorBehaviourCrash
sender.ErrorBehaviour = shipper.ErrorBehaviourPanic
}

sender.start()
sender.Start()

var lineHandler lineHandler = &sender
var lineHandler shipper.LineHandler = &sender

switch {
case multiLineBeginsWith != "" && multiLineContinuesWith != "":
log.Fatalf("Cannot specify both --multiline-begins-with and --multiline-continues-with")
case multiLineBeginsWith != "":
lineHandler = &multiLineHandler{
lineHandler: lineHandler,
regex: regexp.MustCompile(multiLineBeginsWith),
mode: multiLineHandlerModeBeginsWith,
lineHandler = &shipper.MultiLineHandler{
LineHandler: lineHandler,
Regex: regexp.MustCompile(multiLineBeginsWith),
Mode: shipper.MultiLineHandlerModeBeginsWith,
}
case multiLineContinuesWith != "":
lineHandler = &multiLineHandler{
lineHandler: lineHandler,
regex: regexp.MustCompile(multiLineContinuesWith),
mode: multiLineHandlerModeContinuesWith,
lineHandler = &shipper.MultiLineHandler{
LineHandler: lineHandler,
Regex: regexp.MustCompile(multiLineContinuesWith),
Mode: shipper.MultiLineHandlerModeContinuesWith,
}
}

Expand All @@ -430,7 +226,7 @@ has the same effect.`,
err = streamStdin(repo, quiet, lineHandler)
}

sender.finish()
sender.Finish()

if err != nil {
log.Fatal(err)
Expand Down
43 changes: 43 additions & 0 deletions shipper/multiline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package shipper

import (
"bytes"
"regexp"
)

// MultiLineHandlerMode selects how a MultiLineHandler decides where one
// multi-line event ends and the next one begins.
type MultiLineHandlerMode int

const (
	// MultiLineHandlerModeBeginsWith makes a line matching Regex start a new
	// event; every other line is appended to the current event.
	MultiLineHandlerModeBeginsWith MultiLineHandlerMode = iota
	// MultiLineHandlerModeContinuesWith makes a line matching Regex continue
	// the current event; every other line starts a new event.
	MultiLineHandlerModeContinuesWith
)

// MultiLineHandler is a LineHandler that joins consecutive lines into
// multi-line events and forwards each completed event to LineHandler.
type MultiLineHandler struct {
	// LineHandler receives every completed event.
	LineHandler LineHandler
	// Regex is matched against each incoming line.
	Regex *regexp.Regexp
	// Mode determines how a match of Regex is interpreted.
	Mode MultiLineHandlerMode

	// buf holds the lines of the event currently being assembled.
	buf bytes.Buffer
}

// HandleLine adds line to the event being assembled. When line starts a new
// event, the previously buffered event (each of its lines terminated by "\n")
// is forwarded to LineHandler first.
//
// Nothing is forwarded while the buffer is empty, so the very first line of the
// input no longer produces a spurious empty event.
//
// NOTE: an event is only forwarded once a later line starts the next event, so
// the last buffered event stays in the buffer when the input ends.
func (h *MultiLineHandler) HandleLine(line string) {
	matched := h.Regex.MatchString(line)

	// An unknown mode never starts a new event; lines are just accumulated.
	var startsNewEvent bool
	switch h.Mode {
	case MultiLineHandlerModeBeginsWith:
		startsNewEvent = matched
	case MultiLineHandlerModeContinuesWith:
		startsNewEvent = !matched
	}

	if startsNewEvent && h.buf.Len() > 0 {
		event := h.buf.String()
		h.buf.Reset()
		h.LineHandler.HandleLine(event)
	}

	h.buf.WriteString(line)
	h.buf.WriteString("\n")
}
Loading