2015-09-08 9 views
13

मैं मुख्य है कि चिंगारी संदर्भ बनाता है:स्पार्क एसक्यूएल Dataframe - आयात sqlContext.implicits._

val sc = new SparkContext(sparkConf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

फिर dataframe बनाता है और dataframe पर फिल्टर और सत्यापन करता है।

val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00") 

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0)) 
    // record length cannot be < 2 
    .na.drop(3) 
    // round to hours 
    .withColumn("time",convertToHourly($"time")) 

यह बहुत अच्छा काम करता है।

लेकिन जब मैं

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 

कि Dataframe & सत्यापन और परिवर्तनों करता है हो जाता है करने के लिए dataframe भेजकर एक और फाइल करने के लिए मेरी सत्यापन ले जाने का प्रयास: ऐसा लगता है जैसे मैं

import sqlContext.implicits._ 

To avoid the error: “value $ is not a member of StringContext” that happens on line: .withColumn("time",convertToHourly($"time"))

की जरूरत

लेकिन import sqlContext.implicits._ का उपयोग करने के लिए मुझे sqlContext को भी नए में परिभाषित करने की आवश्यकता है इसलिए जैसे फाइल:

val sc = new SparkContext(sparkConf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

या

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 
function 

मैं जुदाई मैं 2 फ़ाइलें (मुख्य & सत्यापन) के लिए क्या करने की कोशिश कर रहा हूँ की तरह महसूस के लिए भेज सही ढंग से किया नहीं है ...

इसे कैसे डिजाइन किया जाए इस पर कोई विचार है? या बस समारोह में sqlContext भेजें?

धन्यवाद!

+0

में इस उदाहरण पर एक नज़र डालें जब मैं की तरह है कि मैं सिर्फ नए वर्ग के निर्माता में SQLContext पारित चीजों को अलग करना चाहते हैं और फिर मैं प्रत्येक वर्ग के प्रति एक बार sqlContext.implicits._ आयात कर सकते हैं। मैं कुछ भी बेहतर नहीं कर सका इसलिए मैं इस सवाल को वोट देता हूं और बेहतर परेशानियों की प्रतीक्षा करता हूं। – Niemand

उत्तर

11

आप SQLContext के सिंगलटन इंस्टेंस के साथ काम कर सकते हैं। आप spark repository

/** Lazily instantiated singleton instance of SQLContext */ 
object SQLContextSingleton { 

    @transient private var instance: SQLContext = _ 

    def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
     instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
} 
... 
//And wherever you want you can do 
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
import sqlContext.implicits._ 
+1

धन्यवाद! मैंने सिंगलटन ऑब्जेक्ट का उपयोग किया लेकिन मेरे मामले में मैं इसे केवल एक बार बनाया था: ऑब्जेक्ट SQLContextSingleton { @transient var instance: SQLContext = _ } फिर इसे मुख्य से प्रारंभ किया गया, और इसे सत्यापन पर उपयोग किया गया। सहायता के लिए धन्यवाद! –

संबंधित मुद्दे