@@ -76,6 +76,19 @@ func MaxPacket(size int) ClientOption {
7676 return MaxPacketChecked (size )
7777}
7878
79+ // MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
80+ //
<
10BC0
td data-grid-cell-id="diff-4b667feae66c9d46b21b9ecc19e8958cf4472d162ce0a47ac3e8386af8bbd8cf-78-81-0" data-selected="false" role="gridcell" style="background-color:var(--diffBlob-additionNum-bgColor, var(--diffBlob-addition-bgColor-num));text-align:center" tabindex="-1" valign="top" class="focusable-grid-cell diff-line-number position-relative left-side">81+ // The default maximum concurrent requests is 64.
82+ func MaxConcurrentRequestsPerFile (n int ) ClientOption {
83+ return func (c * Client ) error {
84+ if n < 1 {
85+ return errors .Errorf ("n must be greater or equal to 1" )
86+ }
87+ c .maxConcurrentRequests = n
88+ return nil
89+ }
90+ }
91+
7992// NewClient creates a new SFTP client on conn, using zero or more option
8093// functions.
8194func NewClient (conn * ssh.Client , opts ... ClientOption ) (* Client , error ) {
@@ -110,7 +123,8 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
110123 },
111124 inflight : make (map [uint32 ]chan <- result ),
112125 },
113- maxPacket : 1 << 15 ,
126+ maxPacket : 1 << 15 ,
127+ maxConcurrentRequests : 64 ,
114128 }
115129 if err := sftp .applyOptions (opts ... ); err != nil {
116130 wr .Close ()
@@ -137,8 +151,9 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
137151type Client struct {
138152 clientConn
139153
140- maxPacket int // max packet size read or written.
141- nextid uint32
154+ maxPacket int // max packet size read or written.
155+ nextid uint32
156+ maxConcurrentRequests int
142157}
143158
144159// Create creates the named file mode 0666 (before umask), truncating it if it
@@ -759,8 +774,6 @@ func (f *File) Name() string {
759774 return f .path
760775}
761776
762- const maxConcurrentRequests = 64
763-
764777// Read reads up to len(b) bytes from the File. It returns the number of bytes
765778// read and an error, if any. Read follows io.Reader semantics, so when Read
766779// encounters an error or EOF condition after successfully reading n > 0 bytes,
@@ -780,7 +793,7 @@ func (f *File) Read(b []byte) (int, error) {
780793 offset := f .offset
781794 // maxConcurrentRequests buffer to deal with broadcastErr() floods
782795 // also must have a buffer of max value of (desiredInFlight - inFlight)
783- ch := make (chan result , maxConcurrentRequests + 1 )
796+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
784797 type inflightRead struct {
785798 b []byte
786799 offset uint64
@@ -845,7 +858,7 @@ func (f *File) Read(b []byte) (int, error) {
845858 if n < len (req .b ) {
846859 sendReq (req .b [l :], req .offset + uint64 (l ))
847860 }
848- if desiredInFlight < maxConcurrentRequests {
861+ if desiredInFlight < f . c . maxConcurrentRequests {
849862 desiredInFlight ++
850863 }
851864 default :
@@ -880,7 +893,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
880893 writeOffset := offset
881894 fileSize := uint64 (fi .Size ())
882895 // see comment on same line in Read() above
883- ch := make (chan result , maxConcurrentRequests + 1 )
896+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
884897 type inflightRead struct {
885898 b []byte
886899 offset uint64
@@ -960,7 +973,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
960973 switch {
961974 case offset > fileSize :
962975 desiredInFlight = 1
963- case desiredInFlight < maxConcurrentRequests :
976+ case desiredInFlight < f . c . maxConcurrentRequests :
964977 desiredInFlight ++
965978 }
966979 writeOffset += uint64 (nbytes )
@@ -1028,7 +1041,7 @@ func (f *File) Write(b []byte) (int, error) {
10281041 desiredInFlight := 1
10291042 offset := f .offset
10301043 // see comment on same line in Read() above
1031- ch := make (chan result , maxConcurrentRequests + 1 )
1044+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
10321045 var firstErr error
10331046 written := len (b )
10341047 for len (b ) > 0 || inFlight > 0 {
@@ -1064,7 +1077,7 @@ func (f *File) Write(b []byte) (int, error) {
10641077 firstErr = err
10651078 break
10661079 }
1067- if desiredInFlight < maxConcurrentRequests {
1080+ if desiredInFlight < f . c . maxConcurrentRequests {
10681081 desiredInFlight ++
10691082 }
10701083 default :
@@ -1093,7 +1106,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
10931106 desiredInFlight := 1
10941107 offset := f .offset
10951108 // see comment on same line in Read() above
1096- ch := make (chan result , maxConcurrentRequests + 1 )
1109+ ch := make (chan result , f . c . maxConcurrentRequests + 1 )
10971110 var firstErr error
10981111 read := int64 (0 )
10991112 b := make ([]byte , f .c .maxPacket )
@@ -1132,7 +1145,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
11321145 firstErr = err
11331146 break
11341147 }
1135- if desiredInFlight < maxConcurrentRequests {
1148+ if desiredInFlight < f . c . maxConcurrentRequests {
11361149 desiredInFlight ++
11371150 }
11381151 default :
0 commit comments