2010-03-22 7 views
5

का उपयोग कर साझा निर्भरता के साथ लूप के लिए कैसे लिखूं? हमारे पास कुछ कोड है जो तेजी से चलने की आवश्यकता है। यह पहले से ही प्रोफाइल है इसलिए हम कई धागे का उपयोग करना चाहते हैं। आम तौर पर मैं मेमोरी कतार में एक सेट अप करता हूं, और कतार की नौकरियां लेने और परिणामों की गणना करने वाले कई धागे हैं। साझा डेटा के लिए मैं एक ConcurrentHashMap या इसी तरह का उपयोग करता हूं।मैं अभिनेताओं

मैं वास्तव में उस मार्ग को फिर से नीचे नहीं जाना चाहता हूं। अभिनेताओं का उपयोग करके मैंने जो पढ़ा है, उसके परिणामस्वरूप क्लीनर कोड होगा और यदि मैं 1 से अधिक जेवीएम में माइक्रेट करने वाला एक्क्का का उपयोग करना आसान होना चाहिए। क्या यह सच है?

हालांकि, मुझे नहीं पता कि अभिनेताओं में कैसे सोचना है, इसलिए मुझे यकीन नहीं है कि कहां से शुरू करना है।

case class Trade(price:Double, volume:Int, stock:String) { 
    def value(priceCalculator:PriceCalculator) = 
    (priceCalculator.priceFor(stock)-> price)*volume 
} 
class PriceCalculator { 
    def priceFor(stock:String) = { 
    Thread.sleep(20)//a slow operation which can be cached 
    50.0 
    } 
} 
object ValueTrades { 

    def valueAll(trades:List[Trade], 
     priceCalculator:PriceCalculator):List[(Trade,Double)] = { 
    trades.map { trade => (trade,trade.value(priceCalculator)) } 
    } 

    def main(args:Array[String]) { 
    val trades = List(
     Trade(30.5, 10, "Foo"), 
     Trade(30.5, 20, "Foo") 
     //usually much longer 
    ) 
    val priceCalculator = new PriceCalculator 
    val values = valueAll(trades, priceCalculator) 
    } 

} 

मैं अगर यह सराहनीय होगा अभिनेताओं उपयोग करने का सुझाव सकता है कि यह कैसे अभिनेताओं को नक्शे पर होगा अनुभव के साथ किसी को:

समस्या यहाँ का एक बेहतर विचार कुछ नमूना कोड दे रहा है।

उत्तर

2

सरल समांतरता के लिए, जहां मैं प्रक्रिया के लिए काम का एक गुच्छा फेंक देता हूं और उसके बाद वापस आने के लिए प्रतीक्षा करता हूं, मैं एक वायदा पैटर्न का उपयोग करना चाहता हूं। काम का बोझ प्रति एक - -

class ActorExample { 
    import actors._ 
    import Actor._ 
    class Worker(val id: Int) extends Actor { 
    def busywork(i0: Int, i1: Int) = { 
     var sum,i = i0 
     while (i < i1) { 
     i += 1 
     sum += 42*i 
     } 
     sum 
    } 
    def act() { loop { react { 
     case (i0:Int,i1:Int) => sender ! busywork(i0,i1) 
     case None => exit() 
    }}} 
    } 

    val workforce = (1 to 4).map(i => new Worker(i)).toList 

    def parallelFourSums = { 
    workforce.foreach(_.start()) 
    val futures = workforce.map(w => w !! ((w.id,1000000000))); 
    val computed = futures.map(f => f() match { 
     case i:Int => i 
     case _ => throw new IllegalArgumentException("I wanted an int!") 
    }) 
    workforce.foreach(_ ! None) 
    computed 
    } 

    def serialFourSums = { 
    val solo = workforce.head 
    workforce.map(w => solo.busywork(w.id,1000000000)) 
    } 

    def timed(f: => List[Int]) = { 
    val t0 = System.nanoTime 
    val result = f 
    val t1 = System.nanoTime 
    (result, t1-t0) 
    } 

    def go { 
    val serial = timed(serialFourSums) 
    val parallel = timed(parallelFourSums) 
    println("Serial result: " + serial._1) 
    println("Parallel result:" + parallel._1) 
    printf("Serial took %.3f seconds\n",serial._2*1e-9) 
    printf("Parallel took %.3f seconds\n",parallel._2*1e-9) 
    } 
} 

असल में, विचार कार्यकर्ताओं का एक संग्रह बनाने के लिए है और फिर से उन पर सभी डेटा फेंक !! जो तुरंत एक भविष्य देता है। जब आप भविष्य को पढ़ने की कोशिश करते हैं, प्रेषक तब तक ब्लॉक करता है जब तक कार्यकर्ता वास्तव में डेटा के साथ नहीं किया जाता।

आप उपरोक्त ताकि PriceCalculator बढ़ाया Actor बजाय, और valueAll डेटा की वापसी समन्वित पुनर्लेखन सकता है।

ध्यान दें कि आपको आस-पास के अपरिवर्तनीय डेटा को सावधान रहना होगा।

वैसे भी, मशीन पर मैं, से अगर आप चलाने के इस टाइप कर रहा हूँ कि आप ऊपर मिलती है:

scala> (new ActorExample).go 
Serial result: List(-1629056553, -1629056636, -1629056761, -1629056928) 
Parallel result:List(-1629056553, -1629056636, -1629056761, -1629056928) 
Serial took 1.532 seconds 
Parallel took 0.443 seconds 

(जाहिर है मैं कम से कम चार कोर है, समानांतर समय बदलता रहता है बल्कि एक सा है जिस पर निर्भर करता है कार्यकर्ता को प्रोसेसर और मशीन पर और क्या चल रहा है।)

+0

धन्यवाद, यह बहुत उपयोगी है और नौकरियों बिट के वितरण बताते हैं। निर्भरता मुद्दे के बारे में क्या। उदाहरण के लिए, यदि व्यस्त कार्य कम्प्यूटेशनल रूप से महंगी गणना पर निर्भर करता है जहां परिणाम अन्य कार्यों द्वारा साझा किए जा सकते हैं। यह एक नए अभिनेता वर्ग द्वारा किया जा सकता है लेकिन फिर श्रमिक अभिनेता अवरुद्ध हो जाएंगे। –

+0

यदि आप साझा परिणाम चाहते हैं, तो आपको अभी भी 'ConcurrentHashMap' और उसके दोस्तों का उपयोग करना चाहिए। अभिनेताओं का उद्देश्य डेटा साझा किए बिना उपयोग किया जाना है (अपरिवर्तनीय संदेशों को छोड़कर)। आप उन्हें अन्य तरीकों से भी उपयोग कर सकते हैं, लेकिन यदि आप करते हैं, तो आप अन्य दृष्टिकोणों के सभी नुकसान को अपनाते हैं। –

+0

@ थॉमस मुझे कोई कारण नहीं दिखता कि यह साझा परिणाम अभिनेताओं के साथ क्यों नहीं किया जा सकता है। आप एक अभिनेता के संदर्भ को पास करते हैं जो परिणाम प्राप्त करने के लिए परिणाम (और इसे बाद में कैश) की गणना करने के बारे में जानता है, और परिणाम प्राप्त करने के लिए मजदूर सिर्फ '!?' का उपयोग करते हैं। जब तक कि अभिनेता खत्म नहीं हो जाता तब तक वे ब्लॉक करेंगे। इसके बाद यह बदले में प्रत्येक कॉलर को परिणाम देगा। –

3

यह महंगा गणनाओं के लिए साझा परिणामों पर मेरी टिप्पणी का एक पूरक है। संदेश यह है:

import scala.actors._ 
import Actor._ 
import Futures._ 

case class PriceFor(stock: String) // Ask for result 

// The following could be an "object" as well, if it's supposed to be singleton 
class PriceCalculator extends Actor { 
    val map = new scala.collection.mutable.HashMap[String, Future[Double]]() 
    def act = loop { 
    react { 
     case PriceFor(stock) => reply(map getOrElseUpdate (stock, future { 
     Thread.sleep(2000) // a slow operation 
     50.0 
     })) 
    } 
    } 
} 

यहाँ एक उपयोग उदाहरण है:

scala> val pc = new PriceCalculator; pc.start 
pc: PriceCalculator = [email protected] 

scala> class Test(stock: String) extends Actor { 
    | def act = { 
    |  println(System.currentTimeMillis().toString+": Asking for stock "+stock) 
    |  val f = (pc !? PriceFor(stock)).asInstanceOf[Future[Double]] 
    |  println(System.currentTimeMillis().toString+": Got the future back") 
    |  val res = f.apply() // this blocks until the result is ready 
    |  println(System.currentTimeMillis().toString+": Value: "+res) 
    | } 
    | } 
defined class Test 

scala> List("abc", "def", "abc").map(new Test(_)).map(_.start) 
1269310737461: Asking for stock abc 
res37: List[scala.actors.Actor] = List([email protected], [email protected], [email protected]) 
1269310737461: Asking for stock abc 
1269310737461: Asking for stock def 
1269310737464: Got the future back 

scala> 1269310737462: Got the future back 
1269310737465: Got the future back 
1269310739462: Value: 50.0 
1269310739462: Value: 50.0 
1269310739465: Value: 50.0 


scala> new Test("abc").start // Should return instantly 
1269310755364: Asking for stock abc 
res38: scala.actors.Actor = [email protected] 
1269310755365: Got the future back 

scala> 1269310755367: Value: 50.0 
संबंधित मुद्दे