2017-11-18 26 views
7

क्या निम्नलिखित करने के लिए एक अक्का धारा संयोजक है (या उस प्रभाव के लिए कुछ)? (चलिए अभी के लिए यह and कहते हैं।)दो प्रवाहों को साइड-बाय-साइड कैसे लिखें?

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat] 

अर्थ विज्ञान होगा कि जो कुछ भी स्रोत, उसके तत्वों दोनों Flow रों को दे दिया जाएगा, और उनके outputs एक नया Flow एक टपल के रूप में जोड़ा जा होगा। (उन लोगों से श्रेणी सिद्धांत तीर के साथ परिचित कार्यात्मक प्रोग्रामिंग के स्वाद के लिए, मैं &&& की तरह कुछ के लिए देख रहा हूँ।)

पुस्तकालय में दो combinators कि अर्थात् प्रासंगिक देखा, zip और alsoTo रहे हैं। लेकिन पूर्व SourceShape, और बाद में, SinkShape स्वीकार करता है। न तो GraphShape स्वीकार करेगा। यह एक केस क्यों है?

मेरे उपयोग के मामले निम्नलिखित की तरह कुछ है:

someSource 
    .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs) 
    .runWith(someSink) 

यह काम करता है, लेकिन मैं एक के लिए देख रहा हूँ:

someSource 
    .via(someFlowThatReturnsUnit.and(Flow.apply)) 
    .runWith(someSink) 

.and की तरह कुछ को खोजने के लिए असफल, मैं अपने मूल Flow इस तरह संशोधित क्लीनर, अधिक रचनात्मक समाधान।

+1

एक प्रवाह सख्ती से 1 नहीं है। (आप ग्राफडीएसएल का उपयोग कर सकते हैं और ब्रॉडकास्ट + मर्ज का उपयोग कर सकते हैं) –

उत्तर

6

सूचना

विक्टर Klang के रूप में टिप्पणी में कहा: एक Tuple2[O,O2] में ज़िप करने केवल व्यवहार्य है, जब यह ज्ञात है कि दोनों प्रवाह, flow1 & flow2, 1: 1 इनपुट तत्व गिनती और उत्पादन के संबंध में तत्व गिनती

ग्राफ आधारित समाधान

एक टपल निर्माण एक Graph के अंदर बनाया जा सकता है। वास्तव में, अपने प्रश्न लगभग पूरी तरह से परिचयात्मक उदाहरण मेल खाता है:

enter image description here

कड़ी में नमूना कोड का विस्तार, आप उपयोग कर सकते हैं Broadcast और Zip

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.ignore 

    val bcast = builder.add(Broadcast[Int](2)) 

    val merge = builder.add(Zip[Int, Int]()) //different than link 

    val f1, f2, f4 = Flow[Int].map(_ + 10) 

    val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link 

    in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
       bcast ~> f4 ~> merge 
    ClosedShape 
})//end RunnableGraph.fromGraph 

कुछ हद तक Hacky स्ट्रीम समाधान

यदि आप शुद्ध स्ट्रीम समाधान की तलाश में हैं, तो मध्यवर्ती धाराओं का उपयोग करना संभव है लेकिन Mat रखरखाव नहीं किया जाएगा और यह प्रत्येक इनपुट तत्व के लिए 2 धाराओं के भौतिकीकरण शामिल है:

def andFlows[I, O, O2] (maxConcurrentSreams : Int) 
         (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
         (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[O] = Source 
          .single(i) 
          .via(flow1) 
          .to(Sink.head[O]) 
          .run() 

    val o2 : Future[O2] = Source 
          .single(i) 
          .via(flow2) 
          .to(Sink.head[O2]) 
          .run() 

    o zip o2 
    }//end Flow[I].mapAsync 

जेनेरिक ज़िप करना

आप इस ज़िप करने जेनेरिक, सबसे प्रवाह के लिए, तो उत्पादन प्रकार बनाना चाहते हैं होगा (Seq[O], Seq[O2]) होना चाहिए।इस प्रकार का Sink.seq बजाय Sink.head का उपयोग करके उपरोक्त andFlows समारोह में द्वारा उत्पन्न किया जा सकता है: 1 इसलिए इस तरह एक सामान्य Combinator (1 1 उत्पादन के लिए इनपुट) मुश्किल हो जाएगा:

def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int) 
           (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
           (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[Seq[O]] = Source 
           .single(i) 
           .via(flow1) 
           .to(Sink.seq[O]) 
           .run() 

    val o2 : Future[Seq[O2]] = Source 
           .single(i) 
           .via(flow2) 
           .to(Sink.seq[O2]) 
           .run() 

    o zip o2 
    }//end Flow[I].mapAsync 
संबंधित मुद्दे