संकेत: यह जवाब akka-stream-experimental
संस्करण 2.0-M2
पर आधारित है। एपीआई अन्य संस्करणों में थोड़ा अलग हो सकता है।
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
प्रत्येक तत्व है कि हर तत्व यह है कि के माध्यम से चला जाता है ग्राहक के पक्ष में या (सर्वर साइड पर और सामान्य रूप में प्राप्त होता है:
एक आसान तरीका है कनेक्शन बंद करने के लिए एक PushStage
का उपयोग करना है Flow
) ऐसे Stage
घटक से गुजरता है। अक्का में, पूर्ण अमूर्तता को GraphStage
कहा जाता है, अधिक जानकारी official documentation में पाई जा सकती है।
PushStage
के साथ हम ठोस मूल्य आने वाले तत्वों को उनके मूल्य के लिए देख सकते हैं और तदनुसार संदर्भ को बदल सकते हैं। ऊपर दिए गए उदाहरण में, goodbye
संदेश प्राप्त होने के बाद हम संदर्भ समाप्त कर देते हैं अन्यथा हम push
विधि के माध्यम से मूल्य को आगे बढ़ाते हैं।
अब, हम transform
विधि के माध्यम से एक मनमाना प्रवाह के लिए closeClient
घटक कनेक्ट कर सकते हैं:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
प्रवाह के ऊपर एक ByteString
प्राप्त करता है और एक ByteString
देता है, जिसका मतलब है कि यह join
के माध्यम से connection
से जोड़ा जा सकता तरीका। इससे पहले कि हम closeClient
पर भेज दें, प्रवाह के अंदर हम पहले बाइट को एक स्ट्रिंग में परिवर्तित करते हैं। यदि PushStage
स्ट्रीम को समाप्त नहीं करता है, तो तत्व को धारा में अग्रेषित किया जाता है, जहां इसे छोड़ दिया जाता है और stdin से कुछ इनपुट द्वारा प्रतिस्थापित किया जाता है, जिसे तब तार पर वापस भेज दिया जाता है। यदि स्ट्रीम समाप्त हो जाता है, तो मंच घटक के बाद सभी आगे स्ट्रीम प्रोसेसिंग चरणों को छोड़ दिया जाएगा - स्ट्रीम अब बंद हो गई है।
आपको बहुत बहुत धन्यवाद!ऐसा लगता है कि मैं कुछ ढूंढ रहा था। – Tvaroh
दुर्भाग्य से यह 'इन' प्रवाह पर लागू होने पर वेबस्केट कनेक्शन (अक्का-http पर) बंद नहीं करता है। हो सकता है कि इसे 'आउट' स्रोत को 'फिनिश' होने की भी आवश्यकता हो। – Tvaroh
हां, अक्का-http कनेक्शन समाप्त कर देता है जब वे दोनों "समाप्त" होते हैं: 'प्रवाह' - इसे 'समाप्त' करके 'स्रोत' (आउटपुट के) - इसके अंतर्निहित अभिनेता को रोककर। – Tvaroh