हस्ताक्षर
अंतर सबसे अच्छा signatures में प्रकाश डाला गया है: Flow.map
एक समारोह है कि एक प्रकार रिटर्न T
Flow.mapAsync
एक समारोह है कि एक प्रकार Future[T]
रिटर्न में ले जाता है, जबकि में ले जाता है।
व्यावहारिक उदाहरण
उदाहरण के लिए, मान लीजिए कि हम एक समारोह है जो एक प्रयोक्ता आईडी के आधार पर एक उपयोगकर्ता का पूरा नाम के लिए एक डेटाबेस क्वेरी करता है:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
एक UserID
मूल्यों की Source
को देखते हुए हम डाटाबेस से पूछने के लिए बस स्ट्रीम के भीतर Flow.map
का उपयोग कर सकते हैं और कंसोल पर पूरा नाम प्रिंट कर सकते हैं:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
इस कार्यान्वयन की एक सीमा यह है कि यह स्ट्रीम केवल एक समय में 1 डीबी क्वेरी करेगी। यह एक "बाधा" होगी और संभवतः हमारी स्ट्रीम में अधिकतम थ्रूपुट को रोक देगा। प्रदर्शन में सुधार करने के लिए हम बस एक Future
के अंदर databaseLookup
लपेटकर द्वारा संगामिति जोड़ सकते हैं:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
इस सरलीकृत संगामिति परिशिष्ट के साथ समस्या यह है कि हम प्रभावी रूप से backpressure सफाया कर दिया है। चूंकि सिंक सिर्फ भविष्य में खींच रहा है और foreach println
जोड़ रहा है, जो डाटाबेस प्रश्नों की तुलना में अपेक्षाकृत तेज़ है, धारा लगातार स्रोत की मांग को प्रसारित करेगी और अधिक वायदा बंद कर देगी। इसका मतलब है कि databaseLookup
की संख्या के साथ चलने की कोई सीमा नहीं है, जो अंततः डेटाबेस को स्वैप कर सकता है।
Flow.mapAsync
बचाव के लिए; हम समवर्ती db लुकअप हो सकता है, जबकि एक ही समय में एक साथ लुकअप की संख्या कैपिंग:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
भी ध्यान दें कि Sink.foreach
सरल हो गया है, यह अब एक Future[FullName]
में ले जाता है, लेकिन सिर्फ एक के बजाय FullName
।
अक्रमित Async मानचित्र
आप FullNames आप Flow.mapAsyncUnordered
उपयोग कर सकते हैं करने के लिए UserIDs के आदेश को बनाए रखने के बारे में परवाह नहीं करते हैं। यह उपयोगी होगा यदि आप जिनके बारे में परवाह है, वे सभी पूर्ण नाम प्रिंट कर रहे थे लेकिन इस पर ध्यान नहीं दिया कि वे कंसोल पर किस आदेश पर पहुंचे थे।
क्या 'mapAsync' उस विशिष्ट चरण में एसिंक सीमा लागू करने के समान है? दस्तावेज़ीकरण के अनुसार, एसिंक सीमा को चिह्नित करने से अभिनेता में प्रत्येक चरण चल जाएगा, बस यह सोचकर कि यह वही है। – jarvis11