8000 more test transformations to async form · rssh/scala-gopher@d83538c · GitHub
[go: up one dir, main page]

Skip to content
< 8000 script crossorigin="anonymous" type="application/javascript" src="https://github.githubassets.com/assets/vendors-node_modules_github_remote-form_dist_index_js-node_modules_delegated-events_dist_inde-94fd67-b0625c39513c.js" defer="defer">

Commit d83538c

Browse files
committed
more test transformations to async form
1 parent 665737b commit d83538c

File tree

7 files changed

+124
-120
lines changed

7 files changed

+124
8000
-120
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % "test"
2525
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.1"
2626

2727
//testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-n", "Now")
28-
//fork in Test := true
28+
fork in Test := true
2929
//javaOptions in Test += s"""-javaagent:${System.getProperty("user.home")}/.ivy2/local/com.github.rssh/trackedfuture_2.11/0.3/jars/trackedfuture_2.11-assembly.jar"""
3030

3131
version:="0.99.11-SNAPSHOT"

src/main/scala/gopher/channels/ForeverSelectorBuilder.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ trait ForeverSelectorBuilder extends SelectorBuilder[Unit]
2525
def readingWithFlowTerminationAsync[A](ch: Input[A], f: (ExecutionContext, FlowTermination[Unit], A) => Future[Unit] ): this.type =
2626
{
2727
lazy val cont = ContRead(normalized, ch, selector)
28-
def normalized(_cont:ContRead[A,Unit]):Option[ContRead.In[A]=>Future[Continuated[Unit]]] =
29-
Some(ContRead.liftIn(_cont)(a=>f(ec,selector,a) map Function.const(cont)))
28+
def normalized(_cont:ContRead[A,Unit]):Option[ContRead.In[A]=>Future[Continuated[Unit]]] = {
29+
Some(ContRead.liftIn(_cont)(a => f(ec, selector, a) map Function.const(cont)))
30+
}
31+
3032
withReader[A](ch, normalized)
3133
}
3234

src/main/scala/gopher/channels/Input.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,9 @@ trait Input[A] extends GopherAPIProvider
221221
{ val contA = ContRead(f,this,cont.flowTermination)
222222
f(contA) map (f1 => { case v@ContRead.Value(a) => f1(ContRead.Value(a))
223223
case ContRead.Skip => f1(ContRead.Skip)
224-
Future successful cont
224+
Future successful cont
225225
case ChannelClosed => f1(ContRead.Skip)
226-
Future successful ContRead(f,other,cont.flowTermination)
226+
Future successful ContRead(f,other,cont.flowTermination)
227227
case ContRead.Failure(ex) => f1(ContRead.Failure(ex))
228228
})
229229
}

src/main/scala/gopher/channels/Selector.scala

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,17 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A] {
5050
private[channels] def lockedRead[E](f: ContRead.AuxF[E, A], ch: Input[E], ft: FlowTermination[A]): ContRead.AuxF[E, A] = {
5151
val cont0 = ContRead(f,ch,ft)
5252
def f1(cont: ContRead[E, A]): Option[ContRead.In[E] => Future[Continuated[A]]] = {
53-
tryLocked(f(cont0), cont, "read") map { q =>
54-
in =>
55-
unlockAfter(
56-
try {
57-
errorHandled(q(in), cont0)
58-
} catch {
59-
case e: Throwable => ft.doThrow(e)
60-
Future successful Never
61-
},
62-
ft, "read")
63-
}
53+
tryLocked(f(cont0), cont, "read") map { q =>
54+
in =>
55+
unlockAfter(
56+
try {
57+
errorHandled(q(in), cont0)
58+
}catch {
59+
case e: Throwable => ft.doThrow(e)
60+
Future successful Never
61+
},
62+
ft, "read")
63+
}
6464
}
6565

6666
f1
@@ -69,19 +69,19 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A] {
6969

7070

7171
private[channels] def lockedWrite[E](f: ContWrite.AuxF[E, A], ch: Output[E], ft: FlowTermination[A]): ContWrite.AuxF[E, A] = { (cont) =>
72-
val cont0 = ContWrite(f,ch,ft)
73-
tryLocked(f(cont0), cont, "write") map {
74-
case (a, future) =>
75-
(a, unlockAfter(errorHandled(future,cont0), ft, "write"))
76-
}
72+
val cont0 = ContWrite(f, ch, ft)
73+
tryLocked(f(cont0), cont, "write") map {
74+
case (a, future) =>
75+
(a, unlockAfter(errorHandled(future, cont0), ft, "write"))
76+
}
7777
}
7878

7979
private[channels] def lockedSkip(f: Skip.AuxF[A], ft: FlowTermination[A]): Skip.AuxF[A] = { cont =>
8080
// TODO: check, maybe pass cont in this situation is enough ?
81-
val cont0 = Skip(f,ft)
82-
tryLocked(f(cont0), cont, "skip") map { f =>
83-
unlockAfter( errorHandled(f, cont0), ft, "skip")
84-
}
81+
val cont0 = Skip(f, ft)
82+
tryLocked(f(cont0), cont, "skip") map { f =>
83+
unlockAfter(errorHandled(f, cont0), ft, "skip")
84+
}
8585
}
8686

8787
private[channels] def makeLocked(block: Continuated[A]): Continuated[A] = {
@@ -95,6 +95,8 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A] {
9595
}
9696

9797

98+
99+
98100
@inline
99101
private[this] def tryLocked[X](body: => Option[X], cont: FlowContinuated[A], dstr: String): Option[X] =
100102
if (tryLock()) {
@@ -149,6 +151,8 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A] {
149151
)
150152
}
151153

154+
//private[this] def ifNotCompleted1[X](ft:FlowTermination[A],f:X => Future[Continuated[A]])
155+
152156
private[this] def toWaiters(cont:Continuated[A]):Unit=
153157
{
154158
waiters.add(cont)

src/test/scala/gopher/channels/FlowTerminationSuite.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ class FlowTerminationSuite extends AsyncFunSuite
6464
case x: channel.read => sum += x
6565
select.shutdown()
6666
}
67-
val f2 = channel.awrite(1)
68-
val f3 = channel.awrite(2)
69-
f0 map { _ => assert(sum == 1) }
67+
for {r2 <- channel.awrite(1)
68+
r3 <- channel.awrite(2)
69+
r0 <- f0 } yield assert(sum == 1)
70+
7071
}
7172

7273

src/test/scala/gopher/channels/IOTimeoutsSuite.scala

Lines changed: 86 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -4,110 +4,107 @@ import gopher._
44
import org.scalatest._
55
import org.scalatest.concurrent._
66

7-
import scala.concurrent.ExecutionContext.Implicits.global
87
import scala.concurrent._
98
import scala.concurrent.duration._
109
import scala.language._
1110
import scala.util._
1211

13-
class IOTimeoutsSuite extends FunSuite with Waiters {
12+
class IOASyncTimeoutsSuite extends AsyncFunSuite {
1413

15-
16-
test("messsaged from timeouts must be appear during reading attempt from empty channel") {
17-
val ch = gopherApi.makeChannel[String]()
18-
val (chReady, chTimeout) = ch.withInputTimeouts(300 milliseconds)
19-
val w = new Waiter()
20-
val f = gopherApi.select.once {
21-
case x: chReady.read => 1
22-
case x: chTimeout.read => 2
23-
}
24-
Await.ready(f map ( x => { w( assert(x == 2) ); w.dismiss() } ), 1 second )
25-
w.await()
26-
}
14+
test("messsaged from timeouts must be appear during reading attempt from empty channel") {
15+
val ch = gopherApi.makeChannel[String]()
16+
val (chReady, chTimeout) = ch.withInputTimeouts(300 milliseconds)
17+
val f = gopherApi.select.once {
18+
case x: chReady.read => 1
19+
case x: chTimeout.read => 2
20+
}
21+
for(x <- f) yield assert( x == 2)
22+
}
2723

28-
test("when we have value, we have no timeouts") {
29-
val ch = gopherApi.makeChannel[String]()
30-
ch.awrite("qqq")
31-
val (chReady, chTimeout) = ch.withInputTimeouts(300 milliseconds)
32-
val w = new Waiter()
33-
val f = gopherApi.select.once {
34-
case x: chReady.read => 1
35-
case x: chTimeout.read => 2
36-
}
37-
Await.ready(f map ( x => { w( assert(x == 1) ); w.dismiss() } ), 1 second )
38-
w.await()
39-
}
4024

25+
test("when we have value, we have no timeouts") {
26+
val ch = gopherApi.makeChannel[String]()
27+
ch.awrite("qqq")
28+
val (chReady, chTimeout) = ch.withInputTimeouts(300 milliseconds)
29+
val f = gopherApi.select.once {
30+
case x: chReady.read => 1
31+
case x: chTimeout.read => 2
32+
}
33+
for(x <- f) yield assert (x==1)
34+
}
4135

42-
test("on input close it's timeout channel also must close") {
43-
val w = new Waiter()
44-
val ch = gopherApi.makeChannel[String](1)
45-
Await.ready(ch.awrite("qqq"), 1 second)
46-
val (chReady, chTimeout) = ch.withInputTimeouts(300 milliseconds)
47-
ch.close()
48-
// now must read one element
49-
val f1 = gopherApi.select.once {
36+
37+
test("on input close it's timeout channel also must close") {
38+
val ch = gopherApi.makeChannel[String](1)
39+
for{
40+
_ <- ch.awrite("qqq")
41+
(chReady, chTimeout) = ch.withInputTimeouts(300 milliseconds)
42+
_ = ch.close()
43+
f1 = gopherApi.select.once {
5044
case x: chReady.read => 1
5145
case x: chTimeout.read => 2
52-
}
53-
Await.ready(f1 map ( x => { w( x == 1); w.dismiss() } ), 1 second )
54-
// now receive stream-closed exception
55-
val f2 = chReady.aread
56-
f2 onComplete { x => w(assert(x.isFailure && x.failed.get.isInstanceOf[ChannelClosedException]))
57-
w.dismiss()
58-
}
59-
Await.ready(f2, 3 second)
60-
val f3 = chTimeout.aread
61-
f3 onComplete { x => w(assert(x.isFailure && x.failed.get.isInstanceOf[ChannelClosedException]))
62-
w.dismiss()
63-
}
64-
Await.ready(f3, 6 seconds)
65-
w.await(dismissals=Dismissals(3))
66-
67-
}
46+
}
47+
x <- f1
48+
_ <- assert(x==1)
49+
_ <- recoverToSucceededIf[ChannelClosedException] {
50+
chReady.aread
51+
}
52+
l <- recoverToSucceededIf[ChannelClosedException] {
53+
chTimeout.aread
54+
}
55+
} yield l
56+
}
6857

58+
test("messsaged from timeouts must be appear during attempt to write to filled unbuffered channel") {
59+
val ch = gopherApi.makeChannel[Int]()
60+
val (chReady, chTimeout) = ch.withOutputTimeouts(150 milliseconds)
61+
@volatile var count = 1
62+
val f = gopherApi.select.forever {
63+
case x: chReady.write if (x==count) =>
64+
{};
65+
count += 1 // will newer called, since we have no reader
66+
case t: chTimeout.read =>
67+
implicitly[FlowTermination[Unit]].doExit(count)
68+
}
69+
f map (_ => assert(count==1))
70+
}
6971

70-
test("messsaged from timeouts must be appear during attempt to write to filled unbuffered channel") {
71-
val ch = gopherApi.makeChannel[Int]()
72-
val (chReady, chTimeout) = ch.withOutputTimeouts(150 milliseconds)
73-
@volatile var count = 1
74-
val f = gopherApi.select.forever {
75-
case x: chReady.write if (x==count) =>
76-
{};
77-
count += 1 // will newer called, since we have no reader
78-
case t: chTimeout.read =>
79-
implicitly[FlowTermination[Unit]].doExit(count)
80-
}
81-
Await.ready(f, 1 second)
82-
assert(count==1)
83-
}
72+
test("messsaged from timeouts must be appear during attempt to write to filled buffered channel") {
73+
val ch = gopherApi.makeChannel[Int](1)
74+
val (chReady, chTimeout) = ch.withOutputTimeouts(150 milliseconds)
75+
@volatile var count = 1
76+
val f = gopherApi.select.forever {
77+
case x: chReady.write if (x==count) =>
78+
{};
79+
count += 1
80+
case t: chTimeout.read =>
81+
implicitly[FlowTermination[Unit]].doExit(count)
82+
}
83+
f map { _ => assert(count==2) }
84+
}
85+
86+
test("when we have where to write -- no timeouts") {
87+
val ch = gopherApi.makeChannel[Int](1)
88+
val (chReady, chTimeout) = ch.withOutputTimeouts(300 milliseconds)
89+
val f = gopherApi.select.once {
90+
case x: chReady.write if (x==1) => 1
91+
case t: chTimeout.read => 2
92+
}
93+
f map { r => assert(r == 1) }
94+
}
95+
96+
97+
98+
def gopherApi = CommonTestObjects.gopherApi
99+
100+
101+
}
102+
103+
class IOSyncTimeoutsSuite extends FunSuite with Waiters {
104+
105+
import scala.concurrent.ExecutionContext.Implicits.global
84106

85-
test("messsaged from timeouts must be appear during attempt to write to filled buffered channel") {
86-
val ch = gopherApi.makeChannel[Int](1)
87-
val (chReady, chTimeout) = ch.withOutputTimeouts(150 milliseconds)
88-
@volatile var count = 1
89-
val f = gopherApi.select.forever {
90-
case x: chReady.write if (x==count) =>
91-
{};
92-
count += 1
93-
case t: chTimeout.read =>
94-
implicitly[FlowTermination[Unit]].doExit(count)
95-
}
96-
Await.ready(f, 3 second)
97-
assert(count==2)
98-
}
99107

100-
test("when we have where to write -- no timeouts") {
101-
val ch = gopherApi.makeChannel[Int](1)
102-
val (chReady, chTimeout) = ch.withOutputTimeouts(300 milliseconds)
103-
val f = gopherApi.select.once {
104-
case x: chReady.write if (x==1) => 1
105-
case t: chTimeout.read => 2
106-
}
107-
val r = Await.result(f, 1 second)
108-
assert(r == 1)
109-
}
110-
111108
test("on output close it's timeout channel also must close") {
112109
val ch = gopherApi.makeChannel[Int](1)
113110
val (chReady, chTimeout) = ch.withOutputTimeouts(300 milliseconds)

src/test/scala/gopher/transputers/ReplicateSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ class ReplicateSuite extends FunSuite
112112
( r.in.distribute( (_ % 37 ) ).
113113
out.share()
114114
)
115-
val inChannel = gopherApi.makeChannel[Int](10);
116-
val outChannel = gopherApi.makeChannel[Int](10);
115+
val inChannel = gopherApi.makeChannel[Int](10)
116+
val outChannel = gopherApi.makeChannel[Int](10)
117117
r.in.connect(inChannel)
118118
r.out.connect(outChannel)
119119
val f0 = r.start()
@@ -135,7 +135,7 @@ class ReplicateSuite extends FunSuite
135135
}
136136

137137

138-
test("WordCount must be replicated and accessbke via *! ports side") {
138+
test("WordCount must be replicated and accessbke via *! ports side", Now) {
139139
//pending
140140
import PortAdapters._
141141
val nReplics = 2

0 commit comments

Comments
 (0)
0