2017-09-01 20 views
14

Multiprocessing with the "fork" context blocks on Linux/Intel Xeon with Python 3.6.1?

Problem description
I slightly adjusted the code from this answer (see below). However, when I run this script on Linux (command line: python script_name.py), it prints jobs running: x for all the jobs and then just hangs. When I use the spawn start method (mp.set_start_method('spawn')), it works fine and immediately starts printing the value of the counter variable (see the listener function).

Questions

  • Why does it only work when the processes are spawned?
  • How can I make it work with fork (since that is probably the fastest start method)?
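As a side note on the two start methods mentioned above: instead of calling mp.set_start_method once for the whole program, a context object can select the method per pool, which makes it easier to compare "fork" and "spawn" side by side. This is only a minimal sketch. The names square and run_with are hypothetical and not part of the script in question, and "fork" is not available on every platform.

```python
import multiprocessing as mp


def square(x):
    # Module-level function so that it can be pickled under "spawn" as well.
    return x * x


def run_with(start_method, values, processes=2):
    """Map `square` over `values` in a pool created with the given start method."""
    ctx = mp.get_context(start_method)  # e.g. "fork" or "spawn"
    with ctx.Pool(processes) as pool:
        return pool.map(square, values)
```

Called from under an `if __name__ == "__main__":` guard in a script file, run_with("fork", ...) and run_with("spawn", ...) can then be compared on the same input.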

Code

import io
import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    # data processing
    # ftp connection to retrieve data
    # etc.
    q.put(complete_data)
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while True:
        m = q.get()
        counter += 1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close()

if __name__ == "__main__":
    file_searcher('path_to_some_tab_del_file.txt')
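The job.get() calls in the script above wait without any time limit, so a single job that never finishes makes the whole script appear stuck. One way to narrow this down, sketched here under the assumption that a fixed per-job timeout is acceptable, is to collect the results with a timeout and record which jobs did not return. The helper name collect_with_timeout and the default of 30 seconds are made up for illustration. This is a diagnostic aid, not a fix for the hang.

```python
import multiprocessing as mp


def collect_with_timeout(jobs, timeout_s=30):
    """Return (results, stuck): finished results and indices of jobs that timed out."""
    results, stuck = [], []
    for index, job in enumerate(jobs):
        try:
            # AsyncResult.get raises multiprocessing.TimeoutError when the
            # result does not arrive within timeout_s seconds.
            results.append(job.get(timeout=timeout_s))
        except mp.TimeoutError:
            stuck.append(index)
    return results, stuck
```

In file_searcher this would replace the plain loop over jobs, for example results, stuck = collect_with_timeout(jobs), after which the rows behind the indices in stuck can be inspected.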

Processor information

Architecture:   x86_64 
CPU op-mode(s):  32-bit, 64-bit 
Byte Order:   Little Endian 
CPU(s):    20 
On-line CPU(s) list: 0-19 
Thread(s) per core: 1 
Core(s) per socket: 1 
Socket(s):    20 
NUMA node(s):   2 
Vendor ID:    GenuineIntel 
CPU family:   6 
Model:     45 
Model name:   Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz 
Stepping:    2 
CPU MHz:    2596.501 
BogoMIPS:    5193.98 
Hypervisor vendor:  VMware 
Virtualization type: full 
L1d cache:    32K 
L1i cache:    32K 
L2 cache:    256K 
L3 cache:    25600K 
NUMA node0 CPU(s):  0-19 

Linux kernel version

3.10.0-514.26.2.el7.x86_64 

Python version

Python 3.6.1 :: Continuum Analytics, Inc. 

Log, as suggested by @yacc
I added the logging code, and it produces the following log:

[server scripts]$ python main_v3.py 
[INFO/SyncManager-1] child process calling self.run() 
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6 
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw' 
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object 
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0' 
[DEBUG/MainProcess] INCREF '7f0842da56a0' 
[DEBUG/MainProcess] created semlock with handle 139673691570176 
[DEBUG/MainProcess] created semlock with handle 139673691566080 
[DEBUG/MainProcess] created semlock with handle 139673691561984 
[DEBUG/MainProcess] created semlock with handle 139673691557888 
[DEBUG/MainProcess] added worker 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0' 
[DEBUG/MainProcess] added worker 
[INFO/ForkPoolWorker-2] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-4] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-3] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-6] child process calling self.run() 
[INFO/ForkPoolWorker-5] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-7] child process calling self.run() 
[INFO/ForkPoolWorker-8] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-9] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-10] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-11] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-12] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-13] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-14] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-15] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-16] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-17] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-18] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-19] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-20] child process calling self.run() 
jobs running: 1 
jobs running: 2 
jobs running: 3 
jobs running: 4 
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0' 
[INFO/ForkPoolWorker-21] child process calling self.run() 
jobs running: 5 
jobs running: 6 
jobs running: 7 
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0' 
jobs running: 8 
written to file 
jobs running: 9 
jobs running: 10 
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection 
[DEBUG/ForkPoolWorker-2] making connection to manager 
jobs running: 11 
jobs running: 12 
jobs running: 13 
jobs running: 14 
jobs running: 15 
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2' 
jobs running: 16 
jobs running: 17 
jobs running: 18 
jobs running: 19 
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0' 
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0' 
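Log lines in the [LEVEL/ProcessName] format shown above are what multiprocessing's built-in logger emits. The exact code that was added to the script is not shown in the question, so the following is only a sketch of one common way to enable such output; the function name enable_mp_debug_logging is hypothetical.

```python
import logging
import multiprocessing as mp


def enable_mp_debug_logging():
    """Send multiprocessing's internal log messages to stderr at DEBUG level."""
    logger = mp.log_to_stderr()      # attaches a stderr handler to the package logger
    logger.setLevel(logging.DEBUG)   # include DEBUG messages such as INCREF
    return logger
```

Calling it once at the start of the `if __name__ == "__main__":` block, before the manager and pool are created, is enough for processes started with fork, which inherit the logger configuration.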
Can you provide details on the versions of Linux, Python, the multiprocessing package, and the hardware/processor? – yacc

I added the information you asked for (see the edit), @yacc. I could not figure out how to get the version of the multiprocessing package. I hope you can find the problem. – CodeNoob

multiprocessing is part of the core library, so it has the same version as the rest of Python. For me (Python 3.4.3) the code works fine (the only thing I changed was removing the csv reader and reading a plain file instead). Have you tried reproducing it somewhere else? – MacHala
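Since multiprocessing ships with the interpreter, reporting the Python version covers it. As a small sketch of how the relevant details could be gathered in one place (the function name describe_runtime and the choice of fields are assumptions made for illustration):

```python
import multiprocessing as mp
import platform


def describe_runtime():
    """Collect interpreter and multiprocessing details worth posting with a bug report."""
    return {
        "python": platform.python_version(),
        "implementation": platform.python_implementation(),
        # Start methods supported on this platform; the first entry is the default.
        "start_methods": mp.get_all_start_methods(),
        "cpu_count": mp.cpu_count(),
    }
```

Printing describe_runtime() on both the failing and the working machine makes it easy to see where the two setups differ.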

Answers

0

No idea why you are having this problem, but isn't this code a simpler way of doing the same thing?

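import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    # Backslash continuations keep all three context managers in a single
    # with statement (a bare line break after the comma is a syntax error).
    with open(file_path, 'r', encoding='utf-8') as input_file, \
            open('output.txt', 'w', encoding='utf-8') as f, \
            mp.Pool() as pool:
        parsed_file = csv.DictReader(input_file, delimiter='\t')
        # imap yields results in input order; only the parent writes to the file.
        for result in pool.imap(worker, parsed_file):
            # Each result is a list of lines, as in the original listener.
            for line in result:
                f.write(line + NEWLINE)

def worker(genome_row):
    complete_data = []
    # data processing
    # ftp connection to retrieve data
    # etc.
    return complete_data

if __name__ == "__main__":
    file_searcher('path_to_some_tab_del_file.txt')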