2016-02-23 6 views
6

डिफ़ॉल्ट रूप से जावा स्ट्रीम को common thread pool द्वारा संसाधित किया जाता है, जो डिफ़ॉल्ट पैरामीटर के साथ बनाया गया है। जैसा कि another question में उत्तर दिया गया है, कोई कस्टम पूल निर्दिष्ट करके या java.util.concurrent.ForkJoinPool.common.parallelism सिस्टम पैरामीटर सेट करके इन डिफ़ॉल्ट को समायोजित कर सकता है।मैं जावा स्ट्रीम को थ्रेड की डिफ़ॉल्ट संख्या से अधिक कैसे संसाधित कर सकता हूं?

हालांकि, मैं इन दो विधियों में से किसी एक द्वारा प्रसंस्करण स्ट्रीम करने के लिए आवंटित धागे की संख्या में वृद्धि करने में असमर्थ रहा हूं। उदाहरण के तौर पर, नीचे दिए गए कार्यक्रम पर विचार करें, जो उसके पहले तर्क में निर्दिष्ट फ़ाइल में निहित आईपी पते की एक सूची को संसाधित करता है और हल किए गए पते को आउटपुट करता है। लगभग 13000 अद्वितीय आईपी पते वाले फ़ाइल पर इसे चलाना, मैं ओरेकल जावा मिशन कंट्रोल का उपयोग 16 धागे के रूप में देख रहा हूं। इनमें से केवल पांच ही ForkJoinPool कर्मचारी हैं। फिर भी, यह विशेष कार्य कई और धागे से लाभान्वित होगा, क्योंकि थ्रेड अधिकांश समय DNS प्रतिक्रियाओं के लिए प्रतीक्षा करते हैं। तो मेरा सवाल यह है कि, मैं वास्तव में इस्तेमाल किए गए धागे की संख्या कैसे बढ़ा सकता हूं?

मैंने तीन वातावरण पर कार्यक्रम की कोशिश की है; ये ओएस-रिपोर्ट की संख्या धागे हैं। 17 धागे

  • जावा SE रनटाइम वातावरण एक 2-कोर OS X चला मशीन पर निर्माण 1.8.0_66-B17:

    • जावा SE रनटाइम वातावरण एक 8 कोर विंडोज 7 चल मशीन पर 1.8.0_73-B02 का निर्माण डार्विन 15.2.0: 23 धागे
    • openjdk संस्करण 1.8.0_72 एक 24 कोर FreeBSD 11.0 चल मशीन पर: 44 धागे
    
    import java.io.IOException; 
    import java.net.InetAddress; 
    import java.net.UnknownHostException; 
    import java.nio.file.Files; 
    import java.nio.file.Files; 
    import java.nio.file.Path; 
    import java.nio.file.Paths; 
    import java.util.concurrent.ForkJoinPool; 
    
    /** Resolve IP addresses in file args[0] using 100 threads */ 
    public class Resolve100 { 
        /** Resolve the passed IP address into a name */ 
        static String addressName(String ipAddress) { 
         try { 
          return InetAddress.getByName(ipAddress).getHostName(); 
         } catch (UnknownHostException e) { 
          return ipAddress; 
         } 
        } 
    
        public static void main(String[] args) { 
         Path path = Paths.get(args[0]); 
         ForkJoinPool fjp = new ForkJoinPool(100); 
         try { 
          fjp.submit(() -> { 
           try { 
            Files.lines(path) 
            .parallel() 
            .map(line -> addressName(line)) 
            .forEach(System.out::println); 
           } catch (IOException e) { 
            System.err.println("Failed: " + e); 
           } 
          }).get(); 
         } catch (Exception e) { 
          System.err.println("Failed: " + e); 
         } 
        } 
    } 
    
  • +2

    आपको कोशिश-संसाधन-संसाधन विवरण में 'Files.lines() 'संलग्न करना चाहिए! – fge

    +2

    मेरा सुझाव है कि आप समांतर() इसे करने का प्रयास करने से पहले सूची में लाइनें जोड़ें। यह बहुत बेहतर काम करता है जब यह जानता है कि अग्रिम में कितनी प्रविष्टियां हैं। –

    उत्तर

    6

    वहाँ अपने दृष्टिकोण के साथ दो समस्याएं हैं। सबसे पहले की जाती है कि कस्टम FJP का उपयोग कर धारा API द्वारा निर्मित व्यक्तिगत कार्यों की अधिकतम संख्या में परिवर्तन नहीं होगा के रूप में इस in the following way परिभाषित किया गया है:

    static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 
    

    तो आप कस्टम पूल का उपयोग कर रहे हैं, भले ही, समानांतर कार्यों की संख्या सीमित हो जाएगा commonPoolParallelism * 4 द्वारा। (यह वास्तव में कठिन सीमा नहीं है, बल्कि एक लक्ष्य है, लेकिन कई मामलों में कार्यों की संख्या इस संख्या के बराबर है)।

    उपर्युक्त समस्या java.util.concurrent.ForkJoinPool.common.parallelism सिस्टम प्रॉपर्टी का उपयोग करके तय की जा सकती है, लेकिन यहां आप एक और समस्या दबाते हैं: Files.lines वास्तव में बुरी तरह समांतर है। विवरण के लिए this question देखें। विशेष रूप से, 13000 इनपुट लाइनों के लिए अधिकतम संभव स्पीडअप 3.17x है (यह मानते हुए कि प्रत्येक लाइन प्रसंस्करण लगभग एक ही समय लेता है) भले ही आपके पास 100 सीपीयू हों। मेरी StreamEx लाइब्रेरी इसके लिए एक कार्य-आसपास प्रदान करती है (StreamEx.ofLines(path).parallel() का उपयोग करके स्ट्रीम बनाएं)।

    Files.readAllLines(path).parallelStream()... 
    

    इस प्रणाली संपत्ति के साथ काम करेगा: एक अन्य संभावित समाधान फ़ाइल लाइनों List में क्रमिक रूप से पढ़ने के लिए है, तो यह से एक समानांतर धारा बना है। हालांकि सामान्य स्ट्रीम एपीआई में समांतर प्रसंस्करण के लिए उपयुक्त नहीं है जब कार्यों में I/O शामिल होता है। अधिक लचीला समाधान प्रत्येक पंक्ति के लिए CompletableFuture उपयोग करने के लिए है:

    ForkJoinPool fjp = new ForkJoinPool(100); 
    List<CompletableFuture<String>> list = Files.lines(path) 
        .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp)) 
        .collect(Collectors.toList()); 
    list.stream().map(CompletableFuture::join) 
        .forEach(System.out::println); 
    

    इस तरह से आप प्रणाली संपत्ति बदलाव करने और की जरूरत नहीं है अलग कार्यों के लिए अलग-अलग पूल का उपयोग कर सकते हैं।

    +0

    और यह अनिवार्य नहीं होना चाहिए कि धागे की संख्या को बदलने के लिए यह तकनीक पूरी तरह से कार्यान्वित, अनिर्दिष्ट व्यवहार और कुछ भी नहीं है, डेवलपर्स को – Holger

    +0

    @ होल्गर पर भरोसा करना चाहिए, मुझे लगता है कि आपका मतलब है। सबमिट विधि, है ना? –

    +0

    धन्यवाद! CompletableFuture दृष्टिकोण वास्तव में 100 धागे पैदा करता है और परिमाण गतिशीलता का एक आदेश प्रदान करता है। यहां संख्याएं हैं। मूल: 48m40.036s; CompletableFuture: 0m37.465s। (ध्यान दें कि मूल संस्करण भी एक गर्म DNS कैश पर चलता है।) –

    संबंधित मुद्दे

     संबंधित मुद्दे