2015-11-05 15 views
5

तो क्या मैं निम्नलिखित कोड के साथ क्या करना कोशिश कर रहा हूँ सूचियों की एक सूची पढ़ सकते हैं और कहा जाता है समारोह checker के माध्यम से उन्हें रखा और फिर समारोह checker के परिणाम के साथ log_result सौदा है। मैं मल्टीथ्रेडिंग का उपयोग करके ऐसा करने की कोशिश कर रहा हूं क्योंकि वास्तविकता में परिवर्तनीय नाम rows_to_parse में लाखों पंक्तियां हैं, इसलिए एकाधिक कोरों का उपयोग करके इस प्रक्रिया को काफी मात्रा में तेज करना चाहिए।Multiprocessing पांडा के लिए लिख dataframe

वर्तमान समय में कोड काम नहीं करता है और पाइथन दुर्घटनाग्रस्त हो जाता है।

चिंताओं और मुद्दों मेरे पास है:

  1. मौजूदा df जो चर df में आयोजित इस प्रक्रिया के दौरान सूचकांक बनाए रखने के लिए क्योंकि अन्यथा log_result मिल जाएगा उलझन के रूप में करने के लिए कौन सी पंक्ति को अद्यतन करने की जरूरत है चाहता हूँ।
  2. मैं काफी विश्वास है कि apply_async उचित बहु इस कर्तव्य को करने के लिए समारोह क्योंकि मेरा मानना ​​है कि जिस क्रम में कंप्यूटर पढ़ता है और df संभवतः भ्रष्ट कर सकते हैं यह लिखते नहीं है कर रहा हूँ ???
  3. मुझे लगता है कि df लिखने और पढ़ने के लिए एक कतार स्थापित करने की आवश्यकता हो सकती है, लेकिन मुझे यह सुनिश्चित नहीं है कि मैं ऐसा करने के बारे में कैसे जाऊंगा।

किसी भी सहायता के लिए धन्यवाद।

import pandas as pd 
import multiprocessing 
from functools import partial 

def checker(a,b,c,d,e): 
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)] 
    index_of_match = match.index.tolist() 
    if len(index_of_match) == 1: #one match in df 
     return index_of_match 
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__: 
     return [index_of_match[0]] 
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df 
     return [a,b,c,d,e] 



def log_result(result, dataf): 
    if len(result) == 1: # 
     dataf.loc[result[0]]['e'] += 1 
    else: #append new row to exisiting df 
     new_row = pd.DataFrame([result],columns=cols) 
     dataf = dataf.append(new_row,ignore_index=True) 


def apply_async_with_callback(parsing_material, dfr): 
    pool = multiprocessing.Pool() 
    for var_a, var_b, var_c, var_d, var_e in parsing_material: 
     pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr)) 
    pool.close() 
    pool.join() 



if __name__ == '__main__': 
    #setting up main dataframe 
    cols = ['a','b','c','d','e'] 
    existing_data = [["YES","A","16052011","13031999",3], 
        ["NO","Q","11022003","15081999",3], 
        ["YES","A","22082010","03012001",9]] 

    #main dataframe 
    df = pd.DataFrame(existing_data,columns=cols) 

    #new data 
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5], 
        ['YES', 'W', '17061992', '26032012', 6], 
        ['YES', 'G', '01122006', '07082014', 2], 
        ['YES', 'N', '06081992', '21052008', 9], 
        ['YES', 'Y', '18051995', '24011996', 6], 
        ['NO', 'Q', '11022003', '15081999', 3], 
        ['NO', 'O', '20112004', '28062008', 0], 
        ['YES', 'R', '10071994', '03091996', 8], 
        ['NO', 'C', '09091998', '22051992', 1], 
        ['YES', 'Q', '01051995', '02012000', 3], 
        ['YES', 'Q', '26022015', '26092007', 5], 
        ['NO', 'F', '15072002', '17062001', 8], 
        ['YES', 'I', '24092006', '03112003', 2], 
        ['YES', 'A', '22082010', '03012001', 9], 
        ['YES', 'I', '15072016', '30092005', 7], 
        ['YES', 'Y', '08111999', '02022006', 3], 
        ['NO', 'V', '04012016', '10061996', 1], 
        ['NO', 'I', '21012003', '11022001', 6], 
        ['NO', 'P', '06041992', '30111993', 6], 
        ['NO', 'W', '30081992', '02012016', 6]] 


    apply_async_with_callback(rows_to_parse, df) 
+0

और क्या है: #no मैच, डीएफ को लिखने के लिए तर्क देने के लिए तर्क दें? मुझे लगता है कि अगर आप वापस आते हैं [ए, बी, सी, डी, ई] 'आपका कोड वास्तव में पूरा हो जाएगा लेकिन आपको अन्य समस्याएं होंगी, आप कहीं भी डेटाफ़ का उपयोग कभी भी नहीं करेंगे –

+0

यह इंगित करने के लिए धन्यवाद कि मैंने कोड में संशोधन किया है। इसलिए '[ए, बी, सी, डी, ई] 'समारोह में df को लिखा जाता है' log_result'। – user3374113

+0

'आंशिक (log_result, dataf = dfr)' 'log_results' के हस्ताक्षर से मेल नहीं खाता है – mdurant

उत्तर

8

बहु में इस तरह अपडेट किया जा रहा DataFrames काम करने के लिए नहीं जा रहा है:

dataf = dataf.append(new_row,ignore_index=True) 

एक बात के लिए यह बहुत अक्षम है (ओ (एन) प्रत्येक इसलिए हे संलग्न के लिए (एन^2) में कुल मिलाकर। कुछ वस्तुओं को एक पास में एक साथ जोड़ना है।

किसी और के लिए, और अधिक महत्वपूर्ण बात यह है कि डेटाफ प्रत्येक अद्यतन के लिए लॉक नहीं कर रहा है, इसलिए कोई गारंटी नहीं है कि दो ऑपरेशन संघर्ष नहीं करेंगे (मैं अनुमान लगा रहा हूं यह पाइथन क्रैश हो रहा है)

अंत में, संलग्न जगह पर कार्य नहीं करता है, इसलिए कॉलबैक समाप्त हो जाने के बाद परिवर्तनीय dataf हटा दिया जाता है !! और पैरेंट dataf में कोई बदलाव नहीं किए गए हैं।


हम MultiProcessing list या एक dict इस्तेमाल कर सकते हैं। यदि आप ऑर्डर या निर्देश के बारे में परवाह नहीं करते हैं तो आपको सूचीबद्ध करें (उदा। गणना करें), क्योंकि आपको नोट करना होगा कि मूल्यों को एसिंक से एक अच्छी तरह परिभाषित क्रम में वापस नहीं किया गया है।
(या हम, एक वस्तु जो अपने आप लॉक लागू करता है बना सकते हैं Eli Bendersky देखें।)
तो निम्न परिवर्तन किए जाते हैं:

df = pd.DataFrame(existing_data,columns=cols) 
# becomes 
df = pd.DataFrame(existing_data,columns=cols) 
d = MultiProcessing.list([df]) 

dataf = dataf.append(new_row,ignore_index=True) 
# becomes 
d.append(new_row) 

अब, एक बार async समाप्त हो गया है आप के MultiProcessing.list DataFrames।

pd.concat(d, ignore_index=True) 

चाल करना चाहिए: आप इन (और ignore_index) वांछित परिणाम प्राप्त करने के लिए concat कर सकते हैं।


नोट: प्रत्येक चरण में newrow DataFrame बनाने भी कम कुशल है कि दे पांडा सीधे एक ही बार में एक DataFrame को सूचियों की सूची पार्स है। उम्मीद है कि यह एक खिलौना उदाहरण है, वास्तव में आप चाहते हैं कि आपके भाग मल्टीप्रोसेसिंग के साथ जीत पाने के लिए काफी बड़े हों (मैंने 50kb को नियम-थंब के रूप में सुना है ...), एक समय में एक पंक्ति कभी नहीं होगी यहाँ जीतो


एक तरफ: आप (df) की तरह अपने कोड में वैश्विक का उपयोग कर से बचना चाहिए, यह आपके कार्यों में उनके आसपास पारित करने के लिए अधिक स्वच्छ है (इस मामले में, के रूप में एक तर्क जाँचकर्ता में)।

संबंधित मुद्दे