1
1
package gopher .channels
2
2
3
- import gopher .FlowTermination
3
+ import gopher .{ ChannelClosedException , FlowTermination , GopherAPI }
4
4
5
- import scala .concurrent .Future
5
+ import scala .concurrent .{ExecutionContext , Future }
6
+ import scala .util .{Failure , Success }
6
7
7
8
trait InputOutput [A ,B ] extends Input [B ] with Output [A ]
8
9
{
@@ -25,6 +26,59 @@ trait InputOutput[A,B] extends Input[B] with Output[A]
25
26
26
27
override def filter (p : B => Boolean ): InputOutput [A ,B ] = new FilteredIO (p)
27
28
29
+
30
+
31
+ class CompositionIO [C ](other: InputOutput [B ,C ]) extends InputOutput [A ,C ]
32
+ {
33
+ private val s = new ForeverSelectorBuilder (){
34
+ override def api = thisInputOutput.api
35
+ }
36
+
37
+ s.readingWithFlowTerminationAsync(thisInputOutput,
38
+ {(ec: ExecutionContext ,ft: FlowTermination [Unit ],a: B ) =>
39
+ implicit val iec = ec
40
+ other.awrite(a) map (_ => ())
41
+ }
42
+ )
43
+
44
+ protected val internalFuture = s.go
45
+
46
+
47
+ // TODO: think, maybe we need intercept cbread here ?
48
+ override def cbread [D ](f : (ContRead [C , D ]) => Option [(ContRead .In [C ]) => Future [Continuated [D ]]], ft : FlowTermination [D ]): Unit =
49
+ {
50
+ if (checkNotCompleted(ft)) other.cbread(f,ft)
51
+ }
52
+
53
+ override def cbwrite [B ](f : (ContWrite [A , B ]) => Option [(A , Future [Continuated [B ]])], ft : FlowTermination [B ]): Unit =
54
+ {
55
+ if (checkNotCompleted(ft)) thisInputOutput.cbwrite(f,ft)
56
+ }
57
+
58
+ private def checkNotCompleted [D ](ft: FlowTermination [D ]): Boolean =
59
+ {
60
+ if (internalFuture.isCompleted) {
61
+ implicit val ec = api.gopherExecutionContext
62
+ internalFuture.onComplete{
63
+ case Failure (ex) => ft.doThrow(ex)
64
+ case Success (_) => ft.doThrow(new ChannelClosedException ())
65
+ }
66
+ false
67
+ } else true
68
+ }
69
+
70
+ override val api : GopherAPI = thisInputOutput.api
71
+ }
72
+
73
+ // TODO: add test suite.
74
+ def compose [C ](other: InputOutput [B ,C ]): InputOutput [A ,C ] =
75
+ new CompositionIO [C ](other)
76
+
77
+ /**
78
+ * Synonym for this.compose(other)
79
+ */
80
+ def |> [C ](other: InputOutput [B ,C ]): InputOutput [A ,C ] = this .compose(other)
81
+
28
82
}
29
83
30
84
@@ -43,5 +97,14 @@ trait CloseableInputOutput[A,B] extends InputOutput[A,B] with CloseableInput[B]
43
97
44
98
override def filter (p : B => Boolean ): CloseableInputOutput [A ,B ] = new FilteredIOC (p)
45
99
100
+ class CompositionIOC [C ](other : InputOutput [B ,C ]) extends CompositionIO (other)
101
+ with DoneSignalDelegate [C ]
102
+ with CloseableInputOutput [A ,C ]
103
+
104
+ override def compose [C ](other : InputOutput [B ,C ]): CloseableInputOutput [A ,C ] =
105
+ new CompositionIOC [C ](other)
106
+
107
+ override def |> [C ](other: InputOutput [B ,C ]): CloseableInputOutput [A ,C ] = this .compose(other)
108
+
46
109
47
110
}
0 commit comments