मैं अक्का अभिनेताओं में गलती सहनशील व्यवहार पाने का प्रयास कर रहा हूं। मैं कुछ कोड पर काम कर रहा हूं जो लंबे समय तक प्रसंस्करण के लिए उपलब्ध सिस्टम में अभिनेताओं पर निर्भर करता है। मुझे लगता है कि मेरी प्रसंस्करण कुछ घंटों के बाद बंद हो जाती है (इसमें लगभग 10 घंटे लग सकते हैं) और बहुत कुछ नहीं हो रहा है। मेरा मानना है कि मेरे अभिनेता अपवादों से ठीक नहीं हो रहे हैं।मैं अक्का अभिनेता गलती सहनशीलता कैसे स्थापित करूं?
अभिनेताओं को एक आधार पर स्थायी रूप से पुन: प्रारंभ करने के लिए मुझे क्या करने की आवश्यकता है? मैं उम्मीद यह इस दस्तावेज़ http://akka.io/docs/akka/1.1.3/scala/fault-tolerance
मैं अक्का 1.1.3 और स्केला के साथ काम कर रहा हूँ 2,9
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._
object TestActor {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
.setCorePoolSize(100)
.setMaxPoolSize(100)
.build
}
class TestActor(val name: Integer) extends Actor {
self.lifeCycle = Permanent
self.dispatcher = TestActor.dispatcher
def receive = {
case num: Integer => {
if(num % 2 == 0)
throw new Exception("This is a simulated failure")
println("Actor: " + name + " Received: " + num)
//Thread.sleep(100)
}
}
override def postStop(){
println("TestActor post Stop ")
}
//callback method for restart handling
override def preRestart(reason: Throwable){
println("TestActor "+ name + " restaring after shutdown because of " + reason)
}
//callback method for restart handling
override def postRestart(reason: Throwable){
println("Restaring TestActor "+name+"after shutdown because of " + reason)
}
}
trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
val testActors: List[ActorRef]
val seq = new CyclicIterator[ActorRef](testActors)
}
trait TestActorManager extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
val testActors: List[ActorRef]
override def preStart = testActors foreach { self.startLink(_) }
override def postStop = { System.out.println("postStop") }
}
object FaultTest {
def main(args : Array[String]) : Unit = {
println("starting FaultTest.main()")
val numOfActors = 5
val supervisor = actorOf(
new TestActorManager with CyclicLoadBalancing {
val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
}
)
supervisor.start();
println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length)
val testActor = Actor.registry.actorsFor(classOf[TestActor]).head
(1 until 200 toList) foreach { testActor ! _ }
}
}
इस कोड को एक LoadBalancer पीछे 5 अभिनेता सेट कि सिर्फ पूर्णांकों हैं कि प्रिंट आउट से किया जा सकता है कि उन्हें भेजा गया है कि वे गलतियों को अनुकरण करने के लिए भी संख्याओं पर अपवाद फेंक देते हैं। 200 से इंटीग्रर्स इन अभिनेताओं को भेजे जाते हैं। मैं उम्मीद करता हूं कि अजीब संख्याएं उत्पादन प्राप्त करेंगी लेकिन कुछ भी संख्याओं पर कुछ दोषों के बाद सबकुछ बंद हो रहा है। इस उत्पादन में एसबीटी परिणामों के साथ इस कोड चल रहा है:
[info] Running FaultTest
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info]
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM
क्या मुझे लगता है कि यहाँ क्या हो रहा है कि 5 अभिनेताओं प्रारंभ करें, और पहले 5 भी संख्या के लिए उन्हें कारोबार से बाहर रख दिया और वे पुनः प्रारंभ नहीं हो रहे है।
यह कोड कैसे बदला जा सकता है ताकि अभिनेता अपवादों से ठीक हो जाएं?
मुझे उम्मीद है कि यह वास्तव में सभी विषम संख्याओं को 1 से 200 तक प्रिंट करेगा। मुझे लगता है कि प्रत्येक अभिनेता भी संख्याओं पर असफल होगा लेकिन अपवादों पर एक बरकरार मेलबॉक्स के साथ पुनरारंभ होगा। मैं preRestart और postRestart से println देखने की उम्मीद है। इन चीजों को होने के लिए इस कोड नमूने में कॉन्फ़िगर करने की क्या आवश्यकता है?
यहां अक्का और अभिनेताओं के बारे में कुछ अतिरिक्त धारणाएं हैं जो मेरी गलतफहमी का कारण बन सकती हैं। मुझे लगता है कि एक अभिनेता को पर्यवेक्षक या एक गलती के साथ कॉन्फ़िगर किया जा सकता है ताकि इसे फिर से शुरू किया जा सके और प्राप्त होने के दौरान अपवाद फेंकने पर उपलब्ध रहे। मुझे लगता है कि अगर अभिनेता को भेजा गया संदेश खो जाएगा तो यह प्राप्त होने के दौरान अपवाद फेंकता है। मुझे लगता है कि अपवाद फेंकने वाले अभिनेता पर प्रीरस्टार्ट() और पोस्ट रीस्टार्ट() कहा जाएगा।
कोड उदाहरण मैं क्या करने की कोशिश कर रहा हूँ और पर Why is my Dispatching on Actors scaled down in Akka?
** एक और कोड नमूना **
यहाँ एक और कोड नमूना है कि और अधिक सरल है आधारित है प्रतिनिधित्व करता है। मैं एक अभिनेता शुरू कर रहा हूं जो संख्याओं पर भी अपवाद फेंकता है। रास्ते में कोई लोड बैलेंसर या अन्य सामान नहीं है। मैं अभिनेता के बारे में जानकारी प्रिंट करने का प्रयास कर रहा हूं। मैं अभिनेता को संदेश भेजे जाने के बाद एक मिनट के लिए कार्यक्रम से बाहर निकलने का इंतजार कर रहा हूं और क्या हो रहा है इसकी निगरानी कर रहा हूं।
मुझे उम्मीद है कि यह विषम संख्याओं को प्रिंट करेगा, लेकिन ऐसा लगता है कि अभिनेता अपने मेलबॉक्स में संदेशों के साथ बैठता है।
क्या मेरे पास OneForOneStrategy गलत है? क्या मुझे अभिनेता को कुछ जोड़ने की ज़रूरत है? क्या इस तरह की कॉन्फ़िगरेशन मूल रूप से मेरे हिस्से पर गलत दिशा निर्देशित है? क्या डिस्पैचर को कुछ गलती सहनशीलता के साथ स्थापित करने की आवश्यकता है? क्या मैं डिस्पैचर में धागे को गड़बड़ कर सकता हूं?उत्पादन
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._
class SingleActor(val name: Integer) extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
def receive = {
case num: Integer => {
if(num % 2 == 0)
throw new Exception("This is a simulated failure, where does this get logged?")
println("Actor: " + name + " Received: " + num)
}
}
override def postStop(){
println("TestActor post Stop ")
}
override def preRestart(reason: Throwable){
println("TestActor "+ name + " restaring after shutdown because of " + reason)
}
override def postRestart(reason: Throwable){
println("Restaring TestActor "+name+"after shutdown because of " + reason)
}
}
object TestSingleActor{
def main(args : Array[String]) : Unit = {
println("starting TestSingleActor.main()")
val testActor = Actor.actorOf(new SingleActor(1)).start()
println("number of actors: " + registry.actors.size)
printAllActorsInfo
(1 until 20 toList) foreach { testActor ! _ }
for(i <- 1 until 120){
Thread.sleep(500)
printAllActorsInfo
}
}
def printAllActorsInfo() ={
registry.actors.foreach((a) =>
println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
.format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
}
}
मैं हो रही है की तरह:
[info] Running TestSingleActor
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
... 117 more of these lines repeted ...
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
[info] == run ==
[success] Successful.
[info]
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM
अगर मैं() मैं मुख्य के अंत में एक Trhead.sleep (100000) जोड़ें: '[जानकारी]() FaultTest रनिंग FaultTest.main शुरू कर आवेदन classpath से लोड हो रहा है config [akka.conf]। अभिनेताओं की संख्या: 5 अभिनेता: 0 प्राप्त: 1 अभिनेता: 4 प्राप्त: 3 अभिनेता: 1 प्राप्त: 7 अभिनेता: 1 प्राप्त: 9' और आउटपुट रोकता है लेकिन अतिरिक्त संख्याएं मुद्रित नहीं होती हैं। मैंने एप्लिकेशन से बाहर निकलने का इंतजार नहीं किया लेकिन 30-40sec के बाद कुछ भी नहीं था। इसके अलावा, अगर मैं गलती को हटा देता हूं तो संख्याएं 2 सेकंड से कम समय में बहुत तेजी से प्रिंट होती हैं। –