1
1
package gopher .channels
2
2
3
+ import java .util .concurrent .ConcurrentHashMap
4
+ import java .util .concurrent .atomic .AtomicLong
5
+
3
6
import gopher .{ChannelClosedException , FlowTermination , GopherAPI }
4
7
5
- import scala .concurrent .{ ExecutionContext , Future }
8
+ import scala .concurrent .Future
6
9
import scala .util .{Failure , Success }
7
10
8
11
trait InputOutput [A ,B ] extends Input [B ] with Output [A ]
@@ -30,47 +33,86 @@ trait InputOutput[A,B] extends Input[B] with Output[A]
30
33
31
34
class CompositionIO [C ](other: InputOutput [B ,C ]) extends InputOutput [A ,C ]
32
35
{
33
- private val s = new ForeverSelectorBuilder (){
34
- override def api = thisInputOutput.api
35
- }
36
36
37
- s.readingWithFlowTerminationAsync(thisInputOutput,
38
- {(ec: ExecutionContext ,ft: FlowTermination [Unit ],a: B ) =>
39
- implicit val iec = ec
40
- other.awrite(a) map (_ => ())
41
- }
42
- )
37
+ protected val internalFlowTermination = PromiseFlowTermination [Unit ]()
38
+
39
+ private val listeners = new ConcurrentHashMap [Long ,ContRead .AuxE [C ]]
40
+ private val listenerIdsGen = new AtomicLong (0L )
41
+ override val api : GopherAPI = thisInputOutput.api
42
+
43
+
43
44
44
- protected val internalFuture = s.go
45
+ def internalRead (cr: ContRead [B ,Unit ]): Option [(ContRead .In [B ]=> Future [Continuated [Unit ]])] = {
46
+ {
47
+ implicit val ec = api.gopherExecutionContext
48
+ Some {
49
+ case ContRead .Value (a) =>
50
+ other.cbwrite[Unit ](cw => {
51
+ Some {
52
+ (a, Future successful cr)
53
+ }
54
+ }, internalFlowTermination)
55
+ Future successful Never // cr will be called after write.
56
+ case ContRead .Skip => Future successful cr
57
+ case ContRead .ChannelClosed =>
58
+ internalFlowTermination.doExit(())
59
+ Future successful Never
60
+ case ContRead .Failure (ex) => internalFlowTermination.throwIfNotCompleted(ex)
61
+ Future successful Never
62
+ }
63
+ }
64
+ }
45
65
66
+ thisInputOutput.cbread(internalRead,internalFlowTermination)
67
+
68
+ internalFlowTermination.future.onComplete{ r =>
69
+ listeners.forEach { (id, cr) =>
70
+ val cre = cr.asInstanceOf [ContRead [A ,cr.S ]]
71
+ cre.function(cre).foreach{ q =>
72
+ val n = r match {
73
+ case Failure (ex) => ContRead .Failure (ex)
74
+ case Success (_) => ContRead .ChannelClosed
75
+ }
76
+ api.continue(q(n),cre.flowTermination)
77
+ }
78
+ }
79
+ }(api.gopherExecutionContext)
46
80
47
- // TODO: think, maybe we need intercept cbread here ?
48
- override def cbread [D ](f : (ContRead [C , D ]) => Option [(ContRead .In [C ]) => Future [Continuated [D ]]], ft : FlowTermination [D ]): Unit =
81
+ override def cbread [D ](f : (ContRead [C , D ]) => Option [ContRead .In [C ] => Future [Continuated [D ]]], ft : FlowTermination [D ]): Unit =
49
82
{
50
- if (checkNotCompleted(ft)) other.cbread(f,ft)
83
+ val id = listenerIdsGen.incrementAndGet()
84
+ def wf (cr: ContRead [C ,D ]): Option [ContRead .In [C ] => Future [Continuated [D ]]]=
85
+ {
86
+ f(cr) map { q =>
87
+ listeners.remove(id)
88
+ q
89
+ }
90
+ }
91
+ val cr = ContRead (f,this ,ft)
92
+ listeners.put(id,cr.asInstanceOf [ContRead .AuxE [C ]])
93
+ other.cbread(wf,ft)
51
94
}
52
95
53
96
override def cbwrite [B ](f : (ContWrite [A , B ]) => Option [(A , Future [Continuated [B ]])], ft : FlowTermination [B ]): Unit =
54
97
{
98
+
55
99
if (checkNotCompleted(ft)) thisInputOutput.cbwrite(f,ft)
56
100
}
57
101
58
102
private def checkNotCompleted [D ](ft: FlowTermination [D ]): Boolean =
59
103
{
60
- if (internalFuture .isCompleted) {
104
+ if (internalFlowTermination .isCompleted) {
61
105
implicit val ec = api.gopherExecutionContext
62
- internalFuture .onComplete{
106
+ internalFlowTermination.future .onComplete{
63
107
case Failure (ex) => ft.doThrow(ex)
64
108
case Success (_) => ft.doThrow(new ChannelClosedException ())
65
109
}
66
110
false
67
111
} else true
68
112
}
69
113
70
- override val api : GopherAPI = thisInputOutput.api
71
114
}
72
115
73
- // TODO: add test suite.
74
116
def compose [C ](other: InputOutput [B ,C ]): InputOutput [A ,C ] =
75
117
new CompositionIO [C ](other)
76
118
@@ -102,9 +144,11 @@ trait CloseableInputOutput[A,B] extends InputOutput[A,B] with CloseableInput[B]
102
144
with CloseableInputOutput [A ,C ]
103
145
104
146
override def compose [C ](other : InputOutput [B ,C ]): CloseableInputOutput [A ,C ] =
105
- new CompositionIOC [C ](other)
147
+ new CompositionIOC [C ](other) {
148
+
149
+ }
106
150
107
- override def |> [C ](other: InputOutput [B ,C ]): CloseableInputOutput [A ,C ] = this .compose(other)
151
+ override def |> [C ](other: InputOutput [B ,C ]): CloseableInputOutput [A ,C ] = this .compose(other)
108
152
109
153
110
154
}
0 commit comments