8000 added tests for composition · rssh/scala-gopher@2244853 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2244853

Browse files
committed
added tests for composition
1 parent 563f790 commit 2244853

File tree

4 files changed

+160
-24
lines changed

4 files changed

+160
-24
lines changed

src/main/scala/gopher/channels/Continuated.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package gopher.channels;
22

3-
import scala.language._
4-
import scala.concurrent._
5-
import java.util.concurrent.atomic.AtomicBoolean
63
import gopher._
74

5+
import scala.concurrent._
6+
import scala.language._
7+
88
/**
99
* represent continuated computation from A to B.
1010
*/
@@ -33,6 +33,7 @@ case class ContRead[A,B](
3333
override val flowTermination: FlowTermination[B]) extends FlowContinuated[B]
3434
{
3535
type El = A
36+
type S = B
3637
type F = ContRead[A,B]=>Option[ContRead.In[A] => Future[Continuated[B]]]
3738
}
3839

@@ -89,7 +90,9 @@ object ContRead
8990
type F = ContRead[A,B]=>Option[ContRead.In[A]=>Future[Continuated[B]]]
9091
}
9192

92-
type AuxF[A,B] = ContRead[A,B]=>Option[ContRead.In[A]=>Future[Continuated[B]]]
93+
type AuxF[A,B] = ContRead[A,B]=>Option[ContRead.In[A]=>Future[Continuated[B]]]
94+
95+
type AuxE[A] = ({type B; type L=ContRead[A,B]})#L
9396
}
9497

9598

src/main/scala/gopher/channels/InputOutput.scala

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package gopher.channels
22

3+
import java.util.concurrent.ConcurrentHashMap
4+
import java.util.concurrent.atomic.AtomicLong
5+
36
import gopher.{ChannelClosedException, FlowTermination, GopherAPI}
47

5-
import scala.concurrent.{ExecutionContext, Future}
8+
import scala.concurrent.Future
69
import scala.util.{Failure, Success}
710

811
trait InputOutput[A,B] extends Input[B] with Output[A]
@@ -30,47 +33,86 @@ trait InputOutput[A,B] extends Input[B] with Output[A]
3033

3134
class CompositionIO[C](other:InputOutput[B,C]) extends InputOutput[A,C]
3235
{
33-
private val s = new ForeverSelectorBuilder(){
34-
override def api = thisInputOutput.api
35-
}
3636

37-
s.readingWithFlowTerminationAsync(thisInputOutput,
38-
{(ec:ExecutionContext,ft:FlowTermination[Unit],a:B) =>
39-
implicit val iec = ec
40-
other.awrite(a) map (_ => ())
41-
}
42-
)
37+
protected val internalFlowTermination = PromiseFlowTermination[Unit]()
38+
39+
private val listeners = new ConcurrentHashMap[Long,ContRead.AuxE[C]]
40+
private val listenerIdsGen = new AtomicLong(0L)
41+
override val api: GopherAPI = thisInputOutput.api
42+
43+
4344

44-
protected val internalFuture = s.go
45+
def internalRead(cr:ContRead[B,Unit]):Option[(ContRead.In[B]=>Future[Continuated[Unit]])] = {
46+
{
47+
implicit val ec = api.gopherExecutionContext
48+
Some {
49+
case ContRead.Value(a) =>
50+
other.cbwrite[Unit](cw => {
51+
Some {
52+
(a, Future successful cr)
53+
}
54+
}, internalFlowTermination)
55+
Future successful Never // cr will be called after write.
56+
case ContRead.Skip => Future successful cr
57+
case ContRead.ChannelClosed =>
58+
internalFlowTermination.doExit(())
59+
Future successful Never
60+
case ContRead.Failure(ex) => internalFlowTermination.throwIfNotCompleted(ex)
61+
Future successful Never
62+
}
63+
}
64+
}
4565

66+
thisInputOutput.cbread(internalRead,internalFlowTermination)
67+
68+
internalFlowTermination.future.onComplete{ r =>
69+
listeners.forEach { (id, cr) =>
70+
val cre = cr.asInstanceOf[ContRead[A,cr.S]]
71+
cre.function(cre).foreach{ q =>
72+
val n = r match {
73+
case Failure(ex) => ContRead.Failure(ex)
74+
case Success(_) => ContRead.ChannelClosed
75+
}
76+
api.continue(q(n),cre.flowTermination)
77+
}
78+
}
79+
}(api.gopherExecutionContext)
4680

47-
// TODO: think, maybe we need intercept cbread here ?
48-
override def cbread[D](f: (ContRead[C, D]) => Option[(ContRead.In[C]) => Future[Continuated[D]]], ft: FlowTermination[D]): Unit =
81+
override def cbread[D](f: (ContRead[C, D]) => Option[ContRead.In[C] => Future[Continuated[D]]], ft: FlowTermination[D]): Unit =
4982
{
50-
if (checkNotCompleted(ft)) other.cbread(f,ft)
83+
val id = listenerIdsGen.incrementAndGet()
84+
def wf(cr:ContRead[C,D]):Option[ContRead.In[C] => Future[Continuated[D]]]=
85+
{
86+
f(cr) map { q =>
87+
listeners.remove(id)
88+
q
89+
}
90+
}
91+
val cr = ContRead(f,this,ft)
92+
listeners.put(id,cr.asInstanceOf[ContRead.AuxE[C]])
93+
other.cbread(wf,ft)
5194
}
5295

5396
override def cbwrite[B](f: (ContWrite[A, B]) => Option[(A, Future[Continuated[B]])], ft: FlowTermination[B]): Unit =
5497
{
98+
5599
if (checkNotCompleted(ft)) thisInputOutput.cbwrite(f,ft)
56100
}
57101

58102
private def checkNotCompleted[D](ft:FlowTermination[D]): Boolean =
59103
{
60-
if (internalFuture.isCompleted) {
104+
if (internalFlowTermination.isCompleted) {
61105
implicit val ec = api.gopherExecutionContext
62-
internalFuture.onComplete{
106+
internalFlowTermination.future.onComplete{
63107
case Failure(ex) => ft.doThrow(ex)
64108
case Success(_) => ft.doThrow(new ChannelClosedException())
65109
}
66110
false
67111
} else true
68112
}
69113

70-
override val api: GopherAPI = thisInputOutput.api
71114
}
72115

73-
//TODO: add test suite.
74116
def compose[C](other:InputOutput[B,C]):InputOutput[A,C] =
75117
new CompositionIO[C](other)
76118

@@ -102,9 +144,11 @@ trait CloseableInputOutput[A,B] extends InputOutput[A,B] with CloseableInput[B]
102144
with CloseableInputOutput[A,C]
103145

104146
override def compose[C](other: InputOutput[B,C]):CloseableInputOutput[A,C] =
105-
new CompositionIOC[C](other)
147+
new CompositionIOC[C](other) {
148+
149+
}
106150

107-
override def |>[C](other:InputOutput[B,C]):CloseableInputOutput[A,C] = this.compose(other)
151+
override def |>[C](other:InputOutput[B,C]): CloseableInputOutput[A,C] = this.compose(other)
108152

109153

110154
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package gopher.channels
2+
3+
import gopher.ChannelClosedException
4+
import org.scalatest.AsyncFunSuite
5+
6+
class IOComposeSuite extends AsyncFunSuite {
7+
8+
9+
test("simple composition of IO with map") {
10+
import gopherApi._
11+
12+
val ch = makeChannel[Int]()
13+
val ch1 = makeChannel[Int]()
14+
val chs = ch1.map( _.toString )
15+
16+
val pipeline = ch |> chs
17+
18+
pipeline.awriteAll(List(10,12,34,43))
19+
20+
for{
21+
r1 <- pipeline.aread
22+
r2 <- pipeline.aread
23+
r3 <- pipeline.aread
24+
r4 <- pipeline.aread
25+
} yield assert((r1,r2,r3,r4) == ("10","12","34","43") )
26+
27+
}
28+
29+
30+
test("simple double composition of IO with map") {
31+
import gopherApi._
32+
E377 33+
val ch = makeChannel[Int]()
34+
val chs = makeChannel[Int]().map( _.toString )
35+
val reverse = makeChannel[String]().map(_.reverse.toInt)
36+
37+
val pipeline = ch |> chs |> reverse
38+
39+
pipeline.awriteAll(List(10,12,34,43))
40+
41+
for{
42+
r1 <- pipeline.aread
43+
r2 <- pipeline.aread
44+
r3 <- pipeline.aread
45+
r4 <- pipeline.aread
46+
} yield assert((r1,r2,r3,r4) == (1,21,43,34) )
47+
48+
}
49+
50+
test("closing channel must close it's composition") {
51+
import gopherApi._
52+
53+
//pending
54+
55+
val ch = makeChannel[Int]()
56+
val ch1 = makeChannel[Int]()
57+
58+
val composed = ch |> ch1
59+
60+
val f1 = for{
61+
_ <- ch.awrite(1)
62+
} yield ch.close()
63+
64+
for{
65+
x1 <- composed.aread
66+
_ = assert(x1==1)
67+
x2 <- recoverToSucceededIf[ChannelClosedException](composed.aread)
68+
} yield x2
69+
70+
}
71+
72+
73+
val gopherApi = CommonTestObjects.gopherApi
74+
75+
}

src/test/scala/gopher/channels/InputOpsSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,20 @@ class InputOpsSuite extends AsyncFunSuite {
273273
r2 <- a2} yield assert(monotonic)
274274
}
275275

276+
test("order of reading from unbuffered channel") {
277+
val ch = gopherApi.makeChannel[Int]()
278+
ch.awriteAll(List(10,12,34,43))
279+
280+
for{
281+
r1 <- ch.aread
282+
r2 <- ch.aread
283+
r3 <- ch.aread
284+
r4 <- ch.aread
285+
} yield assert((r1,r2,r3,r4) == (10,12,34,43) )
286+
287+
288+
}
289+
276290

277291
def gopherApi = CommonTestObjects.gopherApi
278292

0 commit comments

Comments
 (0)
0