8000 Merge pull request #266 from pkg/issue265 · etherscan-io/sftp@718e647 · GitHub
[go: up one dir, main page]

Skip to content

Commit 718e647

Browse files
authored
Merge pull request pkg#266 from pkg/issue265
fix race w/ open packet and stat
2 parents fe93131 + b0f20f9 commit 718e647

File tree

2 files changed

+59
-11
lines changed

2 files changed

+59
-11
lines changed

packet-manager.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,33 +105,32 @@ func (s *packetManager) close() {
105105
func (s *packetManager) workerChan(runWorker func(chan orderedRequest),
106106
) chan orderedRequest {
107107

108+
// multiple workers for faster read/writes
108109
rwChan := make(chan orderedRequest, SftpServerWorkerCount)
109110
for i := 0; i < SftpServerWorkerCount; i++ {
110111
runWorker(rwChan)
111112
}
112113

114+
// single worker to enforce sequential processing of everything else
113115
cmdChan := make(chan orderedRequest)
114116
runWorker(cmdChan)
115117

116118
pktChan := make(chan orderedRequest, SftpServerWorkerCount)
117119
go func() {
118-
// start with cmdChan
119-
curChan := cmdChan
120120
for pkt := range pktChan {
121-
// on file open packet, switch to rwChan
122121
switch pkt.requestPacket.(type) {
123-
case *sshFxpOpenPacket:
124-
curChan = rwChan
125-
// on file close packet, switch back to cmdChan
126-
// after waiting for any reads/writes to finish
122+
case *sshFxpReadPacket, *sshFxpWritePacket:
123+
s.incomingPacket(pkt)
124+
rwChan <- pkt
125+
continue
127126
case *sshFxpClosePacket:
128-
// wait for rwChan to finish
127+
// wait for reads/writes to finish when file is closed
128+
// incomingPacket() call must occur after this
129129
s.working.Wait()
130-
// stop using rwChan
131-
curChan = cmdChan
132130
}
133131
s.incomingPacket(pkt)
134-
curChan <- pkt
132+
// all non-RW use sequential cmdChan
133+
cmdChan <- pkt
135134
}
136135
close(rwChan)
137136
close(cmdChan)

server_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sftp
33
import (
44
"io"
55
"os"
6+
"path"
67
"regexp"
78
"sync"
89
"syscall"
@@ -278,3 +279,51 @@ func TestStatusFromError(t *testing.T) {
278279
assert.Equal(t, tc.pkt, statusFromError(tc.pkt, tc.err))
279280
}
280281
}
282+
283+
// This was written to test a race b/w open immediately followed by a stat.
284+
// Previous to this the Open would trigger the use of a worker pool, then the
285+
// stat packet would come in an hit the pool and return faster than the open
286+
// (returning a file-not-found error).
287+
// The below by itself wouldn't trigger the race however, I needed to add a
288+
// small sleep in the openpacket code to trigger the issue. I wanted to add a
289+
// way to inject that in the code but right now there is no good place for it.
290+
// I'm thinking after I convert the server into a request-server backend I
291+
// might be able to do something with the runWorker method passed into the
292+
// packet manager. But with the 2 implementations fo the server it just doesn't
293+
// fit well right now.
294+
func TestOpenStatRace(t *testing.T) {
295+
client, server := clientServerPair(t)
296+
defer client.Close()
297+
defer server.Close()
298+
299+
// openpacket finishes to fast to trigger race in tests
300+
// need to add a small sleep on server to openpackets somehow
301+
tmppath := path.Join(os.TempDir(), "stat_race")
302+
pflags := flags(os.O_RDWR | os.O_CREATE | os.O_TRUNC)
303+
ch := make(chan result, 3)
304+
id1 := client.nextID()
305+
client.dispatchRequest(ch, sshFxpOpenPacket{
306+
ID: id1,
307+
Path: tmppath,
308+
Pflags: pflags,
309+
})
310+
id2 := client.nextID()
311+
client.dispatchRequest(ch, sshFxpLstatPacket{
312+
ID: id2,
313+
Path: tmppath,
314+
})
315+
testreply := func(id uint32, ch chan result) {
316+
r := <-ch
317+
switch r.typ {
318+
case ssh_FXP_ATTRS, ssh_FXP_HANDLE: // ignore
319+
case ssh_FXP_STATUS:
320+
err := normaliseError(unmarshalStatus(id, r.data))
321+
assert.NoError(t, err, "race hit, stat before open")
322+
default:
323+
assert.Fail(t, "Unexpected type")
324+
}
325+
}
326+
testreply(id1, ch)
327+
testreply(id2, ch)
328+
os.Remove(tmppath)
329+
}

0 commit comments

Comments
 (0)
0