सूचना
विक्टर Klang के रूप में टिप्पणी में कहा: एक Tuple2[O,O2]
में ज़िप करने केवल व्यवहार्य है, जब यह ज्ञात है कि दोनों प्रवाह, flow1
& flow2
, 1: 1 इनपुट तत्व गिनती और उत्पादन के संबंध में तत्व गिनती
ग्राफ आधारित समाधान
एक टपल निर्माण एक Graph के अंदर बनाया जा सकता है। वास्तव में, अपने प्रश्न लगभग पूरी तरह से परिचयात्मक उदाहरण मेल खाता है:
कड़ी में नमूना कोड का विस्तार, आप उपयोग कर सकते हैं Broadcast और Zip
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Zip[Int, Int]()) //different than link
val f1, f2, f4 = Flow[Int].map(_ + 10)
val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})//end RunnableGraph.fromGraph
कुछ हद तक Hacky स्ट्रीम समाधान
यदि आप शुद्ध स्ट्रीम समाधान की तलाश में हैं, तो मध्यवर्ती धाराओं का उपयोग करना संभव है लेकिन Mat
रखरखाव नहीं किया जाएगा और यह प्रत्येक इनपुट तत्व के लिए 2 धाराओं के भौतिकीकरण शामिल है:
def andFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[O] = Source
.single(i)
.via(flow1)
.to(Sink.head[O])
.run()
val o2 : Future[O2] = Source
.single(i)
.via(flow2)
.to(Sink.head[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
जेनेरिक ज़िप करना
आप इस ज़िप करने जेनेरिक, सबसे प्रवाह के लिए, तो उत्पादन प्रकार बनाना चाहते हैं होगा (Seq[O], Seq[O2])
होना चाहिए।इस प्रकार का Sink.seq
बजाय Sink.head
का उपयोग करके उपरोक्त andFlows
समारोह में द्वारा उत्पन्न किया जा सकता है: 1 इसलिए इस तरह एक सामान्य Combinator (1 1 उत्पादन के लिए इनपुट) मुश्किल हो जाएगा:
def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int)
(flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
(implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] =
Flow[I].mapAsync(maxConcurrentStreams){ i =>
val o : Future[Seq[O]] = Source
.single(i)
.via(flow1)
.to(Sink.seq[O])
.run()
val o2 : Future[Seq[O2]] = Source
.single(i)
.via(flow2)
.to(Sink.seq[O2])
.run()
o zip o2
}//end Flow[I].mapAsync
एक प्रवाह सख्ती से 1 नहीं है। (आप ग्राफडीएसएल का उपयोग कर सकते हैं और ब्रॉडकास्ट + मर्ज का उपयोग कर सकते हैं) –