ठीक है, मैं इसे वास्तव में लागू नहीं कर सका, लेकिन अभिनेता (actor) GC होने पर रुक जाता है। मैं स्कैला 2.9.2 (REPL) + अक्का 2.0.3 का उपयोग कर रहा हूँ।
`WeakReference[ActorRef]` वाले `EventBus` से मदद नहीं मिली - क्योंकि अक्का में `ChildrenContainer` (`self.children`) वाला एक `dungeon` भी होता है, और वहाँ जीवन-चक्र घटनाओं के लिए `Monitor` सदस्यताएँ भी हो सकती हैं। एक चीज़ जो मैंने नहीं आज़माई - अभिनेताओं को ऐसे डिस्पैचर के साथ बनाना जो केवल हमारे नए चमकदार `WeakEventBus` के बारे में जानता हो - तो शायद मैं मूल बात चूक रहा हूँ?
यह रहा REPL के लिए कोड (REPL को उचित आयातों के साथ शुरू करें, और इसे `:paste` से 2 चरणों में चिपकाएँ):
// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"
// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID
/**
 * Base payload published on the event buses: an identifier plus a timestamp.
 *
 * NOTE(review): `PostMessage` below is a case class extending this case class.
 * Case-to-case inheritance is only deprecated on Scala 2.9.x (the version this
 * script targets) but is rejected by later compilers.
 */
case class Message(id: String, timestamp: Long)

/**
 * A text message. When not supplied, `id` defaults to a freshly generated random
 * UUID string and `timestamp` to the current wall-clock time in milliseconds.
 */
case class PostMessage(
  override val id: String = UUID.randomUUID().toString,
  override val timestamp: Long = System.currentTimeMillis(),
  text: String
) extends Message(id, timestamp)

/** Envelope that pairs a [[Message]] with the channel name it is published on. */
case class MessageEvent(channel: String, message: Message)
/** Request to start a server child actor named `nodeName`. */
case class StartServer(nodeName: String)

/** Reply to [[StartServer]] carrying the reference of the started actor. */
case class ServerStarted(nodeName: String, actor: ActorRef)

/** Liveness probe for the server named `nodeName`; answered with [[AmAlive]]. */
case class IsAlive(nodeName: String)

/** Asks the root actor to publish a probe event for `nodeName` on the event buses. */
case class IsAliveWeak(nodeName: String)

/** Reply to [[IsAlive]]. */
case class AmAlive(nodeName: String, actor: ActorRef)

/**
 * Periodic "has the GC flag been collected?" tick.
 *
 * NOTE(review): the sender and the `receive` handler in this file both use the
 * bare name `GcCheck` (the companion object), not an instance `GcCheck()`.
 */
case class GcCheck()

/** Snapshot of a server's scheduling state, sent to itself before a restart. */
case class GcCheckScheduled(
  isScheduled: Boolean,
  gcFlag: WeakReference[AnyRef]
)
/**
 * Lookup-style classification for [[WeakEventBus]] that holds every subscriber
 * through a `WeakReference`, so a subscription alone does not keep the
 * subscriber reachable.
 *
 * Implementors supply `mapSize`, `compareSubscribers`, `classify` and the
 * two-argument `publish`.
 */
trait WeakLookupClassification { this: WeakEventBus ⇒

  /**
   * Classifier -> weakly referenced subscribers.
   *
   * The comparator orders references by their referents. Each reference is
   * dereferenced exactly once per comparison, so a collection happening
   * mid-comparison cannot cause a `None.get`. Cleared references sort before
   * live ones and compare equal to each other, which keeps the ordering
   * sign-symmetric as `java.util.Comparator` requires.
   */
  protected final val subscribers = new Index[Classifier, WeakReference[Subscriber]](
    mapSize(),
    new Comparator[WeakReference[Subscriber]] {
      def compare(a: WeakReference[Subscriber], b: WeakReference[Subscriber]): Int =
        (a.get, b.get) match {
          case (Some(x), Some(y)) => compareSubscribers(x, y)
          case (None, None)       => 0
          case (None, _)          => -1
          case (_, None)          => 1
        }
    })

  /** Size argument handed to the subscriber index. */
  protected def mapSize(): Int

  /** Total ordering over live subscribers. */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /** Extracts the classifier an event is published under. */
  protected def classify(event: Event): Classifier

  /** Delivers `event` to one live subscriber. */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  /** Registers `subscriber` (weakly) under classifier `to`; returns the index's `put` result. */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    subscribers.put(to, new WeakReference(subscriber))

  /** Removes `subscriber` from classifier `from`; returns the index's `remove` result. */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    subscribers.remove(from, new WeakReference(subscriber))

  /** Removes `subscriber` from every classifier. */
  def unsubscribe(subscriber: Subscriber): Unit =
    subscribers.removeValue(new WeakReference(subscriber))

  /**
   * Publishes `event` to every subscriber of its classifier that is still alive.
   * References whose referent has already been collected are skipped instead of
   * failing the whole publish with `NoSuchElementException`; they are not pruned here.
   */
  def publish(event: Event): Unit = {
    val i = subscribers.valueIterator(classify(event))
    while (i.hasNext) {
      i.next().get match {
        case Some(subscriber) => publish(event, subscriber)
        case None             => // referent was garbage-collected: nothing to deliver to
      }
    }
  }
}
/**
 * Event bus that routes [[MessageEvent]]s by channel name to actor subscribers
 * held through weak references (see [[WeakLookupClassification]]).
 */
class WeakEventBus extends EventBus with WeakLookupClassification {
  type Event = MessageEvent
  type Classifier = String
  type Subscriber = ActorRef

  /** Size argument passed to the subscriber index. */
  protected def mapSize(): Int = 10

  /** An event is classified by the channel it was published on. */
  protected def classify(event: Event): Classifier = event.channel

  /** Subscribers are ordered by `ActorRef`'s own comparison. */
  protected def compareSubscribers(a: ActorRef, b: ActorRef): Int = a.compareTo(b)

  /** Delivery is a plain fire-and-forget send to the subscriber. */
  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event
  }
}
// Shared bus instance used by Server (subscribe) and Root (publish).
lazy val weakEventBus = new WeakEventBus

// Ask timeout (milliseconds) picked up implicitly by every `?` in this script.
implicit val timeout: Timeout = akka.util.Timeout(1000)

// Actor system built from the "serverconf" section of the inline configuration.
// The /root deployment address must name this system ("serversys") at the
// netty hostname/port configured in the same section.
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString("""
akka {
loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
}
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-sent-messages = on
log-received-messages = on
}
}
serverconf {
include "common"
akka {
actor {
deployment {
/root {
remote = "akka://serversys@127.0.0.1:2552"
}
}
}
remote {
netty {
hostname = "127.0.0.1"
port = 2552
}
}
}
}
""").getConfig("serverconf"))
/**
 * Server actor used to observe garbage collection.
 *
 * On `StartServer` it replies with `ServerStarted`, stores a weak reference to a
 * throw-away marker object, subscribes itself to `weakEventBus` under the node
 * name and unsubscribes itself from the system event stream. Every
 * `gcCheckPeriod` it then checks the marker: while it is still referenced the
 * check is re-scheduled, and once it has been collected the actor stops itself.
 */
class Server extends Actor {
  // Whether a GcCheck tick is currently scheduled.
  private[this] val scheduled = new AtomicBoolean(false)
  // Weak reference to the marker object; null until the first StartServer.
  private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()

  val gcCheckPeriod = Duration(5000, "millis")

  /** Sends the current scheduling state to `self` before the restart proceeds. */
  override def preRestart(reason: Throwable, message: Option[Any]) {
    val snapshot = GcCheckScheduled(scheduled.get, gcFlagRef.get)
    self ! snapshot
    super.preRestart(reason, message)
  }

  /** Schedules a single `GcCheck` to be sent to `who` after `period`. */
  def schedule(period: Duration, who: ActorRef) =
    actorSystem.scheduler.scheduleOnce(period) {
      who ! GcCheck
    }

  def receive = {
    case StartServer(nodeName) =>
      sender ! ServerStarted(nodeName, self)
      val firstStart = scheduled.compareAndSet(false, true)
      if (firstStart) schedule(gcCheckPeriod, self)
      // Only the weak reference is stored; the marker itself is a local.
      val marker = new AnyRef()
      gcFlagRef.set(new WeakReference(marker))
      weakEventBus.subscribe(self, nodeName)
      actorSystem.eventStream.unsubscribe(self)

    case GcCheck =>
      val flagRef = gcFlagRef.get
      if (flagRef == null) sys.error("gcFlag")
      if (flagRef.get.isDefined) {
        // Marker still present: keep polling.
        scheduled.set(true)
        schedule(gcCheckPeriod, self)
      } else {
        println("Actor stopped because of GC: " + self)
        context.stop(self)
      }

    case GcCheckScheduled(isScheduled, gcFlag) =>
      // Restore the pre-restart state only if a tick was pending and none is scheduled now.
      val resume = isScheduled && scheduled.compareAndSet(false, true)
      if (resume) {
        gcFlagRef.compareAndSet(null, gcFlag)
        schedule(gcCheckPeriod, self)
      }

    case IsAlive(nodeName) =>
      println("Im alive (default EventBus): " + nodeName)
      sender ! AmAlive(nodeName, self)

    case e: MessageEvent =>
      println("Im alive (weak EventBus): " + e)
  }
}
// :paste 2/2
/**
 * Parent actor that creates [[Server]] children and relays probes to them.
 *
 *  - `StartServer(name)`: creates and watches a child named `name`, forwards the
 *    request to it and relays its `ServerStarted` reply to the original sender.
 *  - `IsAlive(name)`: asks the child `name` and prints the outcome.
 *  - `IsAliveWeak(name)`: publishes a probe event for `name` on both the system
 *    event stream and `weakEventBus`.
 *
 * Replies are pattern-matched directly rather than cast with `asInstanceOf`:
 * a cast throws `ClassCastException` on an unexpected reply before the match
 * runs, which made the descriptive fallback branches unreachable.
 *
 * NOTE(review): `Await.result` blocks this actor's thread while waiting for the
 * child; kept as in the original experiment.
 */
class Root extends Actor {
  def receive = {
    case start @ StartServer(nodeName) =>
      val server = context.actorOf(Props[Server], nodeName)
      context.watch(server)
      Await.result(server ? start, timeout.duration) match {
        case started: ServerStarted =>
          sender ! started
        case _ =>
          throw new RuntimeException(
            "[S][FAIL] Could not start server: " + start)
      }

    case isAlive @ IsAlive(nodeName) =>
      Await.result(context.actorFor(nodeName) ? isAlive, timeout.duration) match {
        // The name printed is the one carried by the reply.
        case AmAlive(aliveNode, _) =>
          println("[S][SUCC] Server is alive : " + aliveNode)
        case _ =>
          throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
      }

    case IsAliveWeak(nodeName) =>
      actorSystem.eventStream.publish(MessageEvent(nodeName,
        PostMessage(text = "isAlive-default")))
      weakEventBus.publish(MessageEvent(nodeName,
        PostMessage(text = "isAlive-weak")))
  }
}
/** The single top-level `Root` actor, created under the name "root" on first access. */
lazy val rootActor: ActorRef =
  actorSystem.actorOf(Props[Root], "root")
/** Blocking convenience API for driving `rootActor` from the REPL. */
object Root {

  /**
   * Asks the root actor to start a server named `nodeName` and waits for the reply.
   *
   * The reply is pattern-matched directly (no `asInstanceOf`), so an unexpected
   * reply reaches the descriptive fallback instead of a `ClassCastException`.
   *
   * @param nodeName name of the server actor to create
   * @return `Some(actor)` holding the started server's reference
   * @throws RuntimeException if the reply is not a `ServerStarted`
   */
  def start(nodeName: String): Option[ActorRef] = {
    val msg = StartServer(nodeName)
    Await.result(rootActor ? msg, timeout.duration) match {
      case succ @ ServerStarted(_, actor) =>
        println("[S][SUCC] Server started: " + succ)
        Some(actor)
      case _ =>
        throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
    }
  }

  /** Fire-and-forget liveness probe for `nodeName`, sent to the root actor. */
  def isAlive(nodeName: String): Unit = rootActor ! IsAlive(nodeName)

  /** Fire-and-forget request to publish a probe event for `nodeName` on the event buses. */
  def isAliveWeak(nodeName: String): Unit = rootActor ! IsAliveWeak(nodeName)
}
////////////////
// actual test
// Scenario: start a server, wait, request a GC, then probe the server.
// Start a server actor named "weak" (blocks until the root actor replies).
Root.start("weak")
// Wait 7 s, i.e. longer than the server's 5000 ms gcCheckPeriod.
Thread.sleep(7000L)
// Request a garbage collection (the JVM treats this as a request, not a guarantee).
System.gc()
// Probe the server; the outcome is printed by the actors involved.
Root.isAlive("weak")
क्या `classOf[Exception]` वर्गीकारक का उपयोग करके अभिनेता को `EventStream` से `unsubscribe` किया जा सकता है? – idonnie
यह काम करेगा। अगर मैं जानता हूं कि अभिनेता को अब किसी भी तरह से संदर्भित नहीं किया गया है। :-) मैं इस परिदृश्य को http सत्र के लिए कोशिश कर रहा हूं। जब ऐप सर्वर सत्र समाप्त हो जाता है तो मेरे पास 'सदस्यता समाप्त करने' का कोई अवसर नहीं है। आमतौर पर यह कमजोर संदर्भों के साथ किया जाता है। –
`EventBus`, `ActorSystem` से `def eventStream` के माध्यम से जुड़ा हुआ है, और कुछ घटनाएँ कॉन्फ़िगरेशन के दौरान भी `eventStream` पर प्रकाशित होती हैं। मैं `WeakReference[ActorRef]` सदस्यताओं के साथ `EventBus` का विस्तार करने का सुझाव दूँगा; विशेषता (trait) `LookupClassification` आशाजनक लगती है। लेखक का एक उद्धरण: 'Extend EventBus and implement.' https://groups.google.com/forum/?fromgroups=#!topic/akka-user/T3-FONxoX8E अक्का EventBus का सरल उदाहरण: https://gist.github.com/3163791 – idonnie