2017-03-25 12 views
5

निम्नलिखित कार्यान्वयन की बात कर में उल्लेख किया है:स्रोत कतार कार्यान्वयन थ्रेड का उपयोग कर अक्का-http में कनेक्शन पूलिंग सुरक्षित है?

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io") 
val queue = 
    Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew) 
    .via(poolClientFlow) 
    .toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 
    .run() 

इसे सुरक्षित थ्रेड है एक से अधिक थ्रेड से कतार HTTP अनुरोध पेशकश करने के लिए? यदि यह नहीं है, तो ऐसी आवश्यकता को लागू करने का सबसे अच्छा तरीका क्या है? शायद एक समर्पित अभिनेता का उपयोग कर?

उत्तर

1

नहीं, यह सुरक्षित थ्रेड नहीं है, api doc के अनुसार: SourceQueue that current source is materialized to is for single thread usage only.

एक समर्पित अभिनेता ठीक काम करेगा लेकिन, अगर आप कर सकते हैं, Source.queue के बजाय Source.actorRef (doc link) का उपयोग आसान होगा।

सामान्यतः, Source.actorRef का नकारात्मक पक्ष बैकप्रेसर की कमी है, लेकिन जैसा कि आप OverflowStrategy.dropNew का उपयोग करते हैं, यह स्पष्ट है कि आप बैकप्रेसर की अपेक्षा नहीं करते हैं। इस प्रकार, आप Source.actorRef का उपयोग कर एक ही व्यवहार प्राप्त कर सकते हैं।

+0

आप अपनी टिप्पणी के लिए बहुत बहुत धन्यवाद। मुझे बफर ओवरफ़्लो के मामले में किसी प्रकार की विफलता को इंगित करने की आवश्यकता है, जैसे कि वापसी Future.failed (BufferFlowException), जो मेरी समझ के लिए Source.actorRef का उपयोग करके लागू नहीं किया जा सकता है। स्रोत कतार QueueOfferResult API का उपयोग कर विवरण फिट बैठता है। –

+0

@Nik मुझे अभी भी लगता है कि मैंने मूल प्रश्न का सही उत्तर दिया है। आपकी नई आवश्यकता के लिए, मैं वास्तव में कतार को संभालने में विशेष रूप से एक अभिनेता को लागू करता हूं। इसे 'queue.offer (???) पाइप टू स्वयं' का उपयोग करके कतार में जोड़ना होगा, और उसके बाद यह 'प्राप्तकर्ता' विधि में 'QueueOfferResult' के विभिन्न उपप्रकारों को संभालने में विफलता पर प्रतिक्रिया करने में सक्षम होगा। –

+0

अभिनेता मेलबॉक्स ओवरफ़्लो को रोकने के लिए आप 'NonBlockingBoundedMailbox' (http://doc.akka.io/docs/akka/current/scala/mailboxes.html) का उपयोग कर सकते हैं। – khiramatsu

1

जैसा कि @ frederic-a, SourceQueue द्वारा सही ढंग से बताया गया है, एक थ्रेड सुरक्षित समाधान नहीं है।

शायद एक उपयुक्त समाधान MergeHub का उपयोग करना होगा (अधिक जानकारी के लिए docs देखें)। यह आपको प्रभावी ढंग से दो चरणों में अपना ग्राफ चलाने की अनुमति देता है।

    अपने सिंक करने के लिए अपने हब से
  1. (यह एक सिंक करने के लिए भौतिक रूप धारण)
  2. सिंक अपने उपयोगकर्ताओं के लिए बिंदु 1 पर materialized वितरित करते हैं। Sink एस वास्तव में वितरित करने के लिए डिज़ाइन किए गए हैं, इसलिए यह पूरी तरह से सुरक्षित है।

यह समाधान सुरक्षित backpressure के लिहाज से हो सकता है, प्रति MergeHub व्यवहार

के रूप में यदि उपभोक्ता नहीं तो उत्पादकों के सभी कर रहे हैं backpressured रख सकते हैं।

कोड नीचे दिए गए उदाहरण:

val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16) 
    .via(poolClientFlow) 
    .toMat(Sink.foreach({ 
    case ((Success(resp), p)) => p.success(resp) 
    case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 
    .run() 

// on the user threads 

val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ??? 
source.runWith(reqSink) 
संबंधित मुद्दे