8000 added compose to InputOutput · rssh/scala-gopher@563f790 · GitHub
[go: up one dir, main page]

Skip to content

Commit 563f790

Browse files
committed
added compose to InputOutput
1 parent 354adbc commit 563f790

File tree

1 file changed

+65
-2
lines changed

1 file changed

+65
-2
lines changed

src/main/scala/gopher/channels/InputOutput.scala

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package gopher.channels
22

3-
import gopher.FlowTermination
3+
import gopher.{ChannelClosedException, FlowTermination, GopherAPI}
44

5-
import scala.concurrent.Future
5+
import scala.concurrent.{ExecutionContext, Future}
6+
import scala.util.{Failure, Success}
67

78
trait InputOutput[A,B] extends Input[B] with Output[A]
89
{
@@ -25,6 +26,59 @@ trait InputOutput[A,B] extends Input[B] with Output[A]
2526

2627
override def filter(p: B=>Boolean):InputOutput[A,B] = new FilteredIO(p)
2728

29+
30+
31+
class CompositionIO[C](other:InputOutput[B,C]) extends InputOutput[A,C]
32+
{
33+
private val s = new ForeverSelectorBuilder(){
34+
override def api = thisInputOutput.api
35+
}
36+
37+
s.readingWithFlowTerminationAsync(thisInputOutput,
38+
{(ec:ExecutionContext,ft:FlowTermination[Unit],a:B) =>
39+
implicit val iec = ec
40+
other.awrite(a) map (_ => ())
41+
}
42+
)
43+
44+
protected val internalFuture = s.go
45+
46+
47+
// TODO: think, maybe we need intercept cbread here ?
48+
override def cbread[D](f: (ContRead[C, D]) => Option[(ContRead.In[C]) => Future[Continuated[D]]], ft: FlowTermination[D]): Unit =
49+
{
50+
if (checkNotCompleted(ft)) other.cbread(f,ft)
51+
}
52+
53+
override def cbwrite[B](f: (ContWrite[A, B]) => Option[(A, Future[Continuated[B]])], ft: FlowTermination[B]): Unit =
54+
{
55+
if (checkNotCompleted(ft)) thisInputOutput.cbwrite(f,ft)
56+
}
57+
58+
private def checkNotCompleted[D](ft:FlowTermination[D]): Boolean =
59+
{
60+
if (internalFuture.isCompleted) {
61+
implicit val ec = api.gopherExecutionContext
62+
internalFuture.onComplete{
63+
case Failure(ex) => ft.doThrow(ex)
64+
case Success(_) => ft.doThrow(new ChannelClosedException())
65+
}
66+
false
67+
} else true
68+
}
69+
70+
override val api: GopherAPI = thisInputOutput.api
71+
}
72+
73+
//TODO: add test suite.
74+
def compose[C](other:InputOutput[B,C]):InputOutput[A,C] =
75+
new CompositionIO[C](other)
76+
77+
/**
78+
* Synonym for this.compose(other)
79+
*/
80+
def |>[C](other:InputOutput[B,C]):InputOutput[A,C] = this.compose(other)
81+
2882
}
2983

3084

@@ -43,5 +97,14 @@ trait CloseableInputOutput[A,B] extends InputOutput[A,B] with CloseableInput[B]
4397

4498
override def filter(p: B=>Boolean): CloseableInputOutput[A,B] = new FilteredIOC(p)
4599

100+
class CompositionIOC[C](other: InputOutput[B,C]) extends CompositionIO(other)
101+
with DoneSignalDelegate[C]
102+
with CloseableInputOutput[A,C]
103+
104+
override def compose[C](other: InputOutput[B,C]):CloseableInputOutput[A,C] =
105+
new CompositionIOC[C](other)
106+
107+
override def |>[C](other:InputOutput[B,C]):CloseableInputOutput[A,C] = this.compose(other)
108+
46109

47110
}

0 commit comments

Comments
 (0)
0