के माध्यम से http अनुरोध करते समय अक्का फ्लो लटकता है मैं अक्का 2.4.4 का उपयोग कर रहा हूं और अपाचे HttpAsyncClient (असफल) से स्थानांतरित करने का प्रयास कर रहा हूं।कनेक्शन पूल
नीचे मेरे प्रोजेक्ट में उपयोग किए जाने वाले कोड का सरलीकृत संस्करण है।
समस्या यह है कि अगर मैं प्रवाह के लिए 1-3 से अधिक अनुरोध भेजता हूं तो यह लटकता है। डिबगिंग के 6 घंटों के बाद भी मैं समस्या का पता नहीं लगा सका। मुझे अपवाद, त्रुटि लॉग, Decider
में ईवेंट नहीं दिख रहे हैं। कुछ भी नहीं :)
मैंने connection-timeout
को 1s की सेटिंग को कम करने की कोशिश की, यह सोचकर कि यह सर्वर से प्रतिक्रिया की प्रतीक्षा कर रहा है लेकिन इससे मदद नहीं मिली।
मैं क्या गलत कर रहा हूं?
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory
import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
val config = ConfigFactory.load()
private val baseDomain = "www.google.com"
private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))
private val decider: Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =
Source.fromIterator(() => items.toIterator)
.via(poolClientFlow)
.log("Logger")(log = myAdapter)
.recoverWith {
case ex =>
println(ex)
null
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
.map { v =>
println(s"Got ${v.length} responses in Flow")
v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
}
def main(args: Array[String]) {
val headers = imSeq(Referer("https://www.google.com/"))
val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
val requests = List.fill(10)(reqPair)
val qwe = sendMultipleRequests(requests).map { case responses =>
println(s"Got ${responses.length} responses")
system.terminate()
}
Await.ready(system.whenTerminated, Duration.Inf)
}
}
इसके अलावा proxy support के साथ क्या हो रहा है? मेरे लिए काम नहीं लग रहा है।
मैंने वास्तव में कोशिश की और इससे मदद नहीं मिली। शायद मैंने इसे गलत जगह पर रखा है। क्या आप कृपया स्वयं निर्मित निहित प्रोजेक्ट को देख सकते हैं? https://github.com/cppexpert/akka_flow_freezing – expert
हां, यह समस्या है। आप 10 वायदा के परिणामों को अनुक्रमित करने और फिर शरीर को पढ़ने की कोशिश कर रहे हैं। समस्या यह है कि 'अनुक्रम' पर मानचित्र को कॉल करने के लिए, सभी 10 वायदा पूर्ण हो चुके होंगे और केवल पहले 4 होंगे और वे पहले 4 अन्य को अवरुद्ध कर रहे हैं 6. उत्तर पढ़ने के कोड को आगे बढ़ाएं और यह ठीक हो जाएगा आपकी समस्या – cmbaxter
क्या आप दिखा सकते हैं कि आप प्रतिक्रिया पढ़ने के कोड को कैसे आगे बढ़ाएंगे? मैंने कुछ चीजों की कोशिश की और यह अभी भी थोक में वायदा खत्म होने की प्रतीक्षा कर रहा है। मेरे उदाहरण में भी 'parseResponse' को प्रत्येक प्रतिक्रिया पर पहले समकालिक रूप से समझा जाना चाहिए इससे पहले कि यह भविष्य में पारित हो जाए। परिणाम? शायद मैं कतार के 'toMat' पर जा सकता हूं लेकिन फिर मैं विभिन्न प्रतिक्रियाओं को पार्स करने के लिए इसका उपयोग नहीं कर पाऊंगा। प्रत्येक अनुरोध के लिए '(कोई भी, वादा [..])' के साथ लम्बा पैकिंग मुझे बहुत बदसूरत लगता है। – expert