2013-01-02 9 views
6

क्या यह कहना सही है कि असुरक्षित अभिनेता ईवेंट स्ट्रीम में सदस्यता लेते हैं? कम से कम, मुझे अक्का के साथ प्रयोग करने से मिलता है ...कमजोर-संदर्भित इवेंटबस कलाकारों को कार्यान्वित करें?

मैं इवेंटबस परिदृश्य में अभिनेताओं के लिए कमजोर संदर्भ प्रस्तुत करने की कोशिश कर रहा हूं। उन मामलों में घटना श्रोताओं/अभिनेता आम तौर पर आते हैं और जाते हैं। स्वतंत्र अभिनेताओं के विपरीत जो हर समय उपस्थित होना चाहिए। निश्चित रूप से स्पष्ट अनियंत्रण काम करता है। लेकिन मैं हमेशा ऐसा करने के लिए सही पल को समझने में सक्षम नहीं हूं।

क्या अक्का ऐसे उपयोग मामले में प्रदान करता है?

val as = ActorSystem.create("weak") 
var actor = as.actorOf(Props[ExceptionHandler]) 
as.eventStream.subscribe(actor,classOf[Exception]) 

// an event is published & received 
as.eventStream.publish(new KnownProblem) 

//session expires or whatever that makes the actor redundant 
actor = null 
(1 to 30).foreach(_ => System.gc) 

// an event is published & STILL received 
as.eventStream.publish(new KnownProblem) 
+0

'' 'का उपयोग कर classOf [अपवाद]' वर्गीकारक EventStream' से unsubscribe' अभिनेता? – idonnie

+0

यह काम करेगा। अगर मैं जानता हूं कि अभिनेता को अब किसी भी तरह से संदर्भित नहीं किया गया है। :-) मैं इस परिदृश्य को http सत्र के लिए कोशिश कर रहा हूं। जब ऐप सर्वर सत्र समाप्त हो जाता है तो मेरे पास 'सदस्यता समाप्त करने' का कोई अवसर नहीं है। आमतौर पर यह कमजोर संदर्भों के साथ किया जाता है। –

+0

इवेंटबस 'एक्टोर सिस्टम' से 'डीफ इवेंटस्ट्रीम' के साथ जुड़ा हुआ है, कुछ घटनाएं कॉन्फ़िगरेशन के दौरान 'ईवेंटस्ट्रीम' पर भी प्रकाशित होती हैं। मैं 'WeakReference [ActorRef] '-s की सदस्यता के साथ' EventBus 'का विस्तार करने का सुझाव दूंगा, विशेषता' लुकअप क्लासिफिकेशन 'आशाजनक लग रहा है। लेखक की ओर से एक उद्धरण: 'EventBus बढ़ाएँ और https://groups.google.com/forum/?fromgroups=#!topic/akka-user/T3-FONxoX8E अक्का EventBus सरल उदाहरण https implement.': //gist.github.com/3163791 – idonnie

उत्तर

0

ठीक है, मैं वास्तव में इसे लागू नहीं कर सका, लेकिन अभिनेता जीसी पर रोक रहा है। स्कैला 2.9.2 (आरईपीएल) + अक्का 2.0.3 का उपयोग करना।

EventBusWeakReference[ActorRef] साथ मदद नहीं की - क्योंकि अक्का में आप भी एक dungeonChildrenContainer (self.children) के साथ, यह भी वहाँ जीवन चक्र घटनाओं के लिए Monitor सदस्यता हो सकता है। जिस चीज को मैंने कोशिश नहीं की - प्रेषक के साथ अभिनेताओं को बनाना जो केवल हमारे नए चमकदार WeakEventBus के बारे में जानता है - तो शायद मैं इस बिंदु को याद कर सकता हूं?

यहाँ चला जाता है आरईपीएल के लिए कोड (उचित आयात के साथ शुरू करते हैं, और :paste 2 में यह कदम):

// Start REPL with something like: 
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar: 
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar: 
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar: 
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar" 

// :paste 1/2 
import akka.actor._ 
import akka.pattern._ 
import akka.event._ 
import akka.util._ 
import com.typesafe.config.ConfigFactory 
import akka.util.Timeout 
import akka.dispatch.Await 
import scala.ref.WeakReference 
import java.util.Comparator 
import java.util.concurrent.atomic._ 
import java.util.UUID 

case class Message(val id:String,val timestamp: Long) 
case class PostMessage(
    override val id:String=UUID.randomUUID().toString(), 
    override val timestamp: Long=new java.util.Date().getTime(), 
    text:String) extends Message(id, timestamp) 
case class MessageEvent(val channel:String, val message:Message) 

case class StartServer(nodeName: String) 
case class ServerStarted(nodeName: String, actor: ActorRef) 
case class IsAlive(nodeName: String) 
case class IsAliveWeak(nodeName: String) 
case class AmAlive(nodeName: String, actor: ActorRef) 
case class GcCheck() 
case class GcCheckScheduled(isScheduled: Boolean, 
    gcFlag: WeakReference[AnyRef]) 

trait WeakLookupClassification { this: WeakEventBus ⇒ 
protected final val subscribers = new Index[Classifier, 
    WeakReference[Subscriber]](mapSize(), 
    new Comparator[WeakReference[Subscriber]] { 
      def compare(a: WeakReference[Subscriber], 
     b: WeakReference[Subscriber]): Int = { 
       if (a.get == None || b.get == None) -1 
       else compareSubscribers(a.get.get, b.get.get) 
     } 
     }) 
protected def mapSize(): Int 
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int 
protected def classify(event: Event): Classifier 
protected def publish(event: Event, subscriber: Subscriber): Unit 
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = 
    subscribers.put(to, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = 
    subscribers.remove(from, new WeakReference(subscriber)) 
def unsubscribe(subscriber: Subscriber): Unit = 
    subscribers.removeValue(new WeakReference(subscriber)) 
def publish(event: Event): Unit = { 
     val i = subscribers.valueIterator(classify(event)) 
     while (i.hasNext) publish(event, i.next().get.get) 
} 
    } 

class WeakEventBus extends EventBus with WeakLookupClassification { 
    type Event = MessageEvent 
    type Classifier=String 
    type Subscriber = ActorRef 

    protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b 

    protected def mapSize(): Int = 10 
    protected def classify(event: Event): Classifier = event.channel 
    protected def publish(event: Event, subscriber: Subscriber): Unit = 
     subscriber ! event 
} 

lazy val weakEventBus = new WeakEventBus 

implicit val timeout = akka.util.Timeout(1000) 
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString(""" 
akka { 
    loglevel = "DEBUG" 
    actor { 
     provider = "akka.remote.RemoteActorRefProvider" 
     debug { 
      receive = on 
      autoreceive = on   
      lifecycle = on 
      event-stream = on 
     } 
    } 
    remote { 
     transport = "akka.remote.netty.NettyRemoteTransport" 
     log-sent-messages = on 
     log-received-messages = on  
    } 
} 
serverconf { 
    include "common" 
    akka { 
     actor { 
      deployment { 
     /root { 
      remote = "akka://[email protected]:2552" 
     }  
      } 
     } 
     remote { 
      netty { 
     hostname = "127.0.0.1" 
     port = 2552 
      } 
     } 
    } 
} 
""").getConfig("serverconf")) 

class Server extends Actor { 
    private[this] val scheduled = new AtomicBoolean(false) 
    private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]() 

    val gcCheckPeriod = Duration(5000, "millis") 

    override def preRestart(reason: Throwable, message: Option[Any]) { 
     self ! GcCheckScheduled(scheduled.get, gcFlagRef.get) 
     super.preRestart(reason, message) 
    } 

    def schedule(period: Duration, who: ActorRef) = 
     actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck) 

    def receive = {  
     case StartServer(nodeName) => 
      sender ! ServerStarted(nodeName, self) 
      if (scheduled.compareAndSet(false, true)) 
     schedule(gcCheckPeriod, self) 
      val gcFlagObj = new AnyRef()    
      gcFlagRef.set(new WeakReference(gcFlagObj)) 
      weakEventBus.subscribe(self, nodeName) 
      actorSystem.eventStream.unsubscribe(self)  
     case GcCheck => 
      val gcFlag = gcFlagRef.get 
      if (gcFlag == null) { 
     sys.error("gcFlag") 
      } 
     gcFlag.get match { 
     case Some(gcFlagObj) => 
      scheduled.set(true) 
      schedule(gcCheckPeriod, self) 
     case None => 
      println("Actor stopped because of GC: " + self) 
      context.stop(self)   
     } 
     case GcCheckScheduled(isScheduled, gcFlag) => 
      if (isScheduled && scheduled.compareAndSet(false, isScheduled)) { 
     gcFlagRef.compareAndSet(null, gcFlag) 
     schedule(gcCheckPeriod, self)    
      } 
     case IsAlive(nodeName) => 
      println("Im alive (default EventBus): " + nodeName) 
      sender ! AmAlive(nodeName, self) 
     case e: MessageEvent => 
      println("Im alive (weak EventBus): " + e)  
    } 
} 

// :paste 2/2 
class Root extends Actor { 
    def receive = { 
     case start @ StartServer(nodeName) => 
     val server = context.actorOf(Props[Server], nodeName) 
     context.watch(server) 
     Await.result(server ? start, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
     case started @ ServerStarted(nodeName, _) => 
      sender ! started 
     case _ => 
      throw new RuntimeException(
      "[S][FAIL] Could not start server: " + start) 
     } 
     case isAlive @ IsAlive(nodeName) => 
     Await.result(context.actorFor(nodeName) ? isAlive, 
     timeout.duration).asInstanceOf[AmAlive] match { 
     case AmAlive(nodeName, _) => 
      println("[S][SUCC] Server is alive : " + nodeName) 
     case _ => 
     throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)  
      } 
     case isAliveWeak @ IsAliveWeak(nodeName) =>     
     actorSystem.eventStream.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-default"))) 
     weakEventBus.publish(MessageEvent(nodeName, 
     PostMessage(text="isAlive-weak"))) 
} 
    } 

lazy val rootActor = actorSystem.actorOf(Props[Root], "root") 

object Root { 
    def start(nodeName: String) = { 
     val msg = StartServer(nodeName) 
     var startedActor: Option[ActorRef] = None 
     Await.result(rootActor ? msg, timeout.duration) 
     .asInstanceOf[ServerStarted] match { 
      case succ @ ServerStarted(nodeName, actor) => 
      println("[S][SUCC] Server started: " + succ) 
      startedActor = Some(actor) 
      case _ => 
     throw new RuntimeException("[S][FAIL] Could not start server: " + msg) 
      } 
     startedActor 
    } 
    def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName) 
    def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName) 
} 

//////////////// 
// actual test 
Root.start("weak") 
Thread.sleep(7000L) 
System.gc() 
Root.isAlive("weak") 
संबंधित मुद्दे