@@ -7,62 +7,87 @@ import (
7
7
)
8
8
9
9
// The goal of the packetManager is to keep the outgoing packets in the same
10
- // order as the incoming. This is due to some sftp clients requiring this
11
- // behavior (eg. winscp).
10
+ // order as the incoming as is requires by section 7 of the RFC.
12
11
13
- type packetSender interface {
14
- sendPacket (encoding.BinaryMarshaler ) error
12
+ type packetManager struct {
13
+ requests chan orderedPacket
14
+ responses chan orderedPacket
15
+ fini chan struct {}
16
+ incoming orderedPackets
17
+ outgoing orderedPackets
18
+ sender packetSender // connection object
19
+ working * sync.WaitGroup
20
+ packetCount uint32
15
21
}
16
22
17
- type packetManager struct {
18
- requests chan requestPacket
19
- responses chan responsePacket
20
- fini chan struct {}
21
- incoming requestPacketIDs
22
- outgoing responsePackets
23
- sender packetSender // connection object
24
- working * sync.WaitGroup
23
+ type packetSender interface {
24
+ sendPacket (encoding.BinaryMarshaler ) error
25
25
}
26
26
27
27
func newPktMgr (sender packetSender ) * packetManager {
28
28
s := & packetManager {
29
- requests : make (chan requestPacket , SftpServerWorkerCount ),
30
- responses : make (chan responsePacket , SftpServerWorkerCount ),
29
+ requests : make (chan orderedPacket , SftpServerWorkerCount ),
30
+ responses : make (chan orderedPacket , SftpServerWorkerCount ),
31
31
fini : make (chan struct {}),
32
- incoming : make ([]uint32 , 0 , SftpServerWorkerCount ),
33
- outgoing : make ([]responsePacket , 0 , SftpServerWorkerCount ),
32
+ incoming : make ([]orderedPacket , 0 , SftpServerWorkerCount ),
33
+ outgoing : make ([]orderedPacket , 0 , SftpServerWorkerCount ),
34
34
sender : sender ,
35
35
working : & sync.WaitGroup {},
36
36
}
37
37
go s .controller ()
38
38
return s
39
39
}
40
40
41
- type responsePackets []responsePacket
41
+ //// packet ordering
42
+ func (s * packetManager ) newOrderId () uint32 {
43
+ s .packetCount ++
44
+ return s .packetCount
45
+ }
42
46
43
- func (r responsePackets ) Sort () {
44
- sort .Slice (r , func (i , j int ) bool {
45
- return r [i ].id () < r [j ].id ()
46
- })
47
+ type orderedRequest struct {
48
+ requestPacket
49
+ orderid uint32
47
50
}
48
51
49
- type requestPacketIDs []uint32
52
+ func (s * packetManager ) newOrderedRequest (p requestPacket ) orderedRequest {
53
+ return orderedRequest {requestPacket : p , orderid : s .newOrderId ()}
54
+ }
55
+ func (p orderedRequest ) orderId () uint32 { return p .orderid }
56
+ func (p orderedRequest ) setOrderId (oid uint32 ) { p .orderid = oid }
50
57
51
- func (r requestPacketIDs ) Sort () {
52
- sort .Slice (r , func (i , j int ) bool {
53
- return r [i ] < r [j ]
58
+ type orderedResponse struct {
59
+ responsePacket
60
+ orderid uint32
61
+ }
62
+
63
+ func (s * packetManager ) newOrderedResponse (p responsePacket , id uint32 ,
64
+ ) orderedResponse {
65
+ return orderedResponse {responsePacket : p , orderid : id }
66
+ }
67
+ func (p orderedResponse ) orderId () uint32 { return p .orderid }
68
+ func (p orderedResponse ) setOrderId (oid uint32 ) { p .orderid = oid }
69
+
70
+ type orderedPacket interface {
71
+ id () uint32
72
+ orderId () uint32
73
+ }
74
+ type orderedPackets []orderedPacket
75
+
76
+ func (o orderedPackets ) Sort () {
77
+ sort .Slice (o , func (i , j int ) bool {
78
+ return o [i ].orderId () < o [j ].orderId ()
54
79
})
55
80
}
56
81
82
+ //// packet registry
57
83
// register incoming packets to be handled
58
- // send id of 0 for packets without id
59
- func (s * packetManager ) incomingPacket (pkt requestPacket ) {
84
+ func (s * packetManager ) incomingPacket (pkt orderedRequest ) {
60
85
s .working .Add (1 )
61
- s .requests <- pkt // buffer == SftpServerWorkerCount
86
+ s .requests <- pkt
62
87
}
63
88
64
89
// register outgoing packets as being ready
65
- func (s * packetManager ) readyPacket (pkt responsePacket ) {
90
+ func (s * packetManager ) readyPacket (pkt orderedResponse ) {
66
91
s .responses <- pkt
67
92
s .working .Done ()
68
93
}
@@ -75,27 +100,26 @@ func (s *packetManager) close() {
75
100
}
76
101
77
102
// Passed a worker function, returns a channel for incoming packets.
78
- // The goal is to process packets in the order they are received as is
79
- // requires by section 7 of the RFC, while maximizing throughput of file
80
- // transfers.
81
- func (s * packetManager ) workerChan (runWorker func (chan requestPacket ),
82
- ) chan requestPacket {
103
+ // Keep process packet responses in the order they are received while
104
+ // maximizing throughput of file transfers.
105
+ func (s * packetManager ) workerChan (runWorker func (chan orderedRequest ),
106
+ ) chan orderedRequest {
83
107
84
- rwChan := make (chan requestPacket , SftpServerWorkerCount )
108
+ rwChan := make (chan orderedRequest , SftpServerWorkerCount )
85
109
for i := 0 ; i < SftpServerWorkerCount ; i ++ {
86
110
runWorker (rwChan )
87
111
}
88
112
89
- cmdChan := make (chan requestPacket )
113
+ cmdChan := make (chan orderedRequest )
90
114
runWorker (cmdChan )
91
115
92
- pktChan := make (chan requestPacket , SftpServerWorkerCount )
116
+ pktChan := make (chan orderedRequest , SftpServerWorkerCount )
93
117
go func () {
94
118
// start with cmdChan
95
119
curChan := cmdChan
96
120
for pkt := range pktChan {
97
121
// on file open packet, switch to rwChan
98
- switch pkt .(type ) {
122
+ switch pkt .requestPacket . (type ) {
99
123
case * sshFxpOpenPacket :
100
124
curChan = rwChan
101
125
// on file close packet, switch back to cmdChan
@@ -122,17 +146,13 @@ func (s *packetManager) controller() {
122
146
for {
123
147
select {
124
148
case pkt := <- s .requests :
125
- debug ("incoming id: %v" , pkt .id ())
126
- s .incoming = append (s .incoming , pkt .id ())
127
- if len (s .incoming ) > 1 {
128
- s .incoming .Sort ()
129
- }
149
+ debug ("incoming id (oid): %v (%v)" , pkt .id (), pkt .orderId ())
150
+ s .incoming = append (s .incoming , pkt )
151
+ s .incoming .Sort ()
130
152
case pkt := <- s .responses :
131
- debug ("outgoing pkt : %v" , pkt .id ())
153
+ debug ("outgoing id (oid) : %v (%v) " , pkt .id (), pkt . orderId ())
132
154
s .outgoing = append (s .outgoing , pkt )
133
- if len (s .outgoing ) > 1 {
134
- s .outgoing .Sort ()
135
- }
155
+ s .outgoing .Sort ()
136
156
case <- s .fini :
137
157
return
138
158
}
@@ -150,10 +170,11 @@ func (s *packetManager) maybeSendPackets() {
150
170
}
151
171
out := s .outgoing [0 ]
152
172
in := s .incoming [0 ]
153
- // debug("incoming: %v", s.incoming)
154
- // debug("outgoing: %v", outfilter(s.outgoing))
155
- if in == out .id () {
156
- s .sender .sendPacket (out )
173
+ // debug("incoming: %v", ids(s.incoming))
174
+ // debug("outgoing: %v", ids(s.outgoing))
175
+ if in .orderId () == out .orderId () {
176
+ debug ("Sending packet: %v" , out .id ())
177
+ s .sender .sendPacket (out .(encoding.BinaryMarshaler ))
157
178
// pop off heads
158
179
copy (s .incoming , s .incoming [1 :]) // shift left
159
180
s .incoming = s .incoming [:len (s .incoming )- 1 ] // remove last
@@ -165,10 +186,17 @@ func (s *packetManager) maybeSendPackets() {
165
186
}
166
187
}
167
188
168
- //func outfilter(o []responsePacket) []uint32 {
169
- // res := make([]uint32, 0, len(o))
170
- // for _, v := range o {
171
- // res = append(res, v.id())
172
- // }
173
- // return res
174
- //}
189
+ // func oids(o []orderedPacket) []uint32 {
190
+ // res := make([]uint32, 0, len(o))
191
+ // for _, v := range o {
192
+ // res = append(res, v.orderId())
193
+ // }
194
+ // return res
195
+ // }
196
+ // func ids(o []orderedPacket) []uint32 {
197
+ // res := make([]uint32, 0, len(o))
198
+ // for _, v := range o {
199
+ // res = append(res, v.id())
200
+ // }
201
+ // return res
202
+ // }
0 commit comments