diff --git a/command_test.go b/command_test.go index 2af018d..68680bb 100644 --- a/command_test.go +++ b/command_test.go @@ -141,7 +141,8 @@ func TestRunAndAggregate(t *testing.T) { return dst.Write(bytes.ReplaceAll(line, []byte("hello"), []byte("goodbye"))) }) }, - expect: "goodbye world", + expect: "goodbye world", + expectError: true, // io.EOF }, { name: "multiple mapped output", @@ -154,7 +155,8 @@ func TestRunAndAggregate(t *testing.T) { return dst.Write(bytes.ReplaceAll(line, []byte("world"), []byte("jh"))) }) }, - expect: "goodbye jh", + expect: "goodbye jh", + expectError: true, // io.EOF }, } { c.Run(tc.name, func(c *qt.C) { diff --git a/go.mod b/go.mod index 1adfbf5..09515f0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,8 @@ require ( github.com/djherbis/buffer v1.2.0 github.com/djherbis/nio/v3 v3.0.1 github.com/frankban/quicktest v1.14.3 - github.com/itchyny/gojq v0.12.9 + github.com/itchyny/gojq v0.12.11 + go.bobheadxi.dev/streamline v1.2.1 go.opentelemetry.io/otel v1.11.0 go.opentelemetry.io/otel/sdk v1.11.0 go.opentelemetry.io/otel/trace v1.11.0 @@ -16,10 +17,10 @@ require ( require ( github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/google/go-cmp v0.5.8 // indirect - github.com/itchyny/timefmt-go v0.1.4 // indirect + github.com/google/go-cmp v0.5.9 // indirect + github.com/itchyny/timefmt-go v0.1.5 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/rogpeppe/go-internal v1.6.1 // indirect - golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect + golang.org/x/sys v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index ee30f23..535d5e0 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,13 @@ bitbucket.org/creachadair/shell v0.0.7 h1:Z96pB6DkSb7F3Y3BBnJeOZH2gazyMTWlvecSD4vDqfk= bitbucket.org/creachadair/shell v0.0.7/go.mod h1:oqtXSSvSYr4624lnnabXHaBsYW6RD80caLi2b3hJk0U= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/djherbis/buffer v1.1.0/go.mod h1:VwN8VdFkMY0DCALdY8o00d3IZ6Amz/UNVMWcSaJT44o= github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ= github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE= github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4= github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg= +github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -14,19 +15,17 @@ github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= -github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/itchyny/gojq v0.12.7 h1:hYPTpeWfrJ1OT+2j6cvBScbhl0TkdwGM4bc66onUSOQ= -github.com/itchyny/gojq v0.12.7/go.mod h1:ZdvNHVlzPgUf8pgjnuDTmGfHA/21KoutQUJ3An/xNuw= -github.com/itchyny/gojq v0.12.9 h1:biKpbKwMxVYhCU1d6mR7qMr3f0Hn9F5k5YykCVb3gmM= -github.com/itchyny/gojq v0.12.9/go.mod h1:T4Ip7AETUXeGpD+436m+UEl3m3tokRgajd5pRfsR5oE= -github.com/itchyny/timefmt-go v0.1.3 h1:7M3LGVDsqcd0VZH2U+x393obrzZisp7C0uEe921iRkU= -github.com/itchyny/timefmt-go v0.1.3/go.mod h1:0osSSCQSASBJMsIZnhAaF1C2fCBTJZXrnj37mG8/c+A= -github.com/itchyny/timefmt-go v0.1.4 h1:hFEfWVdwsEi+CY8xY2FtgWHGQaBaC3JeHd+cve0ynVM= -github.com/itchyny/timefmt-go v0.1.4/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hexops/autogold v1.3.1 h1:YgxF9OHWbEIUjhDbpnLhgVsjUDsiHDTyDfy2lrfdlzo= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/valast v1.4.3 h1:oBoGERMJh6UZdRc6cduE1CTPK+VAdXA59Y1HFgu3sm0= +github.com/itchyny/gojq v0.12.11 h1:YhLueoHhHiN4mkfM+3AyJV6EPcCxKZsOnYf+aVSwaQw= +github.com/itchyny/gojq v0.12.11/go.mod h1:o3FT8Gkbg/geT4pLI0tF3hvip5F3Y/uskjRz9OYa38g= +github.com/itchyny/timefmt-go v0.1.5 h1:G0INE2la8S6ru/ZI5JecgyzbbJNs5lG1RcBqa7Jm6GE= +github.com/itchyny/timefmt-go v0.1.5/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -34,25 +33,27 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/nightlyone/lockfile v1.0.0 h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAmxBiA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +go.bobheadxi.dev/streamline v1.2.1 h1:IqKSA1TbeuDqCzYNAwtlh8sqf3tsQus8XgJdkCWFT8c= +go.bobheadxi.dev/streamline v1.2.1/go.mod h1:yJsVXOSBFLgAKvsnf6WmIzmB2A65nWqkR/sRNxJPa74= go.opentelemetry.io/otel v1.11.0 h1:kfToEGMDq6TrVrJ9Vht84Y8y9enykSZzDDZglV0kIEk= go.opentelemetry.io/otel v1.11.0/go.mod h1:H2KtuEphyMvlhZ+F7tg9GRhAOe60moNx61Ex+WmiKkk= go.opentelemetry.io/otel/sdk v1.11.0 h1:ZnKIL9V9Ztaq+ME43IUi/eo22mNsb6a7tGfzaOWB5fo= go.opentelemetry.io/otel/sdk v1.11.0/go.mod h1:REusa8RsyKaq0OlyangWXaw97t2VogoO4SSEeKkSTAk= go.opentelemetry.io/otel/trace v1.11.0 h1:20U/Vj42SX+mASlXLmSGBg6jpI1jQtv682lZtTAOVFI= go.opentelemetry.io/otel/trace v1.11.0/go.mod h1:nyYjis9jy0gytE9LXGU+/m1sHTKbRY0fX0hulNNDP1U= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc= -golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= +golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/tools v0.4.0 h1:7mTAgkunk3fr4GAloyyCasadO6h9zSsQZbwvcaIciV4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +mvdan.cc/gofumpt v0.4.0 h1:JVf4NN1mIpHogBj7ABpgOyZc65/UUOkKQFkoURsz4MM= diff --git a/map.go b/map.go index 196437e..15afa87 100644 --- a/map.go +++ b/map.go @@ -1,10 +1,11 @@ package run import ( - "bufio" "bytes" "context" "io" + + "go.bobheadxi.dev/streamline/pipeline" ) // LineMap allows modifications of individual lines from Output and enables callbacks @@ -37,59 +38,37 @@ func MapJQ(query string) (LineMap, error) { }, nil } -type lineMaps []LineMap - -// Pipe applies lineMaps sequentially to dst from src, and returns the number of bytes -// read. -func (m lineMaps) Pipe(ctx context.Context, src io.Reader, dst io.Writer, close func()) (int64, error) { - if close != nil { - defer close() - } - - scanner := bufio.NewScanner(src) - - var buf bytes.Buffer - var totalWritten int64 - for scanner.Scan() { - line := scanner.Bytes() +type lineMapPipelineAdapter struct { + ctx context.Context + buffer *bytes.Buffer + lineMap LineMap +} - // Defaults to true because if no map funcs unset this, then we will write the - // entire line. - writeCalled := true +var _ pipeline.Pipeline = &lineMapPipelineAdapter{} - for _, f := range m { - tb := &tracedBuffer{Buffer: &buf} - buffered, err := f(ctx, line, tb) - if err != nil { - return totalWritten, err - } - writeCalled = tb.writeCalled +func (l *lineMapPipelineAdapter) Inactive() bool { return l == nil || l.lineMap == nil } - // Nothing written => end - if buffered == 0 { - break - } +func (l *lineMapPipelineAdapter) ProcessLine(line []byte) ([]byte, error) { + // Use a shared buffer when applying this LineMap - it gets reset on each + // line, and lines are processed synchronously. + l.buffer.Reset() - // Copy bytes and reset for the next map - line = make([]byte, buf.Len()) - copy(line, buf.Bytes()) - buf.Reset() - } + buf := tracedBuffer{Buffer: l.buffer} + _, err := l.lineMap(l.ctx, line, &buf) + if !buf.writeCalled || err != nil { + return nil, err // omit the line or return the error + } + return buf.Bytes(), nil +} - // If anything was written, or a write was called even with an ending, treat it as - // a line and add a line ending for convenience, unless it already has a line - // ending. - if writeCalled && !bytes.HasSuffix(line, []byte("\n")) { - written, err := dst.Write(append(line, '\n')) - totalWritten += int64(written) - if err != nil { - return totalWritten, err - } - } +type tracedBuffer struct { + // writeCalled indicates that Write was called at all, even with empty input. + writeCalled bool - // Reset for next line - buf.Reset() - } + *bytes.Buffer +} - return totalWritten, scanner.Err() +func (t *tracedBuffer) Write(b []byte) (int, error) { + t.writeCalled = true + return t.Buffer.Write(b) } diff --git a/output.go b/output.go index f57123c..b9aac8e 100644 --- a/output.go +++ b/output.go @@ -1,14 +1,16 @@ package run import ( + "bytes" "context" "fmt" "io" "os/exec" - "strings" "sync" "github.com/djherbis/nio/v3" + "go.bobheadxi.dev/streamline" + "go.bobheadxi.dev/streamline/pipeline" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -19,11 +21,17 @@ import ( // of outputs, such as multi-outputs and error-only outputs, without complicating the core // commandOutput implementation. type Output interface { - // Map adds a LineMap function to be applied to this Output. It is only applied at - // aggregation time using e.g. Stream, Lines, and so on. Multiple LineMaps are applied - // sequentially, with the result of previous LineMaps propagated to subsequent - // LineMaps. + // Map adds a LineMap function to be applied to this Output. + // + // Deprecated: Use Pipeline instead. Map(f LineMap) Output + // Pipeline is similar to Map, but uses a new interface that provides more flexible + // ways of manipulating output on the stream. It is only applied at aggregation time + // using e.g. Stream, Lines, and so on. Multiple Pipelines are applied sequentially, + // with the result of previous Pipelines propagated to subsequent Pipelines. + // + // For more details, refer to the pipeline.Pipeline documentation. + Pipeline(p pipeline.Pipeline) Output // TODO wishlist functionality // Mode(mode OutputMode) Output @@ -64,16 +72,9 @@ type Output interface { type commandOutput struct { ctx context.Context - // reader is set to the reader side of the output pipe. It does not have mapFuncs - // applied, they are applied at aggregation time. When the command exits, the error - // should be raised from the reader after the reader's buffer is exhausted. - reader io.ReadCloser - - // mapFuncs define LineMaps to be applied at aggregation time. - mapFuncs lineMaps - - // mappedData is set by incremental aggregators like Read, and holds mapped results. - mappedData io.Reader + // stream is the underlying output aggregation implementation. It reads from a + // read side of a pipe which receives output from a command. + stream *streamline.Stream // waitAndCloseFunc should only be called via doWaitOnce(). It should wait for command // exit and handle setting an error such that once reads from reader are complete, the @@ -157,7 +158,7 @@ func attachAndRun( output := &commandOutput{ ctx: ctx, - reader: outputReader, + stream: streamline.New(outputReader), } output.waitAndCloseFunc = func() error { @@ -183,7 +184,15 @@ func attachAndRun( } func (o *commandOutput) Map(f LineMap) Output { - o.mapFuncs = append(o.mapFuncs, f) + return o.Pipeline(&lineMapPipelineAdapter{ + ctx: o.ctx, + buffer: &bytes.Buffer{}, + lineMap: f, + }) +} + +func (o *commandOutput) Pipeline(p pipeline.Pipeline) Output { + o.stream = o.stream.WithPipeline(p) return o } @@ -199,10 +208,7 @@ func (o *commandOutput) StreamLines(dst func(line string)) error { go o.waitAndClose() - _, err := o.mapFuncs.Pipe(o.ctx, o.reader, newLineWriter(func(b []byte) { - dst(string(b)) - }), nil) - return err + return o.stream.Stream(dst) } func (o *commandOutput) Lines() ([]string, error) { @@ -210,22 +216,7 @@ func (o *commandOutput) Lines() ([]string, error) { go o.waitAndClose() - // export lines - linesC := make(chan string, 3) - errC := make(chan error) - go func() { - dst := newLineWriter(func(line []byte) { linesC <- string(line) }) - _, err := o.mapFuncs.Pipe(o.ctx, o.reader, dst, func() { close(linesC) }) - errC <- err - }() - - // aggregate lines from results - lines := make([]string, 0, 10) - for line := range linesC { - lines = append(lines, line) - } - - return lines, <-errC + return o.stream.Lines() } func (o *commandOutput) JQ(query string) ([]byte, error) { @@ -244,11 +235,9 @@ func (o *commandOutput) JQ(query string) ([]byte, error) { func (o *commandOutput) String() (string, error) { trace.SpanFromContext(o.ctx).AddEvent("String") - var sb strings.Builder - if err := o.Stream(&sb); err != nil { - return sb.String(), err - } - return strings.TrimSuffix(sb.String(), "\n"), nil + go o.waitAndClose() + + return o.stream.String() } func (o *commandOutput) Read(p []byte) (int, error) { @@ -256,23 +245,7 @@ func (o *commandOutput) Read(p []byte) (int, error) { go o.waitAndClose() - if len(o.mapFuncs) == 0 { - // Happy path, just read - return o.reader.Read(p) - } - - // Otherwise, we can only really read the whole thing and send the data back bit by - // bit as read requests come in. - if o.mappedData == nil { - reader, writer := nio.Pipe(makeUnboundedBuffer()) - go func() { - _, err := o.mapFuncs.Pipe(o.ctx, o.reader, writer, nil) - writer.CloseWithError(err) - }() - o.mappedData = reader - } - - return o.mappedData.Read(p) + return o.stream.Read(p) } // WriteTo implements io.WriterTo, and returns int64 instead of int because of: @@ -282,21 +255,13 @@ func (o *commandOutput) WriteTo(dst io.Writer) (int64, error) { go o.waitAndClose() - if len(o.mapFuncs) == 0 { - // Happy path, directly pipe output - return io.Copy(dst, o.reader) - } - - return o.mapFuncs.Pipe(o.ctx, o.reader, dst, nil) + return o.stream.WriteTo(dst) } func (o *commandOutput) Wait() error { trace.SpanFromContext(o.ctx).AddEvent("Wait") - err := o.waitAndClose() - // Wait does not consume output, so prevent further reads from occuring. - o.reader.Close() - return err + return o.waitAndClose() } // waitAndClose waits for command completion and closes the write half of the reader. Most diff --git a/output_error.go b/output_error.go index 4a0218b..76fcacf 100644 --- a/output_error.go +++ b/output_error.go @@ -1,6 +1,10 @@ package run -import "io" +import ( + "io" + + "go.bobheadxi.dev/streamline/pipeline" +) type errorOutput struct{ err error } @@ -9,9 +13,10 @@ type errorOutput struct{ err error } // before command execution. func NewErrorOutput(err error) Output { return &errorOutput{err: err} } -func (o *errorOutput) StdErr() Output { return o } -func (o *errorOutput) StdOut() Output { return o } -func (o *errorOutput) Map(LineMap) Output { return o } +func (o *errorOutput) StdErr() Output { return o } +func (o *errorOutput) StdOut() Output { return o } +func (o *errorOutput) Map(LineMap) Output { return o } +func (o *errorOutput) Pipeline(pipeline.Pipeline) Output { return o } func (o *errorOutput) Stream(io.Writer) error { return o.err } func (o *errorOutput) StreamLines(func(string)) error { return o.err } diff --git a/writer.go b/writer.go deleted file mode 100644 index f7710e6..0000000 --- a/writer.go +++ /dev/null @@ -1,38 +0,0 @@ -package run - -import ( - "bufio" - "bytes" - "io" -) - -type lineWriter struct { - handler func([]byte) -} - -func newLineWriter(handler func([]byte)) io.Writer { - return &lineWriter{handler: handler} -} - -func (lw *lineWriter) Write(b []byte) (int, error) { - n := len(b) - - scanner := bufio.NewScanner(bytes.NewReader(b)) - for scanner.Scan() { - lw.handler(scanner.Bytes()) - } - - return n, nil -} - -type tracedBuffer struct { - // writeCalled indicates that Write was called at all, even with empty input. - writeCalled bool - - *bytes.Buffer -} - -func (t *tracedBuffer) Write(b []byte) (int, error) { - t.writeCalled = true - return t.Buffer.Write(b) -}