2016-03-10 7 views
7

में मजबूत रूप से समवर्ती HTTPS अनुरोधों की बड़ी संख्या कैसे बना सकता हूं मेरे पास इनपुट की एक धारा है और मैं प्रोग्राम के दूसरे भाग पर परिणाम पारित करने से पहले प्रत्येक के लिए 2 HTTPS नेटवर्क अनुरोध करना चाहता हूं। ठेठ थ्रूपुट 50 प्रति सेकेंड है।मैं क्लोजर (/ जावा)

for each input: 
    HTTP request A 
    HTTP request B 
    pass event on with (A.body and B.body) 

मैं http-kit ग्राहक है, जो डिफ़ॉल्ट रूप से अतुल्यकालिक है उपयोग कर रहा हूँ। यह एक वादा देता है, और एक कॉलबैक भी ले सकता है। Http-किट, जावा NIO (here और here देखें)

में आ रहा है अनुरोध की गति, समय एक अनुरोध बनाने के लिए के साथ संयुक्त का उपयोग करता है इस अतुल्यकालिक रूप से किया जाना चाहिए काफी अधिक है।

  1. जब कोई ईवेंट में आता है, यह एक चैनल पर डाल दिया:

    मैं 3 दृष्टिकोण की कोशिश की है। चैनल को खींचने की संख्या go दिनचर्या। प्रत्येक अनुरोध करता है कि HTTP अनुरोधों से deref आईएनजी द्वारा goblock को 'ब्लॉक' करें। यह काम नहीं करता है क्योंकि मुझे नहीं लगता कि वादा धागे के साथ अच्छी तरह से खेलता है।

  2. जब कोई ईवेंट आता है, तो तुरंत future शुरू करें, जो एसिंक वादे पर 'ब्लॉक' है। इसके परिणामस्वरूप बहुत उच्च CPU उपयोग होता है। नेटवर्क संसाधनों के प्लस भुखमरी किसी भी तरह से।
  3. जब कोई ईवेंट आता है, तो http-kit अनुरोध अनुरोध के लिए तुरंत अनुरोध करें, कॉलबैक में गुज़रने से अनुरोध बी बी करता है, जो ईवेंट को पास करने वाले कॉलबैक को पास करता है। यह कुछ घंटों के बाद स्मृति त्रुटि से बाहर निकलता है।

ये सभी काम करते हैं और थोड़ी देर के लिए क्षमता को संभालते हैं। वे सभी अंततः दुर्घटनाग्रस्त हो गए। सबसे हालिया दुर्घटना, लगभग 12 घंटे के बाद:

Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending 
tasks! 
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status: 
     Managed Threads: 3 
     Active Threads: 1 
     Active Tasks: 
       com[email protected]65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0) 
     Pending Tasks: 
       [email protected]0d 
Pool thread stack traces: 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main] 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 


Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen 
java.lang.OutOfMemoryError: Java heap space 
     at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77) 
     at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76) 
     at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65) 
     at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63) 
     at sun.security.ssl.Handshaker.activate(Handshaker.java:514) 
     at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717) 
     at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743) 
     at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310) 
     at org.httpkit.client.HttpClient.run(HttpClient.java:375) 
     at java.lang.Thread.run(Thread.java:745) 
Mar 10, 2016 4:56:34 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 5:00:43 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 4:58:25 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 

मुझे नहीं पता कि विफलता का कारण क्या है। ऐसा हो सकता है कि बहुत सारे बंद होने या धीरे-धीरे संसाधन रिसाव, या थ्रेड भुखमरी हो रही है।

प्रश्न

  1. प्रति सेकंड 50 HTTP अनुरोध करने का यह, जिनमें से प्रत्येक 200 मि.से लग सकता है, एक अत्यधिक बोझ की तरह जिसका अर्थ है कि वहाँ किसी भी समय पर उड़ान में 100 अनुरोध हो सकता है, ध्वनि?

  2. मैं ऐसा कैसे कर सकता हूं जो थ्रूपुट को नियंत्रित करता है और मजबूत है?

संपादित

YourKit प्रोफाइलर मुझसे कहता है मैं java.util.concurrent.FutureTask रों जिससे पता चलता है कि पुराने हैंडलर (अर्थात अनुरोध) के लिए संदर्भ किसी भी तरह बनाए रखा जा रहा है के माध्यम से org.httpkit.client.Handler रों के माध्यम से char[] के 2GB के बारे में है। कॉलबैक का उपयोग करने का प्रयास करने का पूरा कारण इस से बचने के लिए था (हालांकि वे किसी भी तरह बंद होने में पकड़े जा सकते हैं)

+1

OutOfMemoryError इंगित करता है कि स्मृति पर एक समस्या है ... लेकिन हम आपके कोड को देखे बिना या स्क्रैच से एक पूर्ण समाधान लिखने में मदद नहीं कर सकते हैं। मैं एक अनंत अनुक्रम के सिर पर पकड़ने, या कनेक्शन जैसे संसाधनों की सफाई नहीं करना चाहता हूं। –

+0

मुझे आश्चर्य हुआ कि क्या यह बफर बनाए रखा जा सकता है, लेकिन जहां तक ​​मैं कचरा संग्रह को बता सकता हूं उसे मुक्त करने वाली मेमोरी/बाहरी बफर को संभालना चाहिए। एनआईओ आवंटित किया गया था। डाउनस्ट्रीम क्या होता है बस एक डेटाबेस सम्मिलन और एक चैनल पर एक प्रविष्टि है। – Joe

+0

मैंने कोड पोस्ट करने के बारे में सोचा लेकिन यह काफी शामिल है और मुझे यह जानने के लिए एक दिन लग जाएगा कि क्या मैं किसी सरलीकृत संस्करण में समस्या को दोहराता हूं। – Joe

उत्तर

0

आपकी विधि ए के लिए एक विकल्प (deref HTTP-किट में एक गो-ब्लॉक के अंदर वायदा लौटाया गया हो सकता है) हो सकता है एक संभावना है, बस ऐसा करने के लिए जो कोर को अवरुद्ध नहीं करता है।भविष्य है, जो आप httpkit के कॉलबैक और core.async के संयोजन के द्वारा कर सकते हैं पर async हैंडलर धागे:

(defn handle-event 
"Return a core.async channel that will contain the result of making both HTTP call A and B." 
    [event-data] 
    (let [event-a-chan (clojure.core.async/chan) 
     event-b-chan (clojure.core.async/chan) 
     return-chan (clojure.core.async/chan)] 
    (org.httpkit.client/request "https://event-a-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-a-chan resp))) 
    (org.httpkit.client/request "https://event-b-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-b-chan resp))) 
    (clojure.core.async/go 
     (clojure.core.async/>! return-chan {:event-a-response (clojure.core.async/<! event-a-chan) 
              :event-b-response (clojure.core.async/<! event-b-chan)})) 
    return-chan)) 
0
  1. प्रति सेकंड 50 HTTP अनुरोध, जिनमें से प्रत्येक 200 मि.से ले सकता करने का यह, जिसका अर्थ है कि वहाँ हो सकता है किसी भी समय उड़ान में 100 अनुरोध हो, अत्यधिक बोझ की तरह आवाज?

यह निश्चित रूप से आधुनिक हार्डवेयर पर अत्यधिक नहीं है।

  1. मैं इसे कैसे तरीके से करता हूं जो थ्रूपुट को नियंत्रित करता है और मजबूत है?

आप core.async पाइपलाइनों और http-किट का कॉलबैक इस लक्ष्य को हासिल करने के लिए जोड़ सकते हैं। आपको प्रत्येक अनुरोध के लिए go दिनचर्या बनाने की ज़रूरत नहीं है (हालांकि इसे चोट नहीं पहुंची), क्योंकि आप http-kit कॉलबैक से async put! का उपयोग कर सकते हैं।

सक्रिय कनेक्शन की संख्या को सीमित करने के लिए पाइपलाइन के प्रत्येक चरण के लिए बाध्य बफर का उपयोग करें, जो कम से कम आपके सिस्टम पर उपलब्ध अल्पकालिक टीसीपी पोर्टों की संख्या से बाधित होगा।

यहां एक छोटे से प्रोग्राम का एक उदाहरण दिया गया है जो आपके द्वारा वर्णित कुछ जैसा ही करता है। इस मामले में यह चैनल — से "ईवेंट" पढ़ता है, प्रत्येक ईवेंट आईडी "1" — है और उन आईडी को HTTP सेवा पर देखता है। यह उस पहली कॉल से प्रतिक्रिया लेता है, जेएसओएन कुंजी "next" देखता है और यह बताता है कि चरण 2 के लिए यूआरएल के रूप में। अंत में, जब यह लुकअप पूरा हो जाता है तो यह out चैनल में एक ईवेंट जोड़ता है जो go नियमित रिपोर्ट आंकड़ों की रिपोर्ट करने के लिए करता है।

(ns concur-req.core 
    (require [clojure.core.async :as async] 
      [cheshire.core :refer [decode]] 
      [org.httpkit.client :as http])) 

(defn url-of 
    [id] 
    ;; this service responds within 100-200ms 
    (str "http://localhost:28080/" id ".json")) 

(defn retrieve-json-async 
    [url c] 
    (http/get url nil 
      (fn [{body :body status :status :as resp}] 
       (if (= 200 status) 
       (async/put! c (decode body true)) 
       (println "ERROR:" resp)) 
       (async/close! c)))) 

(defn run [parallelism stop-chan] 
    (let [;; allocate half of the parallelism to each step 
     step1-n (int (max (/ parallelism 2) 1)) 
     step2-n step1-n 
     ;; buffer to take ids, transform them into urls 
     step1-chan (async/chan step1-n (map url-of)) 
     ;; buffer for result of pulling urls from step1, xform by extracting :next url 
     step2-chan (async/chan step2-n (map :next)) 
     ;; buffer to count completed results 
     out-chan (async/chan 1 (map (constantly 1))) 
     ;; for delivering the final result 
     final-chan (async/chan) 
     start-time (System/currentTimeMillis)] 

    ;; process URLs from step1 and put the result in step2 
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan) 
    ;; process URLs from step2 and put the result in out 
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan) 

    ;; keep the input channel full until stop-chan is closed. 
    (async/go-loop [] 
     (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])] 
     (if (= c stop-chan) 
      (async/close! step1-chan) 
      (recur)))) 

    ;; count messages on out-chan until the pipeline is closed, printing 
    ;; status message every second 
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0] 
     (let [[v c] (async/alts! [status-timer out-chan])] 
     (cond (= c status-timer) 
       (do (println subt "records...") 
        (recur (async/timeout 1000) 0 (+ subt accu))) 

       (nil? v) 
       (async/>! final-chan (+ subt accu)) 

       :else 
       (recur status-timer (+ v subt) accu)))) 

    ;; block until done, then emit final report. 
    (let [final-total (async/<!! final-chan) 
      elapsed-ms (- (System/currentTimeMillis) start-time) 
      elapsed-s (/ elapsed-ms 1000.0)] 
     (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n" 
        final-total parallelism elapsed-s 
        (int (/ final-total elapsed-s))))))) 

(defn run-for 
    [seconds parallelism] 
    (let [stop-chan (async/chan)] 
    (future 
     (Thread/sleep (* seconds 1000)) 
     (async/close! stop-chan)) 
    (run parallelism stop-chan))) 

(do 
    ;; Warm up the connection pool, avoid somaxconn problems... 
    (doseq [p (map #(* 20 (inc %)) (range 25))] 
    (run-for 1 p)) 
    (run-for (* 60 60 6) 500)) 

इसका परीक्षण करने के लिए, मैंने एक HTTP सेवा स्थापित की है जो केवल 100-200ms के बीच यादृच्छिक समय सोने के बाद प्रतिक्रिया देती है। तब मैंने अपने मैकबुक प्रो पर 6 घंटे के लिए इस कार्यक्रम को चलाया।

समांतरता 500 के साथ सेट के साथ, मुझे औसतन 1155 पूर्ण लेनदेन प्रति सेकंड (2310 प्रति सेकेंड HTTP अनुरोध पूर्ण) मिला। मुझे यकीन है कि यह कुछ ट्यूनिंग (और विशेष रूप से HTTP सेवा को एक अलग मशीन पर ले जाकर) के साथ बहुत अधिक हो सकता है। जेवीएम मेमोरी पहले 30 मिनट के भीतर 1.5 जीबी तक गिर गई और फिर उस आकार को बनाए रखा। मैं ओरेकल के 64-बिट 1.8 जेवीएम का उपयोग कर रहा हूं।