2016-10-07 9 views
6

के अंदर भविष्य से बैकप्रेसर बनाएं, मैं सामान्य रूप से अक्का धाराओं और धाराओं में नया हूं इसलिए मैंने एक वैचारिक स्तर पर पूरी तरह से गलत समझा होगा, लेकिन क्या भविष्य में हल होने तक मैं बैकप्रेसर बना सकता हूं? अनिवार्य रूप से मुझे क्या करना चाहते हैं इस तरह है:एक अक्का धारा

object Parser { 
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ??? 
} 

val futures = FileIO.fromPath(path) 
    .map(st => Parser.parseBytesToSeq(st.toByteBuffer)) 
    .batch(1000, x => x)(_ ++ _) 
    .map(values => doAsyncOp(values)) 
    .runWith(Sink.seq) 

def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ??? 

बाइट्स एक फ़ाइल से पढ़ा जाता है और पार्सर, जो ExampleObject के Seq रों का उत्सर्जन करता है स्ट्रीम, और उन एक async आपरेशन है कि एक Future रिटर्न के लिए स्ट्रीम किए जाते हैं। मैं इसे बनाना चाहता हूं कि जब तक Future हल नहीं हो जाता है, तब तक बाकी स्ट्रीम को बैकप्रेस किया जाता है, फिर भविष्य को हल करने के बाद फिर से शुरू होता है, Seq[ExampleObject] से doAsyncOp तक गुजरता है, जो बैकप्रेसर को फिर से शुरू करता है।

अभी मैं इस के साथ काम मिल गया है:

Await.result(doAsyncOp(values), 10 seconds) 

लेकिन मैं समझता हूँ कि इस पूरे धागा ऊपर ताले और बुरे है। क्या इसके चारों ओर कोई बेहतर तरीका है?

यदि यह मदद करता है, तो बड़ी तस्वीर यह है कि मैं जौन के साथ एक बहुत बड़ी जेएसओएन फ़ाइल (स्मृति में फिट करने के लिए बहुत बड़ा) पार्स करने के लिए कोशिश कर रहा हूं, फिर ऑलस्टिक खोज में ऑब्जेक्ट्स को अनुक्रमित करने के लिए 'पार्स किया गया - लोचदार खोज में 50 लंबित परिचालनों की कतार है, अगर यह बहती है तो यह नई वस्तुओं को अस्वीकार करना शुरू कर देती है।

उत्तर

9

यह काफी आसान है। आप mapAync :)

val futures = FileIO.fromPath(path) 
    .map(st => Parser.parseBytesToSeq(st.toByteBuffer)) 
    .batch(1000, x => x)(_ ++ _) 
    .mapAsync(4)(values => doAsyncOp(values)) 
    .runWith(Sink.seq) 

उपयोग करने के लिए जहां 4 समानांतरवाद का स्तर है की जरूरत है।