2012-08-16 14 views
18

मैं एक ही समय में चल रहे उदाहरणों की संख्या सीमित करते समय प्रोग्राम.py के कई उदाहरणों को एक साथ चलाने के लिए चाहता हूं (उदाहरण के लिए मेरे सिस्टम पर उपलब्ध CPU कोर की संख्या)। उदाहरण के लिए, यदि मेरे पास 10 कोर हैं और कुल में प्रोग्राम.py के 1000 रन करना है, तो किसी भी समय केवल 10 उदाहरण बनाए जाएंगे और चलेंगे।चल रहे प्रक्रियाओं की संख्या को सीमित करते हुए पाइथन में मल्टीप्रोसेसिंग

मैंने मल्टीप्रोसेसिंग मॉड्यूल, मल्टीथ्रेडिंग और कतारों का उपयोग करने का प्रयास किया है, लेकिन ऐसा कुछ भी नहीं है जो मुझे एक आसान कार्यान्वयन के लिए उधार देने के लिए प्रतीत होता है। मेरे पास सबसे बड़ी समस्या एक साथ चल रही प्रक्रियाओं की संख्या को सीमित करने का एक तरीका ढूंढ रही है। यह महत्वपूर्ण है क्योंकि यदि मैं एक बार में 1000 प्रक्रियाएं बनाता हूं, तो यह एक कांटा बम के बराबर हो जाता है। मुझे प्रोग्रामेटिक रूप से प्रक्रियाओं से लौटाए गए परिणामों की आवश्यकता नहीं है (वे डिस्क पर आउटपुट), और प्रक्रियाएं सभी एक-दूसरे से स्वतंत्र रूप से चलती हैं।

क्या कोई मुझे सुझाव या उदाहरण दे सकता है कि मैं इसे पायथन में कैसे लागू कर सकता हूं, या यहां तक ​​कि बाश भी कर सकता हूं? मैं कतार का उपयोग करके अब तक लिखा गया कोड पोस्ट करूंगा, लेकिन यह इरादे के रूप में काम नहीं करता है और पहले से ही गलत पथ हो सकता है।

बहुत धन्यवाद।

+2

क्या आपने [पायथन प्रक्रिया पूल] (http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool) की कोशिश की है? – C2H5OH

+0

ऐसा करने का सबसे आसान तरीका एक "नियंत्रक" प्रोग्राम बनाना है जो 'multiprocessing.pool' बनाता है और कार्यकर्ता (program.py) थ्रेड को स्पैन करता है, उदाहरणों को समाप्त करने के रूप में कार्य को पुन: आवंटित करता है। – jozzas

+0

धन्यवाद, मैं कोशिश करूँगा; किसी कारण से मेरे पहले प्रयास में मैं इस निष्कर्ष पर पहुंचा कि मल्टीप्रोसेसिंग.pool वह नहीं था जो मैं चाहता था, लेकिन अब यह सही लगता है। तो इस मामले में, वर्कर्स थ्रेड सिर्फ प्रोग्राम.py (एक थ्रेड के रूप में? Subprocess.Popen के साथ) spawn होगा? क्या आप कृपया एक मोटा उदाहरण या टेम्पलेट कार्यान्वयन पोस्ट कर सकते हैं जिसका मैं पालन कर सकता हूं? – steadfast

उत्तर

2

बैश बल्कि अजगर से स्क्रिप्ट, लेकिन मैं सरल समानांतर प्रसंस्करण के लिए अक्सर इसका इस्तेमाल:

#!/usr/bin/env bash 
waitForNProcs() 
{ 
nprocs=$(pgrep -f $procName | wc -l) 
while [ $nprocs -gt $MAXPROCS ]; do 
    sleep $SLEEPTIME 
    nprocs=$(pgrep -f $procName | wc -l) 
done 
} 
SLEEPTIME=3 
MAXPROCS=10 
procName=myPython.py 
for file in ./data/*.txt; do 
waitForNProcs 
./$procName $file & 
done 

या बहुत ही सरल मामलों के लिए, एक और विकल्प xargs जहां पी procs की संख्या सेट है

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
3

आपको एक प्रक्रिया पर्यवेक्षक का उपयोग करना चाहिए। एक दृष्टिकोण Circus द्वारा "प्रोग्रामेटिक रूप से" करने के लिए प्रदान की गई एपीआई का उपयोग करेगा, प्रलेखन साइट अब ऑफ़लाइन है, लेकिन मुझे लगता है कि यह सिर्फ एक अस्थायी समस्या है, वैसे भी, आप इसे संभालने के लिए सर्कस का उपयोग कर सकते हैं। एक अन्य दृष्टिकोण supervisord का उपयोग करेगा और प्रक्रिया के पैरामीटर numprocs को आपके पास कोर की संख्या के लिए सेट करेगा।

सर्कस का उपयोग कर एक उदाहरण:

from circus import get_arbiter 

arbiter = get_arbiter("myprogram", numprocesses=3) 
try: 
    arbiter.start() 
finally: 
    arbiter.stop() 
21

मुझे पता है तुम उल्लेख किया है कि Pool.map दृष्टिकोण आप के लिए बहुत कुछ मतलब नहीं है। नक्शा इसे काम का स्रोत देने का एक आसान तरीका है, और प्रत्येक आइटम पर लागू करने के लिए एक कॉल करने योग्य है। मानचित्र के लिए func दिए गए तर्क पर वास्तविक कार्य करने के लिए कोई प्रविष्टि बिंदु हो सकता है।

है कि आप के लिए सही नहीं लगता है, मैं एक निर्माता-उपभोक्त पद्धति का उपयोग कर के बारे में यहाँ पर एक बहुत विस्तृत जवाब है: https://stackoverflow.com/a/11196615/496445

अनिवार्य रूप से, आप एक पंक्ति बनाने के लिए, और एन श्रमिकों की संख्या शुरू करते हैं। फिर आप या तो मुख्य धागे से कतार फ़ीड करते हैं, या एक निर्माता प्रक्रिया बनाते हैं जो कतार को खिलाती है। मजदूर सिर्फ कतार से काम करते रहते हैं और आपके द्वारा शुरू की गई प्रक्रियाओं की संख्या से कहीं अधिक समवर्ती काम नहीं होगा।

आपके पास कतार पर सीमा डालने का विकल्प भी है, ताकि जब निर्माता पहले से ही बहुत अधिक उत्कृष्ट काम कर रहा हो, तो निर्माता को अवरुद्ध कर दिया जाए, अगर आपको उत्पादक उपभोग की गति और संसाधनों पर भी बाधा डालना पड़ता है।

जो कार्य फ़ंक्शन जिसे कॉल किया जाता है वह कुछ भी कर सकता है जो आप चाहते हैं। यह कुछ सिस्टम कमांड के चारों ओर एक रैपर हो सकता है, या यह आपके पायथन lib को आयात कर सकता है और मुख्य दिनचर्या चला सकता है। वहाँ विशिष्ट प्रक्रिया प्रबंधन प्रणाली हैं जो आपको सीमित संसाधनों के तहत अपने मनमानी निष्पादन योग्य चलाने के लिए कॉन्फ़िगरेशन सेट अप करने देती हैं, लेकिन यह करने के लिए यह केवल एक मूल पायथन दृष्टिकोण है।

बेसिक पूल: मेरा कि other answer से

स्निपेट्स

from multiprocessing import Pool 

def do_work(val): 
    # could instantiate some other library class, 
    # call out to the file system, 
    # or do something simple right here. 
    return "FOO: %s" % val 

pool = Pool(4) 
work = get_work_args() 
results = pool.map(do_work, work) 

एक प्रक्रिया प्रबंधक और निर्माता का उपयोग करना

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 

     # exit signal 
     if item == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = item 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    # this could also be started in a producer process 
    # instead of blocking 
    iters = itertools.chain(get_work_args(), (None,)*num_workers) 
    for item in iters: 
     work.put(item) 

    for p in pool: 
     p.join() 

    print results 
+0

बहुत अच्छा उदाहरण, मैंने सीपीयूएस की संख्या प्राप्त करके इसे सुधार लिया जैसे कि वे http://stackoverflow.com/questions/6905264/python-multiprocessing-utilizes-only-one-core में समझाते हैं और इसलिए मैं डायनामिक रूप से num_workers को सेट कर सकता हूं मशीन के सीपीयू। –

0

जबकि वहाँ बहु का उपयोग कर के बारे में कई जवाब हैं .पूल, एच पर कई कोड स्निपेट नहीं हैं multiprocessing.Process का उपयोग करने के लिए, जो स्मृति उपयोग के मामले में वास्तव में अधिक फायदेमंद है। 1000 प्रक्रियाओं को शुरू करने से सीपीयू अधिभारित हो जाएगा और स्मृति को मार दिया जाएगा। यदि प्रत्येक प्रक्रिया और इसकी डेटा पाइपलाइन स्मृति गहन हैं, तो ओएस या पायथन स्वयं समानांतर प्रक्रियाओं की संख्या को सीमित कर देगा। मैंने बैच में सीपीयू को सबमिट की गई नौकरियों की एक साथ संख्या को सीमित करने के लिए नीचे दिया गया कोड विकसित किया। बैच आकार को CPU कोर की संख्या के अनुपात में स्केल किया जा सकता है। मेरे विंडोज पीसी में, प्रति बैच की नौकरियों की संख्या सीपीयू कूप उपलब्ध होने के 4 गुना तक कुशल हो सकती है।

import multiprocessing 
def func_to_be_multiprocessed(q,data): 
    q.put(('s')) 
q = multiprocessing.Queue() 
worker = [] 
for p in range(number_of_jobs): 
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ 
     args=(q,data)...)) 
num_cores = multiprocessing.cpu_count() 
Scaling_factor_batch_jobs = 3.0 
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs 
num_of_batches = number_of_jobs // num_jobs_per_batch 
for i_batch in range(num_of_batches): 
    floor_job = i_batch * num_jobs_per_batch 
    ceil_job = floor_job + num_jobs_per_batch 
    for p in worker[floor_job : ceil_job]: 
             worker.start() 
    for p in worker[floor_job : ceil_job]: 
             worker.join() 
for p in worker[ceil_job :]: 
          worker.start() 
for p in worker[ceil_job :]: 
          worker.join() 
for p in multiprocessing.active_children(): 
          p.terminate() 
result = [] 
for p in worker: 
    result.append(q.get()) 

केवल समस्या यह है, यदि कोई हो बैच में नौकरी के किसी भी पूरा नहीं कर सका और एक फांसी स्थिति की ओर जाता है, रोजगार के बैच के बाकी शुरू नहीं की जाएगी। इसलिए, संसाधित किए जाने वाले फ़ंक्शन में नियमित त्रुटियों को संभालने में उचित त्रुटि होनी चाहिए।

संबंधित मुद्दे