2015-09-14 11 views
10

में कैशिंग इंटरमीडिएट परिणाम हाल ही में मैं अपने स्टैंडअलोन पायथन एमएल कोड को स्पार्क करने के लिए माइग्रेट करने की योजना बना रहा हूं। spark.ml में एमएल पाइपलाइन एल्गोरिदम चरणों और हाइपर-पैरामीटर ग्रिड खोज को चेन करने के लिए सुव्यवस्थित एपीआई के साथ काफी आसान हो जाती है।स्पार्क एमएल पाइपलाइन

फिर भी, मुझे मौजूदा दस्तावेज़ों में अस्पष्ट एक महत्वपूर्ण विशेषता के लिए इसका समर्थन मिला: मध्यवर्ती परिणामों के कैशिंग। इस सुविधा का महत्व तब उठता है जब पाइपलाइन में गहन चरणों की गणना शामिल होती है।

उदाहरण के लिए, मेरे मामले में मैं इनपुट सुविधाओं को बनाने के लिए समय श्रृंखला डेटा पर एकाधिक चलती औसत करने के लिए एक विशाल स्पैर मैट्रिक्स का उपयोग करता हूं। मैट्रिक्स की संरचना कुछ हाइपर-पैरामीटर द्वारा निर्धारित की जाती है। यह चरण पूरे पाइपलाइन के लिए एक बाधा बन गया है क्योंकि मुझे रनटाइम में मैट्रिक्स बनाना है।

पैरामीटर खोज के दौरान, मेरे पास आमतौर पर इस "संरचना पैरामीटर" के अलावा अन्य परीक्षण करने के लिए अन्य पैरामीटर होते हैं। इसलिए यदि मैं "मैट्रिक्स पैरामीटर" अपरिवर्तित होने पर विशाल मैट्रिक्स का पुन: उपयोग कर सकता हूं, तो मैं कई समय बचा सकता हूं। इस कारण से, मैंने जानबूझकर इन इंटरमीडिएट परिणामों को कैश करने और पुन: उपयोग करने के लिए अपना कोड बनाया।

तो मेरा सवाल है: स्पार्क की एमएल पाइपलाइन हैंडल इंटरमीडिएट कैशिंग स्वचालित रूप से कर सकते हैं? या क्या मुझे ऐसा करने के लिए मैन्युअल रूप से कोड बनाना है? यदि हां, तो क्या सीखने के लिए कोई सर्वोत्तम अभ्यास है?

पीएस मैंने आधिकारिक दस्तावेज और कुछ अन्य सामग्री को देखा है, लेकिन उनमें से कोई भी इस विषय पर चर्चा नहीं कर रहा है।

+0

मैं एक [संबंधित प्रश्न] है (http://stackoverflow.com/questions/33161320/distributed-batch-computation साथ-साथ दीर्घकालिक-दृढ़ता-और-जांच बिंदु) दुर्भाग्यवश दुर्भाग्य से कोई प्रतिक्रिया नहीं है। –

उत्तर

4

तो मैं एक ही समस्या में भाग गया और जिस तरह से मैंने हल किया है वह है कि मैंने अपना खुद का पाइपलाइनस्टेज लागू किया है, जो इनपुट डेटासेट को कैश करता है और इसे वापस देता है।

import org.apache.spark.ml.Transformer 
import org.apache.spark.ml.param.ParamMap 
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable} 
import org.apache.spark.sql.{DataFrame, Dataset} 
import org.apache.spark.sql.types.StructType 

class Cacher(val uid: String) extends Transformer with DefaultParamsWritable { 
    override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF.cache() 

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra) 

    override def transformSchema(schema: StructType): StructType = schema 

    def this() = this(Identifiable.randomUID("CacherTransformer")) 
} 

तो यह उपयोग करने के लिए आप कुछ इस तरह करना होगा:

new Pipeline().setStages(Array(stage1, new Cacher(), stage2)) 
+1

मुझे लगता है कि इसके साथ एकमात्र मुद्दा समाधान है (जिसे मैं वास्तव में ऊपर उठाता हूं!) यह है कि आप मौजूदा कैश किए गए डेटाफ्रेम को असम्बद्ध नहीं करते हैं (यदि आप कई कैचर को लिंक करते हैं)। आप तर्क दे सकते हैं कि यह कोई समस्या नहीं है क्योंकि स्पार्क स्वचालित रूप से जीसी समय पर unpersists है, लेकिन यह आपके यूआई से काफी भ्रमित हो सकता है उदा। इतने सारे कैश किए गए डेटा देख रहे हैं। –

संबंधित मुद्दे