8000 1. more async tests. · rssh/scala-gopher@dcb1aeb · GitHub
[go: up one dir, main page]

Skip to content

Commit dcb1aeb

Browse files
committed
1. more async tests.
2. start to implement time API
1 parent 3a4419c commit dcb1aeb

File tree

6 files changed

+271
-211
lines changed

6 files changed

+271
-211
lines changed

src/main/scala/gopher/GopherAPI.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
107107
**/
108108
def config: Config = as.settings.config.atKey("gopher")
109109

110+
/**
111+
* time API
112+
*/
113+
lazy val time = new Time(this,gopherExecutionContext)
114+
110115
lazy val idleTimeout: FiniteDuration = {
111116
val m = try {
112117
config.getInt("idle-detection-tick")
@@ -118,6 +123,7 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
118123

119124
def currentFlow = CurrentFlowTermination
120125

126+
121127
//private[gopher] val idleDetector = new IdleDetector(this)
122128

123129
private[gopher] val continuatedProcessorRef: ActorRef = {

src/main/scala/gopher/Time.scala

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package gopher
2+
3+
import java.time.LocalDateTime
4+
5+
import gopher.channels.{Channel, Input, OneTimeChannel}
6+
7+
import scala.concurrent.{ExecutionContext, Future, Promise}
8+
import scala.concurrent.duration.FiniteDuration
9+
import scala.language.experimental.macros
10+
import scala.reflect.macros.blackbox.Context
11+
12+
/**
13+
* Time API
14+
*
15+
* @see gopherApi#time
16+
*/
17+
class Time(gopherAPI: GopherAPI, ec:ExecutionContext) {
18+
19+
implicit def executionContext = ec
20+
21+
def after(duration: FiniteDuration): Input[LocalDateTime] =
22+
{
23+
val ch = OneTimeChannel.apply[LocalDateTime]()(gopherAPI)
24+
gopherAPI.actorSystem.scheduler.scheduleOnce(duration){
25+
ch.awrite(LocalDateTime.now())
26+
}
27+
ch
28+
}
29+
30+
def asleep(duration: FiniteDuration): Future[LocalDateTime] =
31+
{
32+
val p = Promise[LocalDateTime]()
33+
gopherAPI.actorSystem.scheduler.scheduleOnce(duration){
34+
p success LocalDateTime.now()
35+
}
36+
p.future
37+
}
38+
39+
def sleep(duration: FiniteDuration): LocalDateTime = macro Time.sleepImpl
40+
41+
def tick(duration: FiniteDuration): Input[LocalDateTime] =
42+
{
43+
newTicker(duration)
44+
}
45+
46+
def newTicker(duration: FiniteDuration): Channel[LocalDateTime] =
47+
{
48+
val ch = gopherAPI.makeChannel[LocalDateTime]()
49+
gopherAPI.actorSystem.scheduler.schedule(duration,duration) {
50+
//TODO: make expired.
51+
ch.awrite(LocalDateTime.now())
52+
}
53+
ch
54+
}
55+
56+
57+
}
58+
59+
object Time
60+
{
61+
62+
def sleepImpl(c:Context)(duration:c.Expr[FiniteDuration]):c.Expr[LocalDateTime] =
63+
{
64+
import c.universe._
65+
val r = q"scala.async.Await.await(${c.prefix}.asleep(${duration}))(${c.prefix}.executionContext)"
66+
c.Expr[LocalDateTime](r)
67+
}
68+
69+
}

src/main/scala/gopher/channels/ActorBackedChannel.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@ package gopher.channels
33

44
import akka.actor._
55
import akka.pattern._
6+
import gopher._
7+
import gopher.channels.ContRead.In
68

79
import scala.concurrent._
810
import scala.concurrent.duration._
9-
import scala.util._
1011
import scala.language.experimental.macros
1112
import scala.language.postfixOps
12-
import scala.reflect.macros.blackbox.Context
13-
import scala.reflect.api._
14-
import gopher._
15-
import gopher.channels.ContRead.In
13+
import scala.util._
1614

1715
class ActorBackedChannel[A](futureChannelRef: Future[ActorRef], override val api: GopherAPI) extends Channel[A]
1816
{
@@ -34,7 +32,7 @@ class ActorBackedChannel[A](futureChannelRef: Future[ActorRef], override val api
3432
implicit val ec = api.gopherExecutionContext
3533
if (closed) {
3634
if (closedEmpty) {
37-
applyClosed();
35+
applyClosed()
3836
} else {
3937
// TODO: ask timeput on closed channel set in config.
4038
futureChannelRef.foreach{ ref => val f = ref.ask(ClosedChannelRead(cont))(5 seconds)

src/test/scala/gopher/channels/InputOpsSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import scala.language._
1010

1111
class InputOpsSuite extends AsyncFunSuite {
1212

13+
override implicit def executionContext = ExecutionContext.global
1314

1415
test("map operation for input") {
1516
val ch = gopherApi.makeChannel[String]()

0 commit comments

Comments
 (0)
0