2015-12-14 8 views
10

मनमाने ढंग से बड़ी संख्या में कार्य पूरा होने के लिए जावा सिंक्रनाइज़ेशन ऑब्जेक्ट का उपयोग करना चाहिए? बाधाएं हैं:सभी कार्यों को पूरा करने के लिए सिंक्रनाइज़ेशन ऑब्जेक्ट

  1. प्रत्येक कार्य को पूरा करने के लिए एक गैर-तुच्छ राशि लेती है और समानांतर में कार्य करने के लिए उपयुक्त है।
  2. स्मृति में फिट करने के लिए बहुत सारे कार्य हैं (यानी मैं को प्रत्येक कार्य के लिए Collection में नहीं डाल सकता और फिर सभी वायदा पर get पर कॉल कर सकता हूं)।
  3. मुझे नहीं पता कि कितने कार्य होंगे (यानी मैं CountDownLatch का उपयोग नहीं कर सकता)।
  4. ExecutorService तो साझा किया जा सकता मैं awaitTermination(long, TimeUnit)

उदाहरण के लिए, उपयोग नहीं कर सकते ग्रांड सेंट्रल डिस्पैच के साथ, मैं कुछ इस तरह कर सकते हैं:

let workQueue = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0) 
let latch = dispatch_group_create() 
let startTime = NSDate() 
var itemsProcessed = 0 
let countUpdateQueue = dispatch_queue_create("countUpdateQueue", DISPATCH_QUEUE_SERIAL) 
for item in fetchItems() // generator returns too many items to store in memory 
{ 
    dispatch_group_enter(latch) 
    dispatch_async(workQueue) 
    { 
     self.processItem(item) // method takes a non-trivial amount of time to run 
     dispatch_async(countUpdateQueue) 
     { 
      itemsProcessed++ 
     } 
     dispatch_group_leave(latch) 
    } 
} 
dispatch_group_wait(latch, DISPATCH_TIME_FOREVER) 
let endTime = NSDate() 
let totalTime = endTime.timeIntervalSinceDate(startTime) 
print("Processed \(itemsProcessed) items in \(totalTime) seconds.") 

यह उत्पादन है कि (के लिए इस तरह दिखता है पैदा करता है 128 आइटम): Processed 128 items in 1.846794962883 seconds.

मैं एक Phaser साथ कुछ इसी तरह की कोशिश की:

final Executor executor = new ThreadPoolExecutor(64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>(8), new CallerRunsPolicy()); 
final Phaser latch = new Phaser(0); 
final long startTime = currentTimeMillis(); 
final AtomicInteger itemsProcessed = new AtomicInteger(0); 
for(final String item : fetchItems()) // iterator returns too many items to store in memory 
{ 
    latch.register(); 
    final Runnable task = new Runnable() { 
     public void run() { 
      processItem(item); // method takes a non-trivial amount of time to run 
      itemsProcessed.incrementAndGet(); 
      latch.arrive(); 
     } 
    }; 
    executor.execute(task); 
} 
latch.awaitAdvance(0); 
final long endTime = currentTimeMillis(); 
out.println("Processed " + itemsProcessed.get() + " items in " + (endTime - startTime)/1000.0 + " seconds."); 

कार्य करने के हमेशा पूरा नहीं हुआ पिछले प्रिंट बयान से पहले और मैं उत्पादन है कि इस तरह दिखता है मिल सकता है (128 मदों के लिए): Processed 121 items in 5.296 seconds.Phaser भी सही उपयोग करने के लिए वस्तु है? दस्तावेज इंगित करता है कि यह केवल 65,535 पार्टियों का समर्थन करता है, इसलिए मुझे या तो प्रक्रियाओं को संसाधित करने या Phaser स्तरीय प्रकार के परिचय देने की आवश्यकता होगी।

+1

जब आप कहते हैं कि स्मृति में फिट करने के लिए बहुत सारे कार्य हैं तो क्या यह संसाधन बाधा नहीं है? तो आप समान रूप से उन कार्यों को निष्पादित कर सकते हैं जो स्मृति में फिट होते हैं और आप इसके लिए भविष्य का उपयोग कर सकते हैं। सुनिश्चित नहीं है कि कोई दृष्टिकोण पास संसाधन प्रदूषण को पार करने में मदद करेगा और आप एक बहुत ही जटिल समाधान के साथ समाप्त हो जाएंगे। – Lokesh

उत्तर

1

"यह सुनिश्चित करने के लिए कि मनमाने ढंग से बड़ी संख्या में कार्य पूरा हो जाएं" - पूरा कार्य पूरा करने के लिए सबसे आसान तरीका है, कार्य को अवरुद्ध करने के साथ, कार्य की संख्या की संख्या तक पहुंचने के लिए। ऐसा कोई तैयार वर्ग है, लेकिन यह सुनिश्चित करने के लिए आसान है एक:

class EventCounter { 
    long counter=0; 

    synchronized void up() { 
    counter++; 
    notifyAll(); 
    } 
    synchronized void ensure (long count) { 
    while (counter<count) wait(); 
    } 
} 

"वहाँ स्मृति में फिट करने के लिए भी कई कार्य हैं" - तो नए कार्य जमा करने की प्रक्रिया को निलंबित कर दिया जाना चाहिए जब में चल रहे कार्यों की संख्या बहुत ऊंचा है।

Semaphore runningTasksSema=new Semaphore(maxNumberOfRunningTasks); 
EventCounter eventCounter =new EventCounter(); 

for(final String item : fetchItems()) { 
    final Runnable task = new Runnable() { 
     public void run() { 
      processItem(item); 
      runningTasksSema.release(); 
      eventCounter.up(); 
     } 
    }; 
    runningTasksSema.aquire(); 
    executor.execute(task); 
} 

एक धागा सुनिश्चित करने के लिए कुछ कार्यों को दिए गए नंबर पूरा कर रहे हैं चाहता है, यह आह्वान:

eventCounter.ensure(givenNumberOfFinishedTasks); 
सबसे आसान तरीका है एक संसाधन के रूप में चल रहे कार्यों की संख्या पर विचार करने और एक सेमाफोर के साथ गिनती करने के लिए है

runningTasksSema.aquire() और eventCounter.ensure() संचालन के असीमित (गैर-अवरुद्ध) संस्करणों को डिज़ाइन किया जा सकता है, लेकिन वे अधिक जटिल होंगे।

1

इस उदाहरण में Phaser उपयोग के साथ समस्या यह है कि CallerRunsPolicy किसी कार्य की शुरुआत धागे पर अमल करने के लिए अनुमति देता है। इस प्रकार, जबकि लूप अभी भी प्रगति पर है, आने वाली पार्टियों की संख्या पंजीकृत पार्टियों की संख्या के बराबर हो सकती है, जिससे चरण बढ़ता जा सकता है। समाधान Phaser को 1 पार्टी के साथ शुरू करना है, जब लूप समाप्त हो गया है, तो पहुंचें और अन्य पार्टियों के आने का इंतजार करें। यह सुनिश्चित करता है कि चरण 1 तक बढ़ता नहीं है जब तक कि सभी कार्य पूर्ण नहीं होते हैं।

final Executor executor = new ThreadPoolExecutor(64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>(8), new CallerRunsPolicy()); 
final Phaser latch = new Phaser(1); 
final long startTime = currentTimeMillis(); 
final AtomicInteger itemsProcessed = new AtomicInteger(0); 
for(final String item : fetchItems()) // iterator returns too many items to store in memory 
{ 
    latch.register(); 
    final Runnable task = new Runnable() { 
     public void run() { 
      processItem(item); // method takes a non-trivial amount of time to run 
      itemsProcessed.incrementAndGet(); 
      final int arrivalPhase = latch.arrive(); 
     } 
    }; 
    executor.execute(task); 
} 
latch.arriveAndAwaitAdvance(); 
final long endTime = currentTimeMillis(); 
out.println("Processed " + itemsProcessed.get() + " items in " + (endTime - startTime)/1000.0 + " seconds."); 
0

मामले में अगर आप java8 पर हैं तो आप CompletableFuture

java.util.concurrent.CompletableFuture.allOf(CompletableFuture<?>... cfs) 

कि पारित कर दिया सरणी में सभी वायदा के परिणाम के लिए इंतजार करेंगे उपयोग कर सकते हैं।

संबंधित मुद्दे