2016-02-02 21 views
14

क्या कोई मुझे मानचित्र और मानचित्र के बीच अंतर बता सकता है। एसिंक w.r.t AKKA स्ट्रीम? In the documentation यह कहा जाता है किमानचित्र और मानचित्र के बीच अंतर Async

स्ट्रीम परिवर्तनों और दुष्प्रभावों बाहरी गैर धारा आधारित सेवाओं को शामिल mapAsync साथ किया जा सकता या mapAsyncUnordered

क्यों नहीं कर सकते हम बस हमें यहाँ नक्शा? मुझे लगता है कि प्रवाह, स्रोत, सिंक सभी प्रकृति में मोनैडिक होंगे और इस प्रकार नक्शा ठीक काम करना चाहिए w.r.t इन की प्रकृति में देरी?

उत्तर

31

हस्ताक्षर

अंतर सबसे अच्छा signatures में प्रकाश डाला गया है: Flow.map एक समारोह है कि एक प्रकार रिटर्न TFlow.mapAsync एक समारोह है कि एक प्रकार Future[T] रिटर्न में ले जाता है, जबकि में ले जाता है।

व्यावहारिक उदाहरण

उदाहरण के लिए, मान लीजिए कि हम एक समारोह है जो एक प्रयोक्ता आईडी के आधार पर एक उपयोगकर्ता का पूरा नाम के लिए एक डेटाबेस क्वेरी करता है:

type UserID = String 
type FullName = String 

val databaseLookup : UserID => FullName = ??? //implementation unimportant 

एक UserID मूल्यों की Source को देखते हुए हम डाटाबेस से पूछने के लिए बस स्ट्रीम के भीतर Flow.map का उपयोग कर सकते हैं और कंसोल पर पूरा नाम प्रिंट कर सकते हैं:

val userIDSource : Source[UserID, _] = ??? 

val stream = 
    userIDSource.via(Flow[UserID].map(databaseLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

इस कार्यान्वयन की एक सीमा यह है कि यह स्ट्रीम केवल एक समय में 1 डीबी क्वेरी करेगी। यह एक "बाधा" होगी और संभवतः हमारी स्ट्रीम में अधिकतम थ्रूपुट को रोक देगा। प्रदर्शन में सुधार करने के लिए हम बस एक Future के अंदर databaseLookup लपेटकर द्वारा संगामिति जोड़ सकते हैं:

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
    Future { databaseLookup(userID) } 

val concurrentStream = 
    userIDSource.via(Flow[UserID].map(concurrentDBLookup)) 
       .to(Sink.foreach[Future[FullName]](_ foreach println)) 
       .run() 

इस सरलीकृत संगामिति परिशिष्ट के साथ समस्या यह है कि हम प्रभावी रूप से backpressure सफाया कर दिया है। चूंकि सिंक सिर्फ भविष्य में खींच रहा है और foreach println जोड़ रहा है, जो डाटाबेस प्रश्नों की तुलना में अपेक्षाकृत तेज़ है, धारा लगातार स्रोत की मांग को प्रसारित करेगी और अधिक वायदा बंद कर देगी। इसका मतलब है कि databaseLookup की संख्या के साथ चलने की कोई सीमा नहीं है, जो अंततः डेटाबेस को स्वैप कर सकता है।

Flow.mapAsync बचाव के लिए; हम समवर्ती db लुकअप हो सकता है, जबकि एक ही समय में एक साथ लुकअप की संख्या कैपिंग:

val maxLookupCount = 10 

val maxLookupConcurrentStream = 
    userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

भी ध्यान दें कि Sink.foreach सरल हो गया है, यह अब एक Future[FullName] में ले जाता है, लेकिन सिर्फ एक के बजाय FullName

अक्रमित Async मानचित्र

आप FullNames आप Flow.mapAsyncUnordered उपयोग कर सकते हैं करने के लिए UserIDs के आदेश को बनाए रखने के बारे में परवाह नहीं करते हैं। यह उपयोगी होगा यदि आप जिनके बारे में परवाह है, वे सभी पूर्ण नाम प्रिंट कर रहे थे लेकिन इस पर ध्यान नहीं दिया कि वे कंसोल पर किस आदेश पर पहुंचे थे।

+0

क्या 'mapAsync' उस विशिष्ट चरण में एसिंक सीमा लागू करने के समान है? दस्तावेज़ीकरण के अनुसार, एसिंक सीमा को चिह्नित करने से अभिनेता में प्रत्येक चरण चल जाएगा, बस यह सोचकर कि यह वही है। – jarvis11

संबंधित मुद्दे