2011-08-16 16 views
8

मैं अक्का अभिनेताओं में गलती सहनशील व्यवहार पाने का प्रयास कर रहा हूं। मैं कुछ कोड पर काम कर रहा हूं जो लंबे समय तक प्रसंस्करण के लिए उपलब्ध सिस्टम में अभिनेताओं पर निर्भर करता है। मुझे लगता है कि मेरी प्रसंस्करण कुछ घंटों के बाद बंद हो जाती है (इसमें लगभग 10 घंटे लग सकते हैं) और बहुत कुछ नहीं हो रहा है। मेरा मानना ​​है कि मेरे अभिनेता अपवादों से ठीक नहीं हो रहे हैं।मैं अक्का अभिनेता गलती सहनशीलता कैसे स्थापित करूं?

अभिनेताओं को एक आधार पर स्थायी रूप से पुन: प्रारंभ करने के लिए मुझे क्या करने की आवश्यकता है? मैं उम्मीद यह इस दस्तावेज़ http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

मैं अक्का 1.1.3 और स्केला के साथ काम कर रहा हूँ 2,9

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached 
import akka.dispatch.Dispatchers 
import akka.routing.CyclicIterator 
import akka.routing.LoadBalancer 
import akka.config.Supervision._ 


object TestActor { 
    val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool") 
        .setCorePoolSize(100) 
        .setMaxPoolSize(100) 
        .build 
} 

class TestActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.dispatcher = TestActor.dispatcher 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure") 
     println("Actor: " + name + " Received: " + num) 
     //Thread.sleep(100) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    //callback method for restart handling 
    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    //callback method for restart handling 
    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

trait CyclicLoadBalancing extends LoadBalancer { this: Actor => 
    val testActors: List[ActorRef] 
    val seq = new CyclicIterator[ActorRef](testActors) 
} 

trait TestActorManager extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000) 
    val testActors: List[ActorRef] 
    override def preStart = testActors foreach { self.startLink(_) } 
    override def postStop = { System.out.println("postStop") } 
} 


    object FaultTest { 
    def main(args : Array[String]) : Unit = { 
     println("starting FaultTest.main()") 
     val numOfActors = 5 
     val supervisor = actorOf(
     new TestActorManager with CyclicLoadBalancing { 
      val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i))); 
     } 
    ) 

     supervisor.start(); 

     println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length) 

     val testActor = Actor.registry.actorsFor(classOf[TestActor]).head 

     (1 until 200 toList) foreach { testActor ! _ } 

    } 
    } 

इस कोड को एक LoadBalancer पीछे 5 अभिनेता सेट कि सिर्फ पूर्णांकों हैं कि प्रिंट आउट से किया जा सकता है कि उन्हें भेजा गया है कि वे गलतियों को अनुकरण करने के लिए भी संख्याओं पर अपवाद फेंक देते हैं। 200 से इंटीग्रर्स इन अभिनेताओं को भेजे जाते हैं। मैं उम्मीद करता हूं कि अजीब संख्याएं उत्पादन प्राप्त करेंगी लेकिन कुछ भी संख्याओं पर कुछ दोषों के बाद सबकुछ बंद हो रहा है। इस उत्पादन में एसबीटी परिणामों के साथ इस कोड चल रहा है:

[info] Running FaultTest 
starting FaultTest.main() 
Loading config [akka.conf] from the application classpath. 
Number of Actors: 5 
Actor: 2 Received: 1 
Actor: 2 Received: 9 
Actor: 1 Received: 3 
Actor: 3 Received: 7 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM 

क्या मुझे लगता है कि यहाँ क्या हो रहा है कि 5 अभिनेताओं प्रारंभ करें, और पहले 5 भी संख्या के लिए उन्हें कारोबार से बाहर रख दिया और वे पुनः प्रारंभ नहीं हो रहे है।

यह कोड कैसे बदला जा सकता है ताकि अभिनेता अपवादों से ठीक हो जाएं?

मुझे उम्मीद है कि यह वास्तव में सभी विषम संख्याओं को 1 से 200 तक प्रिंट करेगा। मुझे लगता है कि प्रत्येक अभिनेता भी संख्याओं पर असफल होगा लेकिन अपवादों पर एक बरकरार मेलबॉक्स के साथ पुनरारंभ होगा। मैं preRestart और postRestart से println देखने की उम्मीद है। इन चीजों को होने के लिए इस कोड नमूने में कॉन्फ़िगर करने की क्या आवश्यकता है?

यहां अक्का और अभिनेताओं के बारे में कुछ अतिरिक्त धारणाएं हैं जो मेरी गलतफहमी का कारण बन सकती हैं। मुझे लगता है कि एक अभिनेता को पर्यवेक्षक या एक गलती के साथ कॉन्फ़िगर किया जा सकता है ताकि इसे फिर से शुरू किया जा सके और प्राप्त होने के दौरान अपवाद फेंकने पर उपलब्ध रहे। मुझे लगता है कि अगर अभिनेता को भेजा गया संदेश खो जाएगा तो यह प्राप्त होने के दौरान अपवाद फेंकता है। मुझे लगता है कि अपवाद फेंकने वाले अभिनेता पर प्रीरस्टार्ट() और पोस्ट रीस्टार्ट() कहा जाएगा।

कोड उदाहरण मैं क्या करने की कोशिश कर रहा हूँ और पर Why is my Dispatching on Actors scaled down in Akka?

** एक और कोड नमूना **

यहाँ एक और कोड नमूना है कि और अधिक सरल है आधारित है प्रतिनिधित्व करता है। मैं एक अभिनेता शुरू कर रहा हूं जो संख्याओं पर भी अपवाद फेंकता है। रास्ते में कोई लोड बैलेंसर या अन्य सामान नहीं है। मैं अभिनेता के बारे में जानकारी प्रिंट करने का प्रयास कर रहा हूं। मैं अभिनेता को संदेश भेजे जाने के बाद एक मिनट के लिए कार्यक्रम से बाहर निकलने का इंतजार कर रहा हूं और क्या हो रहा है इसकी निगरानी कर रहा हूं।

मुझे उम्मीद है कि यह विषम संख्याओं को प्रिंट करेगा, लेकिन ऐसा लगता है कि अभिनेता अपने मेलबॉक्स में संदेशों के साथ बैठता है।

क्या मेरे पास OneForOneStrategy गलत है? क्या मुझे अभिनेता को कुछ जोड़ने की ज़रूरत है? क्या इस तरह की कॉन्फ़िगरेशन मूल रूप से मेरे हिस्से पर गलत दिशा निर्देशित है? क्या डिस्पैचर को कुछ गलती सहनशीलता के साथ स्थापित करने की आवश्यकता है? क्या मैं डिस्पैचर में धागे को गड़बड़ कर सकता हूं?उत्पादन

import akka.actor.Actor 
import akka.actor.Actor._ 
import akka.actor.ActorRef 
import akka.actor.ActorRegistry 
import akka.config.Supervision._ 

class SingleActor(val name: Integer) extends Actor { 
    self.lifeCycle = Permanent 
    self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000) 
    def receive = { 
     case num: Integer => { 
     if(num % 2 == 0) 
      throw new Exception("This is a simulated failure, where does this get logged?") 
     println("Actor: " + name + " Received: " + num) 
     } 
    } 

    override def postStop(){ 
    println("TestActor post Stop ") 
    } 

    override def preRestart(reason: Throwable){ 
    println("TestActor "+ name + " restaring after shutdown because of " + reason) 
    } 

    override def postRestart(reason: Throwable){ 
    println("Restaring TestActor "+name+"after shutdown because of " + reason) 
    } 
} 

object TestSingleActor{ 

    def main(args : Array[String]) : Unit = { 
     println("starting TestSingleActor.main()") 

     val testActor = Actor.actorOf(new SingleActor(1)).start() 

     println("number of actors: " + registry.actors.size) 
     printAllActorsInfo 

     (1 until 20 toList) foreach { testActor ! _ } 

     for(i <- 1 until 120){ 
     Thread.sleep(500) 
     printAllActorsInfo 
     } 
    } 

    def printAllActorsInfo() ={ 
    registry.actors.foreach((a) => 
     println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b " 
       .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted))) 
    } 
} 

मैं हो रही है की तरह:

[info] Running TestSingleActor 
starting TestSingleActor.main() 
Loading config [akka.conf] from the application classpath. 
number of actors: 1 
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false 
Actor: 1 Received: 1 
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 

... 117 more of these lines repeted ... 

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false 
[info] == run == 
[success] Successful. 
[info] 
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM 

उत्तर

5

समस्या यह थी कि मैं अपनी akka.conf फ़ाइल के साथ था। मैं इवेंट-हैंडलर को कॉन्फ़िगर करने वाली रेखा को छोड़कर संदर्भ 1.1.3 akka.conf फ़ाइल का उपयोग कर रहा था।

मेरा (टूटी हुई एक):

event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

संदर्भ 1.1.3 (एक है कि काम करता है):

event-handlers = ["akka.event.EventHandler$DefaultListener"] 
मेरी घटना संचालकों config लाइन के साथ

, अभिनेता पुनरारंभ नहीं होता है। संदर्भ 1.1.3 लाइन पुनरारंभ के साथ आश्चर्यजनक रूप से होता है।

मैं करके उस पृष्ठ में दिए गए सुझावों से छुटकारा और 1.1.3 संदर्भ के लिए वापस जा akka.conf मैं दोष सहिष्णु अभिनेता प्राप्त करने में सक्षम था हो रही इन निर्देशों का http://akka.io/docs/akka/1.1.3/general/slf4j.html

तो के आधार पर यह बदलाव किया।

1

मेरा मानना ​​है कि आपकी समस्या समाप्त हो जाता है के बाद संदेश भेजे जाते हैं, तो आप अपने अतुल्यकालिक आवेदन को जीवित रखने की कोशिश नहीं कर रहे हैं, और इसलिए मुख्य थ्रेड बाहर निकलता है , और इसके साथ सबकुछ नीचे ले जाता है।

+0

अगर मैं() मैं मुख्य के अंत में एक Trhead.sleep (100000) जोड़ें: '[जानकारी]() FaultTest रनिंग FaultTest.main शुरू कर आवेदन classpath से लोड हो रहा है config [akka.conf]। अभिनेताओं की संख्या: 5 अभिनेता: 0 प्राप्त: 1 अभिनेता: 4 प्राप्त: 3 अभिनेता: 1 प्राप्त: 7 अभिनेता: 1 प्राप्त: 9' और आउटपुट रोकता है लेकिन अतिरिक्त संख्याएं मुद्रित नहीं होती हैं। मैंने एप्लिकेशन से बाहर निकलने का इंतजार नहीं किया लेकिन 30-40sec के बाद कुछ भी नहीं था। इसके अलावा, अगर मैं गलती को हटा देता हूं तो संख्याएं 2 सेकंड से कम समय में बहुत तेजी से प्रिंट होती हैं। –

संबंधित मुद्दे