8000 [breaking] Simplified gRPC streams helpers (#1857) · arduino/arduino-cli@73b3b61 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 73b3b61

Browse files
authored
[breaking] Simplified gRPC streams helpers (#1857)
* Simplified gRPC streams helpers * Moved FeedStreamTo and ConsumeStreamFrom into deamon package and mde them private
1 parent 4a4d1fd commit 73b3b61

File tree

4 files changed

+51
-32
lines changed

4 files changed

+51
-32
lines changed

commands/daemon/daemon.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"io"
2323

2424
"github.com/arduino/arduino-cli/arduino"
25-
"github.com/arduino/arduino-cli/arduino/utils"
2625
"github.com/arduino/arduino-cli/commands"
2726
"github.com/arduino/arduino-cli/commands/board"
2827
"github.com/arduino/arduino-cli/commands/compile"
@@ -259,16 +258,14 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke
259258

260259
// Compile FIXMEDOC
261260
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
262-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
263-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
261+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
262+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
264263
compileResp, compileErr := compile.Compile(
265264
stream.Context(), req, outStream, errStream,
266265
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) },
267266
false) // Set debug to false
268267
outStream.Close()
269268
errStream.Close()
270-
<-outCtx.Done()
271-
<-errCtx.Done()
272269
var compileRespSendErr error
273270
if compileResp != nil {
274271
compileRespSendErr = stream.Send(compileResp)
@@ -346,31 +343,27 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf
346343

347344
// Upload FIXMEDOC
348345
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
349-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
350-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
346+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
347+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
351348
resp, err := upload.Upload(stream.Context(), req, outStream, errStream)
352349
outStream.Close()
353350
errStream.Close()
354351
if err != nil {
355352
return convertErrorToRPCStatus(err)
356353
}
357-
<-outCtx.Done()
358-
<-errCtx.Done()
359354
return stream.Send(resp)
360355
}
361356

362357
// UploadUsingProgrammer FIXMEDOC
363358
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
364-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
365-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
359+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
360+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
366361
resp, err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
367362
outStream.Close()
368363
errStream.Close()
369364
if err != nil {
370365
return convertErrorToRPCStatus(err)
371366
}
372-
<-outCtx.Done()
373-
<-errCtx.Done()
374367
return stream.Send(resp)
375368
}
376369

@@ -382,16 +375,14 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp
382375

383376
// BurnBootloader FIXMEDOC
384377
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
385-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
386-
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
378+
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
379+
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
387380
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
388381
outStream.Close()
389382
errStream.Close()
390383
if err != nil {
391384
return convertErrorToRPCStatus(err)
392385
}
393-
<-outCtx.Done()
394-
<-errCtx.Done()
395386
return stream.Send(resp)
396387
}
397388

commands/daemon/debug.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"context"
2020
"os"
2121

22-
"github.com/arduino/arduino-cli/arduino/utils"
2322
cmd "github.com/arduino/arduino-cli/commands/debug"
2423
dbg "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/debug/v1"
2524
"github.com/pkg/errors"
@@ -50,9 +49,9 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
5049
// Launch debug recipe attaching stdin and out to grpc streaming
5150
signalChan := make(chan os.Signal)
5251
defer close(signalChan)
53-
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
52+
outStream := feedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
5453
resp, debugErr := cmd.Debug(stream.Context(), req,
55-
utils.ConsumeStreamFrom(func() ([]byte, error) {
54+
consumeStreamFrom(func() ([]byte, error) {
5655
command, err := stream.Recv()
5756
if command.GetSendInterrupt() {
5857
signalChan <- os.Interrupt
@@ -65,7 +64,6 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
6564
if debugErr != nil {
6665
return debugErr
6766
}
68-
<-outCtx.Done()
6967
return stream.Send(resp)
7068
}
7169

arduino/utils/stream.go renamed to commands/daemon/stream.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,42 @@
1313
// Arduino software without disclosing the source code of your own applications.
1414
// To purchase a commercial license, send an email to license@arduino.cc.
1515

16-
package utils
16+
package daemon
1717

1818
import (
19-
"context"
2019
"io"
20+
"sync"
2121
"time"
2222

2323
"github.com/djherbis/buffer"
2424
"github.com/djherbis/nio/v3"
2525
)
2626

27-
// FeedStreamTo creates a pipe to pass data to the writer function.
28-
// FeedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
< 10000 /code>27+
// implWriteCloser is an helper struct to implement an anonymous io.WriteCloser
28+
type implWriteCloser struct {
29+
write func(buff []byte) (int, error)
30+
close func() error
31+
}
32+
33+
func (w *implWriteCloser) Write(buff []byte) (int, error) {
34+
return w.write(buff)
35+
}
36+
37+
func (w *implWriteCloser) Close() error {
38+
return w.close()
39+
}
40+
41+
// feedStreamTo creates a pipe to pass data to the writer function.
42+
// feedStreamTo returns the io.WriteCloser side of the pipe, on which the user can write data.
2943
// The user must call Close() on the returned io.WriteCloser to release all the resources.
3044
// If needed, the context can be used to detect when all the data has been processed after
3145
// closing the writer.
32-
func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
33-
ctx, cancel := context.WithCancel(context.Background())
46+
func feedStreamTo(writer func(data []byte)) io.WriteCloser {
3447
r, w := nio.Pipe(buffer.New(32 * 1024))
48+
var wg sync.WaitGroup
49+
wg.Add(1)
3550
go func() {
36-
defer cancel()
51+
defer wg.Done()
3752
data := make([]byte, 16384)
3853
for {
3954
if n, err := r.Read(data); err == nil {
@@ -50,12 +65,21 @@ func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
5065
}
5166
}
5267
}()
53-
return w, ctx
68+
return &implWriteCloser{
69+
write: w.Write,
70+
close: func() error {
71+
if err := w.Close(); err != nil {
72+
return err
73+
}
74+
wg.Wait()
75+
return nil
76+
},
77+
}
5478
}
5579

56-
// ConsumeStreamFrom creates a pipe to consume data from the reader function.
57-
// ConsumeStreamFrom returns the io.Reader side of the pipe, which the user can use to consume the data
58-
func ConsumeStreamFrom(reader func() ([]byte, error)) io.Reader {
80+
// consumeStreamFrom creates a pipe to consume data from the reader function.
81+
// consumeStreamFrom returns the io.Reader side of the pipe, which the user can use to consume the data
82+
func consumeStreamFrom(reader func() ([]byte, error)) io.Reader {
5983
r, w := io.Pipe()
6084
go func() {
6185
for {

docs/UPGRADING.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,12 @@ directory.
305305
- `github.com/arduino/arduino-cli/configuration.BundleToolsDirectories` has been renamed to `BuiltinToolsDirectories`
306306
- `github.com/arduino/arduino-cli/configuration.IDEBundledLibrariesDir` has been renamed to `IDEBuiltinLibrariesDir`
307307
308+
### Removed `utils.FeedStreamTo` and `utils.ConsumeStreamFrom`
309+
310+
`github.com/arduino/arduino-cli/arduino/utils.FeedStreamTo` and
311+
`github.com/arduino/arduino-cli/arduino/utils.ConsumeStreamFrom` are now private. They are mainly used internally for
312+
gRPC stream handling and are not suitable to be public API.
313+
308314
## 0.26.0
309315
310316
### `github.com/arduino/arduino-cli/commands.DownloadToolRelease`, and `InstallToolRelease` functions have been removed

0 commit comments

Comments
 (0)
0