2016-11-23 9 views
6

कहो को नियंत्रित करता है कि मैं दो स्रोतों है:कैसे एक एक अक्का स्ट्रीम के प्रवाह को एक और धारा के आधार पर

val ticks = Source(1 to 10) 
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable]) 

मैं अक्का स्ट्रीम में एक Graph[...] प्रसंस्करण कदम है कि वर्तमान मूल्य के आधार पर बनाना चाहते हैं ticks धाराओं की धाराओं में जितना संभव हो उतना खपत करता है।

(1, None) 
(2, None) 
(3, Some(Seq(3))) 
(4, Some(Seq(4, 4))) 
(5, None) 
(6, None) 
(7, Some(Seq(7))) 
(8, Some(Seq(8,8,8,8))) 
(9, Some(Seq(9))) 
(10, None) 

आप इस व्यवहार कैसे लागू होगा: तो उदाहरण के लिए, जब मानों मैं सभी तत्वों को दूसरा स्रोत में से मेल लौटना चाहते से मेल खाते हैं, अन्यथा की तरह एक उत्पादन में जिसके परिणामस्वरूप बजाते रहते हैं?

उत्तर

1

मैं तुम्हें इस विषय पर अक्का स्ट्रीम प्रलेखन पर एक नज़र डालें की सलाह देते हैं:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] { 

val in = Inlet[E]("AccumulateWhileUnchanged.in") 
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out") 

override def shape = FlowShape(in, out) 
} 

: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

साइट के अनुसार, अगर आप इस तरह एक GraphStage लागू कर सकते हैं इस विषय पर एक ब्लॉग पोस्ट भी है: http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

उम्मीद है कि यह मदद करता है :)

+0

क्या आप कृपया विनिर्देश दे सकते हैं? आपका उत्तर बस बताता है कि एक कस्टम चरण लिखना संभव है, यह दिए गए प्रश्न को हल करने के लिए एक मंच प्रदान नहीं करता है ... –

संबंधित मुद्दे