2012-07-30 16 views
87

में समूहबद्ध पांडों डेटाफ्रेम में एक फ़ंक्शन को कुशलतापूर्वक लागू करना मुझे अक्सर बहुत बड़े DataFrame (मिश्रित डेटा प्रकारों) के समूहों में एक फ़ंक्शन लागू करने की आवश्यकता होती है और कई कोर का लाभ उठाना चाहती है।समांतर

मैं समूहों से एक इटरेटर बना सकता हूं और मल्टीप्रोसेसिंग मॉड्यूल का उपयोग कर सकता हूं, लेकिन यह कुशल नहीं है क्योंकि प्रत्येक समूह और फ़ंक्शन के परिणामों को प्रक्रियाओं के बीच मैसेजिंग के लिए चुना जाना चाहिए।

वहाँ किसी भी तरह से अचार बनाने से बचने के लिए या यहां तक ​​कि बचने DataFrame पूरी तरह से की नकल है? ऐसा लगता है कि मल्टीप्रोसेसिंग मॉड्यूल के साझा मेमोरी फ़ंक्शंस numpy सरणी तक सीमित हैं। क्या कोई अन्य विकल्प भी हैं?

+0

जहाँ तक मुझे पता है, वहाँ मनमाने ढंग से वस्तुओं साझा करने के लिए कोई रास्ता नहीं है: पर के बाएं एक DataFrame बना सकते हैं और परिणाम लौटाना है। मैं सोच रहा हूं, अगर मल्टीप्रोसेसिंग के माध्यम से पिकलिंग में इतना अधिक समय लगता है। शायद आपको रिश्तेदार पिकलिंग समय को कम करने के लिए प्रत्येक प्रक्रिया के लिए बड़े कार्य-पैकेज बनाने की संभावना तलाशनी चाहिए। जब आप समूह बनाते हैं तो एक और संभावना मल्टीप्रोसेसिंग का उपयोग करना होगा। –

+3

मैं ऐसा कुछ करता हूं लेकिन यूडब्ल्यूएसजीआई, फ्लास्क और प्रीफोर्किंग का उपयोग करके: मैं पांडा डेटाफ्रेम को एक प्रक्रिया में लोड करता हूं, इसे x बार फोर्क करता हूं (इसे एक साझा मेमोरी ऑब्जेक्ट बना देता है) और फिर उन प्रक्रियाओं को अन्य पायथन प्रक्रिया से कॉल करें जहां मैं परिणाम जोड़ता हूं। एटीएम मैं एक संचार प्रक्रिया के रूप में JSON का उपयोग करता हूं, लेकिन यह आ रहा है (अभी भी अत्यधिक प्रयोगात्मक है): http://pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental – Carst

+0

वैसे, क्या आपने किया कभी भी एचडीएफ 5 को चंकिंग के साथ देखें? (एचडीएफ 5 समवर्ती लेखन के लिए सहेज नहीं है, लेकिन आप अलग-अलग फाइलों को सहेज सकते हैं और अंत में कॉन्सटेनेट सामग्री) – Carst

उत्तर

12

उपर्युक्त टिप्पणियों से, ऐसा लगता है कि यह pandas कुछ समय के लिए योजनाबद्ध है (यहां एक दिलचस्प दिखने वाला rosetta project भी है जिसे मैंने अभी देखा है)।

हालांकि, जब तक हर समानांतर कार्यक्षमता pandas में शामिल किया जाता है, मैंने देखा है कि यह कुशल लिखने के & pandas के समानांतर augmentations गैर स्मृति नकल सीधे cython + OpenMP और सी का उपयोग कर ++ बहुत आसान है।

import pandas as pd 
import para_group_demo 

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) 
print para_group_demo.sum(df.a, df.b) 

और आउटपुट है::

यहाँ एक समानांतर GroupBy-राशि है, जिसका उपयोग कुछ इस तरह है लेखन की एक छोटी सी उदाहरण है

 sum 
key  
0  6 
1  11 
2  4 

नोट Doubtlessly, इस सरल उदाहरण की कार्यक्षमता अंततः pandas का हिस्सा होगी। हालांकि, कुछ चीजें कुछ समय के लिए सी ++ में समानांतर होने के लिए अधिक प्राकृतिक होंगी, और यह जानना महत्वपूर्ण है कि इसे pandas में संयोजित करना कितना आसान है।


ऐसा करने के लिए, मैं एक साधारण एकल स्रोत-फाइल एक्सटेंशन जिसका कोड इस प्रकार लिखा था।

यह कुछ आयात और प्रकार परिभाषाएं

from libc.stdint cimport int64_t, uint64_t 
from libcpp.vector cimport vector 
from libcpp.unordered_map cimport unordered_map 

cimport cython 
from cython.operator cimport dereference as deref, preincrement as inc 
from cython.parallel import prange 

import pandas as pd 

ctypedef unordered_map[int64_t, uint64_t] counts_t 
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t 
ctypedef vector[counts_t] counts_vec_t 

सी ++ unordered_map प्रकार किसी एकल थ्रेड द्वारा संक्षेप के लिए है के साथ शुरू होता है, और vector सभी धागे द्वारा संक्षेप के लिए है।

अब समारोह sum करने के लिए

।यह तेजी से पहुँच के लिए typed memory views साथ शुरू होता है:

cdef uint64_t num_threads = 4 
    cdef uint64_t l = len(crit) 
    cdef uint64_t s = l/num_threads + 1 
    cdef uint64_t i, j, e 
    cdef counts_vec_t counts 
    counts = counts_vec_t(num_threads) 
    counts.resize(num_threads) 
    with cython.boundscheck(False): 
     for i in prange(num_threads, nogil=True): 
      j = i * s 
      e = j + s 
      if e > l: 
       e = l 
      while j < e: 
       counts[i][crit_view[j]] += vals_view[j] 
       inc(j) 
:

def sum(crit, vals): 
    cdef int64_t[:] crit_view = crit.values 
    cdef int64_t[:] vals_view = vals.values 

समारोह धागे (यहाँ से 4 hardcoded) करने के लिए अर्द्ध समान रूप से विभाजित है, और प्रत्येक धागा राशि अपनी सीमा में प्रविष्टियों होने से जारी है

cdef counts_t total 
    cdef counts_it_t it, e_it 
    for i in range(num_threads): 
     it = counts[i].begin() 
     e_it = counts[i].end() 
     while it != e_it: 
      total[deref(it).first] += deref(it).second 
      inc(it)   

सभी वें:

जब धागे पूरा कर लिया है, समारोह (विभिन्न श्रेणियों से) सभी परिणामों को एक भी unordered_map में विलीन हो जाती

key, sum_ = [], [] 
    it = total.begin() 
    e_it = total.end() 
    while it != e_it: 
     key.append(deref(it).first) 
     sum_.append(deref(it).second) 
     inc(it) 

    df = pd.DataFrame({'key': key, 'sum': sum_}) 
    df.set_index('key', inplace=True) 
    return df