के अंदर भविष्य से बैकप्रेसर बनाएं, मैं सामान्य रूप से अक्का धाराओं और धाराओं में नया हूं इसलिए मैंने एक वैचारिक स्तर पर पूरी तरह से गलत समझा होगा, लेकिन क्या भविष्य में हल होने तक मैं बैकप्रेसर बना सकता हूं? अनिवार्य रूप से मुझे क्या करना चाहते हैं इस तरह है:एक अक्का धारा
object Parser {
def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}
val futures = FileIO.fromPath(path)
.map(st => Parser.parseBytesToSeq(st.toByteBuffer))
.batch(1000, x => x)(_ ++ _)
.map(values => doAsyncOp(values))
.runWith(Sink.seq)
def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???
बाइट्स एक फ़ाइल से पढ़ा जाता है और पार्सर, जो ExampleObject
के Seq
रों का उत्सर्जन करता है स्ट्रीम, और उन एक async आपरेशन है कि एक Future
रिटर्न के लिए स्ट्रीम किए जाते हैं। मैं इसे बनाना चाहता हूं कि जब तक Future
हल नहीं हो जाता है, तब तक बाकी स्ट्रीम को बैकप्रेस किया जाता है, फिर भविष्य को हल करने के बाद फिर से शुरू होता है, Seq[ExampleObject]
से doAsyncOp
तक गुजरता है, जो बैकप्रेसर को फिर से शुरू करता है।
अभी मैं इस के साथ काम मिल गया है:
Await.result(doAsyncOp(values), 10 seconds)
लेकिन मैं समझता हूँ कि इस पूरे धागा ऊपर ताले और बुरे है। क्या इसके चारों ओर कोई बेहतर तरीका है?
यदि यह मदद करता है, तो बड़ी तस्वीर यह है कि मैं जौन के साथ एक बहुत बड़ी जेएसओएन फ़ाइल (स्मृति में फिट करने के लिए बहुत बड़ा) पार्स करने के लिए कोशिश कर रहा हूं, फिर ऑलस्टिक खोज में ऑब्जेक्ट्स को अनुक्रमित करने के लिए 'पार्स किया गया - लोचदार खोज में 50 लंबित परिचालनों की कतार है, अगर यह बहती है तो यह नई वस्तुओं को अस्वीकार करना शुरू कर देती है।