2015-09-07 31 views
32

जावा 8 स्ट्रीम पर "विभाजन" ऑपरेशन को कैसे कार्यान्वित करें? विभाजन से मेरा मतलब है, एक धारा को किसी दिए गए आकार के उप-धाराओं में विभाजित करें। किसी भी तरह यह गुवा Iterators.partition() विधि के समान होगा, बस यह वांछनीय है कि विभाजन सूची के बजाए आलसी मूल्यांकन वाले स्ट्रीम हैं।विभाजन जावा 8 स्ट्रीम

+6

विभाजन lazily का मूल्यांकन करना आम तौर पर मेरे अनुभव में अव्यवहार्य है - क्या तुम अगर आपके पास कई विभाजन के लिए संदर्भ बनाए रखा होने की उम्मीद होती है, और फिर उन्हें आदेश से बाहर पहुंचा? –

+3

@ जोन्स स्केट - विशेष रूप से यदि वे समानांतर हैं। – OldCurmudgeon

+0

आपके बिंदु के लिए धन्यवाद, जॉन, मुझे संदेह है कि। क्या आपको लगता है कि नीचे दिए गए अपने उत्तर में गैर आलसी कार्यान्वयन इष्टतम है? – Trader001

उत्तर

25

यह तय आकार बैचों के लिए मनमाने ढंग से स्रोत धारा विभाजन करना असंभव है क्योंकि यह समानांतर प्रसंस्करण अप पेंच होगा। समानांतर में प्रसंस्करण करते समय आपको पता नहीं हो सकता कि विभाजन के बाद पहले उप-कार्य में कितने तत्व हैं, इसलिए आप अगले उप-कार्य के लिए विभाजन नहीं बना सकते हैं जब तक कि पूरी तरह से संसाधित नहीं हो जाता है।

हालांकि यादृच्छिक पहुंच List से विभाजन की धारा बनाना संभव है। इस तरह की सुविधा है, उदाहरण के लिए, उपलब्ध है मेरी StreamEx पुस्तकालय में:

List<Type> input = Arrays.asList(...); 

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize); 

या आप वास्तव में नदियों की धारा चाहते हैं:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream); 

आप तीसरे पक्ष के पुस्तकालयों पर निर्भर नहीं करना चाहते हैं , आप इस तरह ofSubLists विधि मैन्युअल रूप से लागू कर सकते हैं:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) { 
    if (length <= 0) 
     throw new IllegalArgumentException("length = " + length); 
    int size = source.size(); 
    if (size <= 0) 
     return Stream.empty(); 
    int fullChunks = (size - 1)/length; 
    return IntStream.range(0, fullChunks + 1).mapToObj(
     n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); 
} 

इस कार्यान्वयन एक छोटा सा लंबा लग रहा है, लेकिन यह clo जैसे कुछ कोने मामलों को ध्यान में रखता से-टू-MAX_VALUE सूची आकार।


यदि आप चाहते हैं अव्यवस्थित स्ट्रीम के लिए समानांतर के अनुकूल समाधान (ताकि आप परवाह नहीं है जो धारा तत्वों बैच में जोड़ दिए जाएँगे), आप (प्रेरणा के लिए @sibnick करने के लिए धन्यवाद) इस तरह कलेक्टर का उपयोग कर सकते :

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
        Collector<List<T>, A, R> downstream) { 
    class Acc { 
     List<T> cur = new ArrayList<>(); 
     A acc = downstream.supplier().get(); 
    } 
    BiConsumer<Acc, T> accumulator = (acc, t) -> { 
     acc.cur.add(t); 
     if(acc.cur.size() == batchSize) { 
      downstream.accumulator().accept(acc.acc, acc.cur); 
      acc.cur = new ArrayList<>(); 
     } 
    }; 
    return Collector.of(Acc::new, accumulator, 
      (acc1, acc2) -> { 
       acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc); 
       for(T t : acc2.cur) accumulator.accept(acc1, t); 
       return acc1; 
      }, acc -> { 
       if(!acc.cur.isEmpty()) 
        downstream.accumulator().accept(acc.acc, acc.cur); 
       return downstream.finisher().apply(acc.acc); 
      }, Collector.Characteristics.UNORDERED); 
} 

प्रयोग उदाहरण:

List<List<Integer>> list = IntStream.range(0,20) 
            .boxed().parallel() 
            .collect(unorderedBatches(3, Collectors.toList())); 

परिणाम:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]] 

इस तरह के संग्राहक पूरी तरह से थ्रेड-सुरक्षित है और अनुक्रमिक धारा के लिए आदेशित बैचों का उत्पादन करता है।

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
     Collector<T, AA, B> batchCollector, 
     Collector<B, A, R> downstream) { 
    return unorderedBatches(batchSize, 
      Collectors.mapping(list -> list.stream().collect(batchCollector), downstream)); 
} 

उदाहरण के लिए इस तरह से, आप मक्खी पर हर बैच में संख्या जोड़ सकते हैं::

आप हर बैच के लिए एक मध्यवर्ती परिवर्तन लागू करना चाहते हैं, तो आपको निम्न संस्करण का उपयोग कर सकते

List<Integer> list = IntStream.range(0,20) 
     .boxed().parallel() 
     .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
      Collectors.toList())); 
+0

मुझे स्ट्रीमएक्स में समानांतर समाधान जैसे कुछ देखने में बहुत दिलचस्पी होगी (जो मेरी परियोजनाओं में प्रमुख बन गया है जिस तरह से गुवा और लंबोक है)। कम क्योंकि मुझे paralellism की परवाह है, लेकिन इसके बजाय यह धाराओं पर काम करता है - StreamEx.ofSubLists के लिए आपको पहले से ही एक संक्षिप्त सूची की आवश्यकता है, जबकि मेरे उपयोग के मामले आमतौर पर चल रही धाराएं हैं जिन्हें मैं संग्रह में पतन नहीं करना चाहता हूं और स्मृति में हूं यकायक। – Torque

3

ऐसा लगता है जैसे जॉन स्कीट ने अपने comment में दिखाया है, विभाजन को आलसी बनाना संभव नहीं है। गैर आलसी विभाजन के लिए, मैं पहले से ही इस कोड है:

public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) { 
    final Iterator<T> it = source.iterator(); 
    final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream); 
    final Iterable<Stream<T>> iterable =() -> partIt; 

    return StreamSupport.stream(iterable.spliterator(), false); 
} 
+10

मुझे पता है कि यह एक पुराना विषय है, लेकिन ऐसा लगता है कि यह उल्लेखनीय है - यह शुद्ध जावा 8 नहीं है: 'इटरेटर' कक्षा गुवा से है। –

0

मुझे लगता है कि इसके अंदर हैक किसी प्रकार के साथ संभव है:

बैच के लिए उपयोगिता वर्ग बनाने के लिए:

0,123,
public static class ConcurrentBatch { 
    private AtomicLong id = new AtomicLong(); 
    private int batchSize; 

    public ConcurrentBatch(int batchSize) { 
     this.batchSize = batchSize; 
    } 

    public long next() { 
     return (id.getAndIncrement())/batchSize; 
    } 

    public int getBatchSize() { 
     return batchSize; 
    } 
} 

और विधि: - जो मुझे लगता है कि आप वास्तव में क्या चाहते है

public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){ 
    ConcurrentBatch batch = new ConcurrentBatch(batchSize); 
    //hack java map: extends and override computeIfAbsent 
    Supplier<ConcurrentMap<Long, List<T>>> mapFactory =() -> new ConcurrentHashMap<Long, List<T>>() { 
     @Override 
     public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) { 
      List<T> rs = super.computeIfAbsent(key, mappingFunction); 
      //apply batchFunc to old lists, when new batch list is created 
      if(rs.isEmpty()){ 
       for(Entry<Long, List<T>> e : entrySet()) { 
        List<T> batchList = e.getValue(); 
        //todo: need to improve 
        synchronized (batchList) { 
         if (batchList.size() == batch.getBatchSize()){ 
          batchFunc.accept(batchList); 
          remove(e.getKey()); 
          batchList.clear(); 
         } 
        } 
       } 
      } 
      return rs; 
     } 
    }; 
    stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s)) 
      .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList()))) 
      .entrySet() 
      .stream() 
      //map contains only unprocessed lists (size<batchSize) 
      .forEach(e -> batchFunc.accept(e.getValue())); 
} 
+0

आप अनियंत्रित तथ्य के बारे में सही हैं और यही वजह है कि मैंने इसे 'हैक' कहा। इसके अलावा आप गैर-परमाणु 'computeIfAbsent' के बारे में सही हैं। मैं जल्द ही कोड संपादित करूंगा। लेकिन यह आलसी क्यों नहीं है? यह एक बैच संसाधित करने से पहले सभी सूचियों को आवंटित नहीं किया जाता है। यह भी आम है कि समवर्ती बैच प्रसंस्करण का आदेश नहीं दिया जाता है। – sibnick

+1

समांतर धारा के लिए यह बिल्कुल काम नहीं करता है। 'लागू करेंकंक्रंटबैचटॉस्ट्रीम (System.out :: println, IntStream.range (0,100) .boxed() समानांतर(), 3) 'प्रिंट कचरा (यादृच्छिक रूप से एकत्रित समूह, कुछ तत्व दोहराने, यहां तक ​​कि समूहों की संख्या रनों के बीच भिन्न होती है)। अनुक्रमिक-केवल धाराओं के लिए बहुत सरल और अधिक कुशल समाधान हैं (जैसे ओपी द्वारा प्रस्तुत किया गया)। –

+0

लेकिन आप बग का स्रोत भी दिखाते हैं: गैर-परमाणु 'computeIfAbsent'। – sibnick

6

बशर्ते आप स्ट्रीम क्रमिक रूप से उपयोग करना चाहते हैं, यह एक स्ट्रीम (और साथ ही इस तरह के विंडोइंग के रूप में कार्य से संबंधित प्रदर्शन विभाजन के लिए संभव है इस मामले में)। दो पुस्तकालय जो मानक स्ट्रीम के लिए पार्टिटोनिंग का समर्थन करेंगे cyclops-react (मैं लेखक हूं) और jOOλ जो साइक्लोप्स-प्रतिक्रिया विस्तारित करता है (विंडिंग जैसी कार्यक्षमता जोड़ने के लिए)।

साइक्लोप्स-स्ट्रीम में जावा स्ट्रीम पर परिचालन के लिए स्थिर कार्यों StreamUtils का संग्रह है, और विभाजन के लिए विभाजन जैसे splitAt, headAndTail, splitBy, जैसे कार्यों की एक श्रृंखला है।

आकार 30 के नेस्टेड स्ट्रीम के स्ट्रीम में स्ट्रीम को विंडो करने के लिए आप विंडो विधि का उपयोग कर सकते हैं।

ओपीएस बिंदु पर, स्ट्रीमिंग शर्तों में, किसी दिए गए आकार के एकाधिक स्ट्रीम में स्ट्रीम को विभाजित करना एक विंडिंग ऑपरेशन (विभाजन विभाजन के बजाए) है।

Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30); 

एक स्ट्रीम विस्तार वर्ग ReactiveSeq कि jool.Seq प्रदान करता है और विंडोइंग कार्यक्षमता जोड़ता कहा जाता है, कि कोड एक छोटे से क्लीनर कर सकते हैं नहीं है।

ReactiveSeq<Integer> seq; 
    ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30); 

Tagir ही उपरोक्त बताते हैं, यह समानांतर धाराओं के लिए उपयुक्त नहीं है। यदि आप एक स्ट्रीम को विंडो या बैच करना चाहते हैं तो आप एक बहुप्रचारित फैशन में निष्पादित करना चाहते हैं। cyclops-react में LazyFutureStream उपयोगी हो सकता है (विंडिंग टू-डू सूची पर है, लेकिन सादे पुराने बैचिंग अब उपलब्ध है)।

इस मामले में डेटा को बहु-निर्माता/सिंगल-कंज्यूमर प्रतीक्षा-मुक्त कतार में स्ट्रीम निष्पादित करने वाले एकाधिक धागे से पारित किया जाएगा और उस कतार के अनुक्रमिक डेटा को फिर से थ्रेड पर वितरित करने से पहले विंडो किया जा सकता है।

Stream<List<Data>> batched = new LazyReact().range(0,1000) 
               .grouped(30) 
               .map(this::process); 
0

यहाँ से AbacusUtil

IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray())); 

घोषणा त्वरित समाधान है: मैं AbacusUtil के डेवलपर हूं।

0

सबसे सुंदर और शुद्ध जावा 8 इस समस्या मैंने पाया के लिए समाधान:

public static <T> List<List<T>> partition(final List<T> list, int batchSize) { 
return IntStream.range(0, getNumberOfPartitions(list, batchSize)) 
       .mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size()))) 
       .collect(toList()); 
} 

//https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java 
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) { 
    return (list.size() + batchSize- 1)/batchSize; 
} 
संबंधित मुद्दे