8000 implement growing buffer channel · rssh/scala-gopher@a0ea6a4 · GitHub
[go: up one dir, main page]

Skip to content

Commit a0ea6a4

Browse files
committed
implement growing buffer channel
1 parent 000d10e commit a0ea6a4

File tree

3 files changed

+82
-41
lines changed

3 files changed

+82
-41
lines changed

src/main/scala/gopher/channels/ActorBackedChannel.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class ActorBackedChannel[A](futureChannelRef: Future[ActorRef], override val api
6969
closed=true
7070
}
7171

72+
7273
override protected def finalize(): Unit =
7374
{
7475
// allow channel actor be grabage collected

src/main/scala/gopher/channels/BufferedChannelActor.scala

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import gopher._
1010
/**
1111
* ChannelActor - actor, which leave
1212
*/
13-
class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends ChannelActor[A](id,api)
13+
class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends BaseBufferedChannelActor[A](id,api)
1414
{
1515

1616

@@ -51,21 +51,7 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Cha
5151
}
5252

5353

54-
protected[this] def getNElements(): Int = nElements;
55-
56-
57-
def processReaders() : Boolean =
58-
{
59-
var retval = false
60-
while(!readers.isEmpty && nElements > 0) {
61-
val current = readers.head
62-
readers = readers.tail
63-
retval ||= processReader(current)
64-
}
65-
retval
66-
}
67-
68-
private[this] def processReader[B](reader:ContRead[A,B]): Boolean =
54+
protected[this] def processReader[B](reader:ContRead[A,B]): Boolean =
6955
reader.function(reader) match {
7056
case Some(f1) =>
7157
val readedElement = elementAt(readIndex)
@@ -81,12 +67,6 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Cha
8167
false
8268
}
8369

84-
private[this] def processReaderClosed[B](reader:ContRead[A,B]): Boolean =
85-
reader.function(reader) match {
86-
case Some(f1) => api.continue(f1(ContRead.ChannelClosed), reader.flowTermination)
87-
true
88-
case None => false
89-
}
9070

9171
def checkWriters: Boolean =
9272
{
@@ -114,24 +94,6 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Cha
11494
}
11595

11696

117-
def stopIfEmpty: Boolean =
118-
{
119-
require(closed==true)
120-
if (nElements == 0) {
121-
stopReaders()
122-
}
123-
stopWriters()
124-
if (nElements == 0) {
125-
if (nRefs == 0) {
126-
// here we leave 'closed' channels in actor-system untile they will be
127-
// garbage-collected. TODO: think about actual stop ?
128-
self ! GracefullChannelStop
129-
}
130-
true
131-
} else
132-
false
133-
}
134-
13597
@inline
13698
private[this] def elementAt(i:Int): A =
13799
buffer(i).asInstanceOf[A]
@@ -145,6 +107,5 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Cha
145107
val buffer= new Array[AnyRef](capacity+1)
146108
var readIndex=0
147109
var writeIndex=0
148-
var nElements=0
149110

150111
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package gopher.channels
2+
3+
import akka.actor._
4+
import scala.language._
5+
import scala.concurrent._
6+
import scala.collection.immutable._
7+
import gopher._
8+
9+
class ChannelOverflowException extends RuntimeException
10+
11+
/**
12+
* ChannelActor - actor, which leave
13+
*/
14+
class GrowingBufferedChannelActor[A](id:Long, limit:Int, api: GopherAPI) extends BaseBufferedChannelActor[A](id,api)
15+
{
16+
17+
18+
protected[this] def onContWrite(cwa: gopher.channels.ContWrite[A, _]): Unit =
19+
{
20+
if (closed) {
21+
cwa.flowTermination.throwIfNotCompleted(new ChannelClosedException())
22+
} else {
23+
val prevNElements = nElements
24+
if (processWriter(cwa) && prevNElements==0) {
25+
processReaders()
26+
}
27+
}
28+
}
29+
30+
protected[this] def onContRead(cra: gopher.channels.ContRead[A, _]): Unit =
31+
{
32+
if (nElements==0) {
33+
if (closed) {
34+
processReaderClosed(cra)
35+
} else {
36+
readers = readers :+ cra
37+
}
38+
} else {
39+
val prevNElements = nElements
40+
if (processReader(cra)) {
41+
if (closed) {
42+
stopIfEmpty
43+
}
44+
}
45+
}
46+
}
47+
48+
49+
protected[this] def processReader[B](reader:ContRead[A,B]): Boolean =
50+
reader.function(reader) match {
51+
case Some(f1) =>
52+
val readedElement = buffer.head.asInstanceOf[A]
53+
buffer = buffer.tail
54+
nElements-=1
55+
Future{
56+
val cont = f1(ContRead.In value readedElement )
57+
api.continue(cont, reader.flowTermination)
58+
}(ap A8E7 i.executionContext)
59+
true
60+
case None =>
61+
false
62+
}
63+
64+
65+
private[this] def processWriter[B](writer:ContWrite[A,B]): Boolean =
66+
writer.function(writer) match {
67+
case Some((a,cont)) =>
68+
nElements+=1
69+
buffer = buffer :+ a
70+
api.continue(cont, writer.flowTermination)
71+
true
72+
case None =>
73+
false
74+
}
75+
76+
77+
var buffer= Queue[Any]()
78+
79+
}

0 commit comments

Comments
 (0)
0