2016-09-29 9 views
7

मैं एक स्काला समारोह है कि एक प्रदान की इनपुट स्ट्रिंग के आधार पर स्पार्क DataTypes अनुमान लगा सकते हैं लिखने के लिए कोशिश कर रहा हूँ:,,का निष्कर्ष निकालते स्पार्क डेटा प्रकार

/** 
* Example: 
* ======== 
* toSparkType("string") => StringType 
* toSparkType("boolean") => BooleanType 
* toSparkType("date") => DateType 
* etc. 
*/ 
def toSparkType(inputType : String) : DataType = { 
    var dt : DataType = null 

    if(matchesStringRegex(inputType)) { 
     dt = StringType 
    } else if(matchesBooleanRegex(inputType)) { 
     dt = BooleanType 
    } else if(matchesDateRegex(inputType)) { 
     dt = DateType 
    } else if(...) { 
     ... 
    } 

    dt 
} 

मेरा लक्ष्य एक बड़े सबसेट का समर्थन है सब नहीं तो की उपलब्ध DataTypes। मैं इस समारोह को लागू करने शुरू कर दिया है, मैं सोच को मिला: "स्पार्क/स्काला शायद पहले से ही एक सहायक/util विधि है कि मेरे लिए यह करना होगा।" सब के बाद, मैं जानता हूँ कि मैं कुछ ऐसा कर सकते हैं:

var structType = new StructType() 

structType.add("some_new_string_col", "string", true, Metadata.empty) 
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty) 
structType.add("some_new_date_col", "date", true, Metadata.empty) 

और या तो स्कैला और/या स्पार्क मेरे "string" तर्क को StringType इत्यादि में परिवर्तित कर देगा। इसलिए मैं पूछता हूं: मेरी कनवर्टर विधि को लागू करने में मेरी सहायता के लिए स्पार्क या स्कैला के साथ मैं क्या जादू कर सकता हूं?

+1

ऐसा करने के लिए आपके उपयोग के मामले में क्या है:

तो तुम बस के रूप में अपने विधि लागू कर सकते हैं? स्पार्क पहले से ही अंतर्निहित डेटा स्रोतों के लिए स्कीमा और डेटा प्रकारों का अनुमान लगाता है। क्या आप अपना खुद का डेटा स्रोत लागू करना चाहते हैं? –

+0

मैं वास्तव में आपकी समस्या को समझ नहीं पा रहा हूं, क्या आप कृपया सचिन की तरह कहा जा सकता है? आप स्ट्रिंग से अनुमान लगाना चाहते हैं ??? मुझे यह नहीं मिला। –

उत्तर

12

स्पार्क/स्कैला में शायद पहले से ही एक सहायक/उपयोग विधि है जो मेरे लिए यह करेगी।

आप सही हैं। स्पार्क में पहले से ही अपनी स्कीमा और डेटा प्रकार अनुमान कोड है जो इसका उपयोग अंतर्निहित डेटा स्रोतों (सीएसवी, जेसन इत्यादि) से स्कीमा का अनुमान लगाने के लिए करता है ताकि आप इसे स्वयं लागू करने के लिए देख सकें (वास्तविक कार्यान्वयन स्पार्क के लिए निजी चिह्नित है और है आरडीडी और आंतरिक कक्षाओं से जुड़ा हुआ है, इसलिए इसका उपयोग सीधे स्पार्क के बाहर कोड से नहीं किया जा सकता है, लेकिन आपको इसके बारे में एक अच्छा विचार देना चाहिए।)

यह देखते हुए कि सीएसवी फ्लैट प्रकार है (और जेसन नेस्टेड हो सकता है संरचना), सीएसवी स्कीमा अनुमान अपेक्षाकृत अधिक सीधे आगे है और आपको उस कार्य के साथ आपकी मदद करनी चाहिए जो आप ऊपर प्राप्त करने की कोशिश कर रहे हैं। तो मैं समझाऊंगा कि सीएसवी अनुमान कैसे काम करता है (जेसन अनुमान केवल खाते में घोंसला वाली संरचना को लेने की जरूरत है लेकिन डेटा प्रकार अनुमान बहुत समान है)।

उस प्रस्ताव के साथ, जिस चीज को आप देखना चाहते हैं वह CSVInferSchema ऑब्जेक्ट है। विशेष रूप से, infer विधि देखें जो RDD[Array[String]] लेता है और पूरे आरडीडी में सरणी के प्रत्येक तत्व के लिए डेटा प्रकार का अनुमान लगाता है। जिस तरह से यह होता है - यह NullType के रूप में प्रत्येक फ़ील्ड को RDD में मूल्यों की अगली पंक्ति (Array[String]) पर पुनरावृत्त करता है और यह DataType को नए DataType पर पहले से अनुमानित DataType अपडेट करता है यदि नया DataType अधिक विशिष्ट है।यह here हो रहा है:

val rootTypes: Array[DataType] = 
     tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes) 

अब inferRowTypecalls पंक्ति में क्षेत्र से प्रत्येक के लिए inferFieldinferFieldimplementation वह है जो आप शायद खोज रहे हैं - यह अब तक किसी विशेष फ़ील्ड के लिए अवरक्त प्रकार और पैरामीटर के रूप में वर्तमान पंक्ति के लिए फ़ील्ड का स्ट्रिंग मान लेता है। इसके बाद यह मौजूदा अनुमानित प्रकार लौटाता है या यदि नया प्रकार अनुमानित नया प्रकार है तो नया प्रकार। कोड की

प्रासंगिक अनुभाग इस प्रकार है:

typeSoFar match { 
     case NullType => tryParseInteger(field, options) 
     case IntegerType => tryParseInteger(field, options) 
     case LongType => tryParseLong(field, options) 
     case _: DecimalType => tryParseDecimal(field, options) 
     case DoubleType => tryParseDouble(field, options) 
     case TimestampType => tryParseTimestamp(field, options) 
     case BooleanType => tryParseBoolean(field, options) 
     case StringType => StringType 
     case other: DataType => 
      throw new UnsupportedOperationException(s"Unexpected data type $other") 
     } 

कृपया ध्यान दें कि यदि typeSoFar NullType है तो यह पहले Integer के रूप में यह पार्स करने के लिए कोशिश करता है लेकिन tryParseInteger कॉल कम प्रकार पार्स करने के लिए कॉल की एक श्रृंखला है। इसलिए यदि यह मान को इंटीजर के रूप में पार्स करने में सक्षम नहीं है तो यह tryParseLong का आह्वान करेगा जो विफलता पर tryParseDecimal का आह्वान करेगा जो विफलता पर tryParseDouble w.o.f.w.i. tryParseTimestamp w.o.f.w.i tryParseBoolean w.o.f.w.i. अंत में stringType

तो आप अपने उपयोग के मामले को लागू करने के लिए बहुत समान तर्क का उपयोग कर सकते हैं। (आप पंक्तियों में मर्ज करने के लिए की जरूरत नहीं है, तो आप बस सभी tryParse* तरीकों शब्दशः अपनी खुद की regex लिखने के लिए कोई ज़रूरत नहीं लागू करने और बस tryParseInteger आह्वान।।)

आशा इस मदद करता है।

+2

अच्छा जवाब !!!! – gsamaras

0

से, यह प्रतीत नहीं होता है कि आप उदाहरण के लिए जाँच, क्या आप जादुई इच्छा कर सकते हैं इस उदाहरण:

import com.scalakata._ 

@instrument class Playground { 
    val x = 5 
    def f[T](v: T) = v 
    f(x) 
    val y = "boolean" 
    f(y) 
    def manOf[T: Manifest](t: T): Manifest[T] = manifest[T] 
    println(manOf(y)) 
} 

जो मैं I want to get the type of a variable at runtime पढ़ने के बाद की रचना की।


अब से

, के बाद से मैं जगह में एक स्थापना की जरूरत नहीं है अभी, मैं एक उदाहरण रचना कर सकता है, लेकिन उपयोग करने के लिए स्पष्ट बात नहीं है, इसलिए मैं आप के रूप में toSparkType() लेखन जारी रखने के लिए आप सुझाव है शुरू कर दिया है, लेकिन पहले Source code for pyspark.sql.types पर एक नज़र डालें।


आप समस्या देखते हैं कि आप हमेशा एक स्ट्रिंग पास कर रहे हैं।

7

हां, बेशक स्पार्क में आपको आवश्यक जादू है।

स्पार्क 2.x में यह CatalystSqlParser ऑब्जेक्ट है, here परिभाषित किया गया है।

उदाहरण के लिए:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser 

CatalystSqlParser.parseDataType("string") // StringType 
CatalystSqlParser.parseDataType("int") // IntegerType 

और इतने पर।

लेकिन जैसा कि मैं समझता हूं, यह सार्वजनिक एपीआई का हिस्सा नहीं है और इसलिए बिना किसी चेतावनी के अगले संस्करणों में बदल सकता है।

def toSparkType(inputType: String): DataType = CatalystSqlParser.parseDataType(inputType) 
+0

हाय मेरे पास इस "इंटीजर टाइप" की तरह एक स्ट्रिंग है और मैं डेटाटाइप ऑब्जेक्ट बनाना चाहता हूं। मैं उसे कैसे कर सकता हूँ? उपर्युक्त विधि मेरे लिए काम नहीं करती है – user1870400

संबंधित मुद्दे