2009-06-17 5 views
8

in my own answer to my own question के रूप में समसामयिक रूप से प्रसंस्करण, मेरे पास ऐसी स्थिति है जिससे मैं कतार में आने वाली बड़ी संख्या में घटनाओं को संसाधित कर रहा हूं। प्रत्येक घटना को ठीक उसी तरह से संभाला जाता है और प्रत्येक को अन्य सभी घटनाओं से स्वतंत्र रूप से संभाला जा सकता है।स्कैला

मेरा प्रोग्राम स्कैला समरूपता ढांचे का लाभ उठाता है और इसमें शामिल कई प्रक्रियाओं को Actor एस के रूप में मॉडलिंग किया जाता है। Actor रों प्रक्रिया के रूप में उनके संदेशों को क्रमिक रूप से, वे इस विशेष समस्या (भले ही मेरी अन्य अभिनेताओं क्रिया जो अनुक्रमिक हैं प्रदर्शन कर रहे हैं) के लिए अच्छी तरह से अनुकूल नहीं हैं।

  1. घटना का एक पूल के लिए घटनाओं भेजें: मैं "नियंत्रण" के लिए स्काला चाहते जैसा कि सभी धागा निर्माण ऐसा लगता है मैं 2 विकल्प हैं (जो मुझे लगता है कि यह पहली जगह में एक संगामिति प्रणाली होने की बात है) प्रोसेसर है, जो मैं
  2. को नियंत्रित किसी अन्य तरीके से मेरी Actor उन्हें कार्रवाई करने के लिए मिलता है समवर्ती

मैं सोचा होगा कि # 1 अभिनेताओं सबसिस्टम का उपयोग कर की बात को नकारता: कितने प्रोसेसर अभिनेताओं मैं बनाना चाहिए? एक स्पष्ट सवाल है। ये चीजें माना जाता है कि वे मुझसे छिपाए गए हैं और उपप्रणाली द्वारा हल किए गए हैं।

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //I want to be able to handle multiple events at the same time 
     //create a new actor to handle it 
     actor { 
      //processing code here 
      process(x) 
     } 
    } 
    } 
} 

वहाँ एक बेहतर दृष्टिकोण है:

मेरा जवाब निम्न करने के लिए था? क्या यह गलत है?

संपादित करें: एक संभवतः बेहतर तरीका है:

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //Pass processing to the underlying ForkJoin framework 
     Scheduler.execute(process(e)) 
    } 
    } 
} 
+0

हालांकि अभिनेता सीधे श्रमिकों के पूल का समर्थन नहीं करते हैं, यह क्यू इस कमी को उजागर करने में सहायक था। सभी डॉक्स मेरे पास उपलब्ध है, इस स्पष्ट रूप से उल्लेख नहीं है। – ePharaoh

उत्तर

8

यह एक और सवाल का डुप्लिकेट की तरह लगता है। तो मैं अपना जवाब डुप्लिकेट करूंगा

अभिनेता एक समय में एक संदेश को संसाधित करते हैं। एकाधिक संदेशों को संसाधित करने के लिए क्लासिक पैटर्न उपभोक्ता कलाकारों के पूल के लिए एक समन्वयक अभिनेता मोर्चा होना है। यदि आप प्रतिक्रिया का उपयोग करते हैं तो उपभोक्ता पूल बड़ा हो सकता है लेकिन अभी भी केवल एक छोटी संख्या में JVM थ्रेड का उपयोग करेगा। यहां एक उदाहरण दिया गया है जहां मैं 10 उपभोक्ताओं का एक पूल और उनके लिए एक समन्वयक बना देता हूं।

import scala.actors.Actor 
import scala.actors.Actor._ 

case class Request(sender : Actor, payload : String) 
case class Ready(sender : Actor) 
case class Result(result : String) 
case object Stop 

def consumer(n : Int) = actor { 
    loop { 
    react { 
     case Ready(sender) => 
     sender ! Ready(self) 
     case Request(sender, payload) => 
     println("request to consumer " + n + " with " + payload) 
     // some silly computation so the process takes awhile 
     val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
     sender ! Result(result) 
     println("consumer " + n + " is done processing " + result) 
     case Stop => exit 
    } 
    } 
} 

// a pool of 10 consumers 
val consumers = for (n <- 0 to 10) yield consumer(n) 

val coordinator = actor { 
    loop { 
    react { 
     case msg @ Request(sender, payload) => 
      consumers foreach {_ ! Ready(self)} 
      react { 
       // send the request to the first available consumer 
       case Ready(consumer) => consumer ! msg 
      } 
     case Stop => 
      consumers foreach {_ ! Stop} 
      exit 
    } 
    } 
} 

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop 
for (i <- 0 to 1000) coordinator ! Request(self, i.toString) 

यह कोड यह देखने के लिए परीक्षण करता है कि कौन सा उपभोक्ता उपलब्ध है और उस उपभोक्ता से अनुरोध भेजता है। विकल्प केवल उपभोक्ताओं को यादृच्छिक रूप से असाइन करना या राउंड रॉबिन शेड्यूलर का उपयोग करना है।

आप जो कर रहे हैं उसके आधार पर, आपको स्कैला के वायदा के साथ बेहतर सेवा दी जा सकती है। उदाहरण के लिए, यदि आप वास्तव में तो अभिनेताओं की जरूरत नहीं है ऊपर मशीनरी के सभी

के रूप में लिखा जा सकता है
import scala.actors.Futures._ 

def transform(payload : String) = {  
    val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
    println("transformed " + payload + " to " + result) 
    result 
} 

val results = for (i <- 0 to 1000) yield future(transform(i.toString)) 
+0

धन्यवाद - मुझे पता नहीं था कि आप उसी शेड्यूलर पर कार्यवाही कर सकते हैं जो अभिनेता ढांचे का उपयोग करता है। मुझे लगता है कि इसलिए सबसे अच्छा तरीका Scheduler.execute (प्रक्रिया (ई)) –

+0

का उपयोग करने के लिए है - हां; यह एक बहुत ही समान प्रश्न है (जिसे मैं लिंक करता हूं) लेकिन काफी समान नहीं है। पहला सवाल "अभिनेता अनुक्रमिक हैं?" जबकि दूसरा सवाल था "जैसा कि अभिनेता अनुक्रमिक हैं, मैं एक्स कैसे कर सकता हूं" –

+0

संयोग से: '0 से 10' में 11 तत्व हैं, 10 नहीं। –

3

घटनाओं सभी स्वतंत्र रूप से नियंत्रित किया जा सकता है, तो क्यों वे एक कतार पर हैं? अपने डिजाइन के बारे में और कुछ नहीं जानना, यह एक अनावश्यक कदम की तरह लगता है। आप जो कुछ भी उन घटनाओं फायरिंग कर रहा है के साथ process समारोह रचना कर सकता है, तो आप संभवतः कतार को समाप्त कर सकता है।

एक अभिनेता अनिवार्य रूप से एक समवर्ती एक कतार से सुसज्जित प्रभाव है। यदि आप एक साथ कई संदेशों को संसाधित करना चाहते हैं, तो आप वास्तव में एक अभिनेता नहीं चाहते हैं। आप बस एक सुविधाजनक कार्य (किसी भी =>()) को कुछ सुविधाजनक समय पर निष्पादन के लिए निर्धारित करना चाहते हैं।

यह कहकर, यदि आप अभिनेता पुस्तकालय में रहना चाहते हैं और यदि ईवेंट कतार आपके नियंत्रण में नहीं है तो आपका दृष्टिकोण उचित है।

Scalaz अभिनेताओं और समवर्ती प्रभावों के बीच एक अंतर बनाता है। जबकि इसकी Actor बहुत हल्का वजन है, scalaz.concurrent.Effect अभी भी हल्का है। यहां आपका कोड लगभग स्कालज़ लाइब्रेरी में अनुवादित है:

val eventProcessor = effect (x => process x) 

यह नवीनतम ट्रंक हेड के साथ है, अभी तक जारी नहीं किया गया है।

+0

धन्यवाद! वे एक "कतार" विशुद्ध रूप से क्योंकि मैं उन्हें एक अभिनेता के लिए भेज रहा हूं पर हैं और एक अभिनेता के एक कतार है, जो इसे क्रमिक रूप से संसाधित करता है। जैसा कि अभिनेता लाइब्रेरी है, मैं स्कैला में समवर्ती (*) को संभालने के लिए _supposed_ कैसे हूं, मैं इसका उपयोग करने की कोशिश कर रहा हूं। अन्यथा मैं केवल execorService.invokeAll का उपयोग करूंगा। –

+0

उपरोक्त jschen पर भी मेरी टिप्पणी देखें। मैं लंबे समय से जावा में समवर्ती कोड लिख रहा हूं और अभिनेताओं और उम का उपयोग करने के बीच सही सीमा खोजने की कोशिश कर रहा हूं, एक स्कैला कार्यक्रम में कलाकारों का उपयोग नहीं कर रहा हूं, जो समवर्ती होने की उम्मीद है। –

+1

अभिनेता रामबाण नहीं हैं, और वहाँ कुछ भी नहीं कहा गया है कि आप अगर आप स्काला में संगामिति चाहते अभिनेताओं उपयोग करना पड़ता है। यह सिर्फ एक पुस्तकालय है और, मेरी राय में, एक अत्यधिक जटिल एक। – Apocalisp

1

यह एक साधारण उपभोक्ता/निर्माता समस्या की तरह लगता है। मैं उपभोक्ताओं के एक पूल के साथ एक कतार का उपयोग करेंगे। आप java.util.concurrent का उपयोग करके कोड की कुछ पंक्तियों के साथ इसे लिख सकते हैं।

+0

स्कैला अभिनेता लाइब्रेरी का उपयोग करने का पूरा बिंदु यह है कि यह वर्तमान ऑपरेटिंग वातावरण में उपलब्ध समेकन पर आपके कोड (अभिनेताओं का उपयोग करके लिखे गए) को बेहतर तरीके से मैप कर सकता है। तो यदि स्कैला सोचती है कि उसे 4 प्रोसेसर मिल गए हैं, तो शायद यह 4 श्रमिकों के साथ अपने कलाकारों के लिए बैकिंग थ्रेड पूल बनाएगा। मुझे इस काम को निष्पादित करने के लिए अपना खुद का अलग थ्रेड पूल बनाकर कुछ भी नहीं मिला - मैं सब खत्म कर दूंगा, अनावश्यक संदर्भ स्विचिंग का एक भार है। मुझे जावा में इसे हल करने के बारे में पूरी तरह से पता है - मैं स्कैला अभिनेता लाइब्रेरी का उपयोग करके इसे हल करने के बारे में पूछ रहा हूं, इसलिए टैग। –

+0

कम से कम मुझे लगता है कि इसका उपयोग करने का पूरा बिंदु है: -/ –

+0

क्षमा करें, मुझे नहीं पता था कि यह अभिनेताओं के साथ एक अकादमिक अभ्यास था। मैंने सोचा कि आप समस्या का अच्छा समाधान चाहते हैं। "तो अगर स्कैला सोचता है कि इसमें 4 प्रोसेसर हैं, तो शायद यह 4 श्रमिकों के साथ अपने कलाकारों के लिए बैकिंग थ्रेड पूल बनाएगा।" यह java.util.concurrent का उपयोग कर कोड की दो पंक्तियां हो सकती है जिसे आप आसानी से स्कैला से उपयोग कर सकते हैं। मैं इसे हर समय jruby से उपयोग करते हैं। – jshen

1

एक अभिनेता का उद्देश्य (ठीक है, उनमें से एक) यह सुनिश्चित करना है कि अभिनेता के भीतर राज्य को केवल एक ही थ्रेड द्वारा एक्सेस किया जा सके। संदेश के प्रसंस्करण अभिनेता के भीतर किसी भी परिवर्तनशील राज्य पर निर्भर नहीं करता है, तो यह शायद अधिक उपयुक्त सिर्फ एक अनुसूचक या एक धागा पूल पर कार्रवाई करने के लिए एक कार्य प्रस्तुत करने के लिए किया जाएगा। अभिनेता द्वारा प्रदान किया जाने वाला अतिरिक्त अमूर्त वास्तव में आपके रास्ते में हो रहा है।

स्कैला.एक्टर्स में सुविधाजनक तरीके हैं। इसके लिए शेड्यूलर, या आप java.util.concurrent से निष्पादक का उपयोग कर सकते हैं।

1

अभिनेता ज्यादा धागे की तुलना में अधिक हल्के हैं, और इस तरह के एक अन्य विकल्प के रूप में Runnable वस्तुओं की तरह अभिनेता वस्तुओं का उपयोग करने के लिए है आप थ्रेड पूल में जमा करने के लिए उपयोग किया जाता है। मुख्य अंतर यह है कि आप ThreadPool बारे में चिंता करने की आवश्यकता नहीं है - थ्रेड पूल अभिनेता ढांचे द्वारा प्रबंधित किया जाता है और ज्यादातर एक विन्यास चिंता का विषय है। अपने प्रश्न से

submit(new MyEvent(x)) 

, जो मेल खाती है

को
eventProcessor ! new MyEvent(x) 

:

def submit(e: MyEvent) = actor { 
    // no loop - the actor exits immediately after processing the first message 
    react { 
    case MyEvent(x) => 
     process(x) 
    } 
} ! e // immediately send the new actor a message 

तब संदेश प्रस्तुत करने के लिए, यह कहना।

क्वाड-कोर i7 लैपटॉप पर लगभग 10 सेकंड में भेजे गए और प्राप्त किए गए 1 मिलियन संदेशों के साथ सफलतापूर्वक इस पैटर्न का परीक्षण किया।

उम्मीद है कि इससे मदद मिलती है।