2015-12-15 12 views
6

से अक्का-http वेबस्केट कनेक्शन बंद करें मेरे परिदृश्य में, एक ग्राहक "अलविदा" वेबसाईट संदेश भेजता है और मुझे सर्वर की ओर से पहले स्थापित कनेक्शन को बंद करने की आवश्यकता है।सर्वर

से अक्का-http docs:

समापन कनेक्शन अपने सर्वर तर्क से आने वाली कनेक्शन फ्लो रद्द करके संभव है (एक Sink.cancelled और एक Source.empty करने के लिए अपनी नदी के ऊपर करने के लिए अपने नीचे की ओर जोड़ने के द्वारा जैसे) । इनकमिंग कनेक्शन कनेक्शन को रद्द करके सर्वर की सॉकेट को बंद करना भी संभव है।

लेकिन यह मेरे लिए स्पष्ट नहीं है कि कि ध्यान में रखते हुए कि Sink और Source एक बार जब एक नया कनेक्शन बातचीत कर सेट कर रहे हैं करने के लिए:

(get & path("ws")) { 
    optionalHeaderValueByType[UpgradeToWebsocket]() { 
    case Some(upgrade) ⇒ 
     val connectionId = UUID() 
     complete(upgrade.handleMessagesWithSinkSource(sink, source)) 
    case None ⇒ 
     reject(ExpectedWebsocketRequestRejection) 
    } 
} 

उत्तर

3

संकेत: यह जवाब akka-stream-experimental संस्करण 2.0-M2 पर आधारित है। एपीआई अन्य संस्करणों में थोड़ा अलग हो सकता है।

import akka.stream.stage._ 

val closeClient = new PushStage[String, String] { 
    override def onPush(elem: String, ctx: Context[String]) = elem match { 
    case "goodbye" ⇒ 
     // println("Connection closed") 
     ctx.finish() 
    case msg ⇒ 
     ctx.push(msg) 
    } 
} 

प्रत्येक तत्व है कि हर तत्व यह है कि के माध्यम से चला जाता है ग्राहक के पक्ष में या (सर्वर साइड पर और सामान्य रूप में प्राप्त होता है:


एक आसान तरीका है कनेक्शन बंद करने के लिए एक PushStage का उपयोग करना है Flow) ऐसे Stage घटक से गुजरता है। अक्का में, पूर्ण अमूर्तता को GraphStage कहा जाता है, अधिक जानकारी official documentation में पाई जा सकती है।

PushStage के साथ हम ठोस मूल्य आने वाले तत्वों को उनके मूल्य के लिए देख सकते हैं और तदनुसार संदर्भ को बदल सकते हैं। ऊपर दिए गए उदाहरण में, goodbye संदेश प्राप्त होने के बाद हम संदर्भ समाप्त कर देते हैं अन्यथा हम push विधि के माध्यम से मूल्य को आगे बढ़ाते हैं।

अब, हम transform विधि के माध्यम से एक मनमाना प्रवाह के लिए closeClient घटक कनेक्ट कर सकते हैं:

val connection = Tcp().outgoingConnection(address, port) 

val flow = Flow[ByteString] 
    .via(Framing.delimiter(
     ByteString("\n"), 
     maximumFrameLength = 256, 
     allowTruncation = true)) 
    .map(_.utf8String) 
    .transform(() ⇒ closeClient) 
    .map(_ ⇒ StdIn.readLine("> ")) 
    .map(_ + "\n") 
    .map(ByteString(_)) 

connection.join(flow).run() 

प्रवाह के ऊपर एक ByteString प्राप्त करता है और एक ByteString देता है, जिसका मतलब है कि यह join के माध्यम से connection से जोड़ा जा सकता तरीका। इससे पहले कि हम closeClient पर भेज दें, प्रवाह के अंदर हम पहले बाइट को एक स्ट्रिंग में परिवर्तित करते हैं। यदि PushStage स्ट्रीम को समाप्त नहीं करता है, तो तत्व को धारा में अग्रेषित किया जाता है, जहां इसे छोड़ दिया जाता है और stdin से कुछ इनपुट द्वारा प्रतिस्थापित किया जाता है, जिसे तब तार पर वापस भेज दिया जाता है। यदि स्ट्रीम समाप्त हो जाता है, तो मंच घटक के बाद सभी आगे स्ट्रीम प्रोसेसिंग चरणों को छोड़ दिया जाएगा - स्ट्रीम अब बंद हो गई है।

+0

आपको बहुत बहुत धन्यवाद!ऐसा लगता है कि मैं कुछ ढूंढ रहा था। – Tvaroh

+0

दुर्भाग्य से यह 'इन' प्रवाह पर लागू होने पर वेबस्केट कनेक्शन (अक्का-http पर) बंद नहीं करता है। हो सकता है कि इसे 'आउट' स्रोत को 'फिनिश' होने की भी आवश्यकता हो। – Tvaroh

+0

हां, अक्का-http कनेक्शन समाप्त कर देता है जब वे दोनों "समाप्त" होते हैं: 'प्रवाह' - इसे 'समाप्त' करके 'स्रोत' (आउटपुट के) - इसके अंतर्निहित अभिनेता को रोककर। – Tvaroh

2

यह

package com.trackabus.misc 

import akka.stream.stage._ 
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 

// terminates the flow based on a predicate for a message of type T 
// if forwardTerminatingMessage is set the message is passed along the flow 
// before termination 
// if terminate is true the stage is failed, if it is false the stage is completed 
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true) 
    extends GraphStage[FlowShape[T, T]] 
{ 
    val in = Inlet[T]("TerminateFlowStage.in") 
    val out = Outlet[T]("TerminateFlowStage.out") 
    override val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 

     setHandlers(in, out, new InHandler with OutHandler { 
     override def onPull(): Unit = { pull(in) } 

     override def onPush(): Unit = { 
      val chunk = grab(in) 

      if (pred(chunk)) { 
      if (forwardTerminatingMessage) 
       push(out, chunk) 
      if (terminate) 
       failStage(new RuntimeException("Flow terminated by TerminateFlowStage")) 
      else 
       completeStage() 
      } 
      else 
      push(out, chunk) 
     } 
     }) 
    } 
} 

यह अपने मंच को परिभाषित का उपयोग करने के

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe]) 

और फिर भाग के रूप में शामिल वर्तमान (2.4.14) अक्का धारा के संस्करण में पालन करके पूरा किया जा सकता प्रवाह

.via(termOnKillMe)