2016-05-05 13 views
7

के माध्यम से http अनुरोध करते समय अक्का फ्लो लटकता है मैं अक्का 2.4.4 का उपयोग कर रहा हूं और अपाचे HttpAsyncClient (असफल) से स्थानांतरित करने का प्रयास कर रहा हूं।कनेक्शन पूल

नीचे मेरे प्रोजेक्ट में उपयोग किए जाने वाले कोड का सरलीकृत संस्करण है।

समस्या यह है कि अगर मैं प्रवाह के लिए 1-3 से अधिक अनुरोध भेजता हूं तो यह लटकता है। डिबगिंग के 6 घंटों के बाद भी मैं समस्या का पता नहीं लगा सका। मुझे अपवाद, त्रुटि लॉग, Decider में ईवेंट नहीं दिख रहे हैं। कुछ भी नहीं :)

मैंने connection-timeout को 1s की सेटिंग को कम करने की कोशिश की, यह सोचकर कि यह सर्वर से प्रतिक्रिया की प्रतीक्षा कर रहा है लेकिन इससे मदद नहीं मिली।

मैं क्या गलत कर रहा हूं?

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.headers.Referer 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse} 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.stream.Supervision.Decider 
import akka.stream.scaladsl.{Sink, Source} 
import akka.stream.{ActorAttributes, Supervision} 
import com.typesafe.config.ConfigFactory 

import scala.collection.immutable.{Seq => imSeq} 
import scala.concurrent.{Await, Future} 
import scala.concurrent.duration.Duration 
import scala.util.Try 

object Main { 

    implicit val system = ActorSystem("root") 
    implicit val executor = system.dispatcher 
    val config = ConfigFactory.load() 

    private val baseDomain = "www.google.com" 
    private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config)) 

    private val decider: Decider = { 
    case ex => 
     ex.printStackTrace() 
     Supervision.Stop 
    } 

    private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] = 

    Source.fromIterator(() => items.toIterator) 
     .via(poolClientFlow) 
     .log("Logger")(log = myAdapter) 
     .recoverWith { 
     case ex => 
      println(ex) 
      null 
     } 
     .withAttributes(ActorAttributes.supervisionStrategy(decider)) 
     .runWith(Sink.seq) 
     .map { v => 
     println(s"Got ${v.length} responses in Flow") 
     v.asInstanceOf[Seq[(Try[HttpResponse], T)]] 
     } 

    def main(args: Array[String]) { 

    val headers = imSeq(Referer("https://www.google.com/")) 
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID" 
    val requests = List.fill(10)(reqPair) 
    val qwe = sendMultipleRequests(requests).map { case responses => 
     println(s"Got ${responses.length} responses") 

     system.terminate() 
    } 

    Await.ready(system.whenTerminated, Duration.Inf) 
    } 
} 

इसके अलावा proxy support के साथ क्या हो रहा है? मेरे लिए काम नहीं लग रहा है।

उत्तर

7

आपको प्रतिक्रिया के शरीर को पूरी तरह से उपभोग करने की आवश्यकता है ताकि कनेक्शन बाद के अनुरोधों के लिए उपलब्ध कराया जा सके। आप प्रतिक्रिया संस्था के बारे में बिल्कुल भी परवाह नहीं है, तो आप बस इसे एक Sink.ignore को समाप्त हो सकती है, कुछ इस तरह:

resp.entity.dataBytes.runWith(Sink.ignore) 

डिफ़ॉल्ट config रूप से, जब एक कनेक्शन पूल मेजबान का उपयोग कर, अधिकतम कनेक्शन है 4 पर सेट करें। प्रत्येक पूल में अपनी स्वयं की कतार है जहां अनुरोध तब तक प्रतीक्षा करते हैं जब तक कि खुले कनेक्शन में से कोई भी उपलब्ध न हो जाए। यदि वह कतार कभी 32 से अधिक हो जाती है (डिफ़ॉल्ट कॉन्फ़िगरेशन, बदला जा सकता है, 2 की शक्ति होनी चाहिए) तो आप असफलताओं को देखना शुरू कर देंगे। आपके मामले में, आप केवल 10 अनुरोध करते हैं, इसलिए आप उस सीमा को नहीं दबाते हैं। लेकिन प्रतिक्रिया इकाई का उपभोग न करके आप कनेक्शन को मुक्त नहीं करते हैं और बाकी सब कुछ पीछे की कतार में हैं, कनेक्शन को मुक्त करने की प्रतीक्षा कर रहे हैं।

+0

मैंने वास्तव में कोशिश की और इससे मदद नहीं मिली। शायद मैंने इसे गलत जगह पर रखा है। क्या आप कृपया स्वयं निर्मित निहित प्रोजेक्ट को देख सकते हैं? https://github.com/cppexpert/akka_flow_freezing – expert

+2

हां, यह समस्या है। आप 10 वायदा के परिणामों को अनुक्रमित करने और फिर शरीर को पढ़ने की कोशिश कर रहे हैं। समस्या यह है कि 'अनुक्रम' पर मानचित्र को कॉल करने के लिए, सभी 10 वायदा पूर्ण हो चुके होंगे और केवल पहले 4 होंगे और वे पहले 4 अन्य को अवरुद्ध कर रहे हैं 6. उत्तर पढ़ने के कोड को आगे बढ़ाएं और यह ठीक हो जाएगा आपकी समस्या – cmbaxter

+0

क्या आप दिखा सकते हैं कि आप प्रतिक्रिया पढ़ने के कोड को कैसे आगे बढ़ाएंगे? मैंने कुछ चीजों की कोशिश की और यह अभी भी थोक में वायदा खत्म होने की प्रतीक्षा कर रहा है। मेरे उदाहरण में भी 'parseResponse' को प्रत्येक प्रतिक्रिया पर पहले समकालिक रूप से समझा जाना चाहिए इससे पहले कि यह भविष्य में पारित हो जाए। परिणाम? शायद मैं कतार के 'toMat' पर जा सकता हूं लेकिन फिर मैं विभिन्न प्रतिक्रियाओं को पार्स करने के लिए इसका उपयोग नहीं कर पाऊंगा। प्रत्येक अनुरोध के लिए '(कोई भी, वादा [..])' के साथ लम्बा पैकिंग मुझे बहुत बदसूरत लगता है। – expert

संबंधित मुद्दे