2012-02-13 8 views
7

थ्रेडिंग मैं निम्नलिखित पूरा करने के लिए कोशिश कर रहा हूँ:पर्ल कतार और

  1. एक धागा है कि एक बहुत बड़ी फ़ाइल से डेटा पढ़ता कहना के बारे में 10GB और उन्हें कतार में धक्का है। (मैं लिए कतार के लिए इच्छा नहीं है या तो बहुत बड़ी प्राप्त)

  2. buildQueue धागा एक ही समय में कतार को सूचना भेजे है, वहीं के बारे में 5 कार्यकर्ता धागे को डी-पंक्ति और प्रक्रिया डेटा है।

मैं एक प्रयास किया है, लेकिन मेरे दूसरे धागे मेरी buildQueue धागा में एक सतत पाश की वजह से नहीं पहुंचा जा सकता है।

मेरा दृष्टिकोण पूरी तरह गलत हो सकता है। किसी भी मदद के लिए धन्यवाद, यह बहुत सराहना की है।

यहाँ buildQueue के लिए कोड है:

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

और क्योंकि इस सूत्र प्रक्रिया पूरी नहीं होगी, जैसा कि मैंने उम्मीद है कि जब इस सूत्र कुछ भी नहीं मार डाला जाता है और कुछ के बाद निष्पादित किया जाएगा।

my $builder = new Thread(&buildQueue); 

चूंकि बिल्डर थ्रेड लंबे समय तक चल रहा है, इसलिए मुझे कभी भी कार्यकर्ता धागे नहीं मिलते हैं।

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

सवालों की एक जोड़ी: आप उल्लेख है कि अपने कतार निर्माता धागा खत्म नहीं होगा, लेकिन यह कुछ भी करता है? क्या कतार का आकार कभी भी 100 से नीचे डुबकी या 0 से ऊपर जाता है? साथ ही, [मुझे यकीन नहीं है कि आप अपने धागे सही तरीके से बना रहे हैं] (http://perldoc.perl.org/perlthrtut.html)। यह 'मेरा $ builder = threads-> बनाना नहीं है (\ & buildQueue); '? –

+0

कतार निर्माता ठीक बनाता है लेकिन चूंकि कार्यकर्ता धागे बनाए जाने के लिए नहीं पहुंचे थे, इसलिए वे कतार से कुछ भी नहीं हटा सकते हैं, इसलिए कतार 100 पर फंस गई है जबकि लगातार कूप की वजह से निर्माण कतार अभी भी चल रही है। – Sinista

+0

हम्म, मुझे संदर्भ स्थापित करने के लिए और अधिक कोड देखने की आवश्यकता होगी, खासकर जहां आप धागे बनाते हैं। कार्यकर्ता धागे बनाने से पहले आप कतार बिल्डर को 'शामिल' या 'अलग नहीं कर रहे हैं, है ना? –

उत्तर

10

आप चिह्नित करने के लिए जब आप अपने धागे (या तो joinor detach के माध्यम से) बाहर निकलना चाहते की आवश्यकता होगी:

यहाँ पूरे कोड है। तथ्य यह है कि आपके पास last बयान के साथ अनंत लूप हैं जिनमें से बाहर निकलने के लिए भी एक समस्या है।

संपादित करें: मैं भी एक बहुत ही महत्वपूर्ण हिस्सा भूल गया! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue। इसलिए कतार निर्माता के बाद प्रत्येक धागे के लिए हम विशेष रूप से undef क्यों लगाते हैं।

प्रयास करें:

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

ऐसा लगता है कि समस्या बहिष्कृत थ्रेड मॉड्यूल था जिसे मैंने थ्रेड मॉड्यूल पर स्विच किया था और मेरा कोड अब जैसा काम करता है। सही दिशा में मुझे इंगित करने के लिए जैक कई धन्यवाद। – Sinista

1

ऐसा लगता है कि इस मामले Parallel::ForkManager मॉड्यूल के साथ कर सकता है।

+0

यदि संभव हो तो फोर्कमेनगर समाधान देखना अच्छा लगेगा। – Sinista

0

एक अलग दृष्टिकोण: तुम भी user_tasksMCE 1.2+ में इस्तेमाल करते हैं और दो बहु कार्यकर्ता बना सकते हैंtasks, पढ़ना (क्योंकि यह एक बड़ी फ़ाइल है, तो आप भी फ़ाइल को संरक्षण समानांतर पढ़ने से फायदा हो सकता है की तलाश पढ़ें) के लिए एक कार्य और प्रसंस्करण के लिए एक कार्य, आदि

नीचे दिया गया कोड अभी भी Thread::Queue का उपयोग आपके बफर कतार को प्रबंधित करने के लिए करता है।

buildQueue उप में आपकी कतार आकार नियंत्रण है और यह डेटा को सीधे मैनेजर प्रक्रिया '$ R_QUEUE पर धक्का देता है क्योंकि हमने धागे का उपयोग किया है, इसलिए इसकी अभिभावक की स्मृति स्थान तक पहुंच है। यदि आप इसके बजाय कांटे का उपयोग करना चाहते हैं, तो आप अभी भी कॉल बैक फ़ंक्शन के माध्यम से कतार तक पहुंच सकते हैं। लेकिन यहां मैंने बस कतार को धक्का दिया है।

processQueue उप कतार में जो कुछ भी है, उसे बस कतार में डालें, जब तक कि कुछ भी लंबित न हो।

प्रत्येक कार्य में task_end सब प्रत्येक कार्य के अंत में प्रबंधक प्रक्रिया द्वारा केवल एक बार चलाया जाता है, इसलिए हम इसे अपने कार्यकर्ता प्रक्रियाओं को रोकने के लिए इसका उपयोग करते हैं।

जाहिर है, वहाँ कैसे आप हिस्सा के आकार या यहां तक ​​कि कैसे में अपने डेटा slurp करने पर फैसला कर सकते हैं, श्रमिकों के लिए अपने डेटा हिस्सा चाहता हूँ में स्वतंत्रता की एक बहुत कुछ है।

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

पर्ल के लिए एमसीई मॉड्यूल बड़ी फाइलों को प्यार करता है। एमसीई के साथ, कोई एक बार में कई लाइनों को तोड़ सकता है, स्केलर स्ट्रिंग के रूप में एक बड़ा हिस्सा फिसल सकता है, या एक समय में 1 लाइन पढ़ सकता है। कई लाइनों को एक बार में आईपीसी के लिए ओवरहेड कम कर देता है।

एमसीई 1.504 अब बाहर है। यह थ्रेड सहित बाल प्रक्रियाओं के समर्थन के साथ एमसीई :: कतार प्रदान करता है। इसके अलावा, 1.5 रिलीज 5 मॉडल (एमसीई :: फ्लो, एमसीई :: जीआरपी, एमसीई :: लूप, एमसीई :: मैप, और एमसीई :: स्ट्रीम) के साथ आता है जो एमसीई इंस्टेंस को तुरंत चालू करने के साथ-साथ ऑटो- max_workers tuning और chunk_size। कोई इन विकल्पों को ओवरराइड कर सकता है btw।

नीचे, एमसीई :: लूप प्रदर्शन के लिए प्रयोग किया जाता है।

use MCE::Loop; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
    my ($mce, $chunk_ref, $chunk_id) = @_; 

    foreach my $line (@$chunk_ref) { 
     chomp $line; 
     ## add your code here to process $line 
    } 

} $dict_path; 

यदि आप श्रमिकों की संख्या और/या chunk_size निर्दिष्ट करना चाहते हैं, तो इसे करने के 2 तरीके हैं।

use MCE::Loop max_workers => 5, chunk_size => 300000; 

या ...

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

बेडौल बड़ी फ़ाइलों के लिए पसंद किया जाता है, एक एक समय में एक लाइन बेडौल के साथ समय की तुलना कर सकते हैं। कोई ब्लॉक के अंदर पहली पंक्ति को छोड़ सकता है (टिप्पणी की गई)। ध्यान दें कि लूप के लिए आंतरिक की आवश्यकता नहीं है। $ chunk_ref अभी भी एक सरणी रेफरी है जिसमें 1 लाइन है। इनपुट स्केलर $ _ में लाइन होती है जब chunk_size 1 बराबर होता है, अन्यथा $ chunk_ref को इंगित करता है।

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

मुझे उम्मीद है कि यह प्रदर्शन समानांतर में फ़ाइल को संसाधित करने के इच्छुक लोगों के लिए सहायक था।

:) मारियो

संबंधित मुद्दे