2017-06-13 8 views
5

मैं एक डाटासेट करने के लिए अपने dataframe बदलने की आवश्यकता है और मैं निम्नलिखित कोड का इस्तेमाल किया:स्कैला में अपाचे स्पार्क में डेटाफ्रेम को डेटासेट में कैसे परिवर्तित करें?

val final_df = Dataframe.withColumn(
     "features", 
     toVec4(
     // casting into Timestamp to parse the string, and then into Int 
     $"time_stamp_0".cast(TimestampType).cast(IntegerType), 
     $"count", 
     $"sender_ip_1", 
     $"receiver_ip_2" 
    ) 
    ).withColumn("label", (Dataframe("count"))).select("features", "label") 

    final_df.show() 

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7)) 
    val TrainingDF = trainingTest(0) 
    val TestingDF=trainingTest(1) 
    TrainingDF.show() 
    TestingDF.show() 

    ///lets create our liner regression 
    val lir= new LinearRegression() 
    .setRegParam(0.3) 
    .setElasticNetParam(0.8) 
    .setMaxIter(100) 
    .setTol(1E-6) 

    case class df_ds(features:Vector, label:Integer) 
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) 

    val Training_ds = TrainingDF.as[df_ds] 

मेरे समस्या यह है कि, मैं निम्नलिखित त्रुटि मिली:

Error:(96, 36) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 
    val Training_ds = TrainingDF.as[df_ds] 

ऐसा लगता है कि संख्या डेटाफ्रेम में मूल्यों की मेरी कक्षा में मूल्य की संख्या के साथ अलग है। हालांकि मैं अपने प्रशिक्षण डीएफ डेटाफ्रेम पर case class df_ds(features:Vector, label:Integer) का उपयोग कर रहा हूं, इसमें सुविधाओं का एक वेक्टर और एक पूर्णांक लेबल है। इसके अलावा यहां

+--------------------+-----+ 
|   features|label| 
+--------------------+-----+ 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,19...| 19| 
|[1.497325796E9,10...| 10| 
+--------------------+-----+ 

अपने मूल final_df dataframe है:: यहाँ TrainingDF dataframe है

+------------+-----------+-------------+-----+ 
|time_stamp_0|sender_ip_1|receiver_ip_2|count| 
+------------+-----------+-------------+-----+ 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.2|  10.0.0.3| 19| 
| 05:49:56| 10.0.0.3|  10.0.0.2| 10| 
+------------+-----------+-------------+-----+ 

हालांकि मैंने कहा त्रुटि मिल गया है! कोई भी मेरी मदद कर सकता हैं? अग्रिम धन्यवाद।

उत्तर

8

जो त्रुटि संदेश आप पढ़ रहे हैं वह एक बहुत अच्छा सूचक है।

जब आप परिवर्तित एक एक Dataset को DataFrame आप जो कुछ भी DataFrame पंक्तियों में संग्रहीत किया जाता है के लिए एक उचित Encoder करना होगा। आदिम की तरह प्रकार के लिए

इनकोडर्स (Int रों, String है, और इसी तरह) और case classes सिर्फ अपने SparkSession चाहते के लिए implicits आयात द्वारा प्रदान की जाती हैं इस प्रकार है:

case class MyData(intField: Int, boolField: Boolean) // e.g. 

val spark: SparkSession = ??? 
val df: DataFrame = ??? 

import spark.implicits._ 

val ds: Dataset[MyData] = df.as[MyData] 

है कि या तो काम नहीं करता है ऐसा इसलिए है क्योंकि जिस प्रकार आप कास्ट कर रहे हैंDataFrame समर्थित नहीं है। उस स्थिति में, आपको अपना खुद का Encoder लिखना होगा: आपको here के बारे में अधिक जानकारी मिल सकती है और java.time.LocalDateTime के लिए देखें)।

+0

मृत लिंक [यहां] (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-Encoder.html) :( –

+0

@joeybaruch रिपोर्ट के लिए धन्यवाद, मैंने अपडेट किया है लेखक ने स्पार्क एसक्यूएल से संबंधित सामान अपनी पुस्तक में ले जाया। – stefanobaghino

+0

मैंने इस उदाहरण को लिखने के बाद से पुस्तक में जोड़ा गया एक उदाहरण जोड़ने का मौका भी लिया। – stefanobaghino

संबंधित मुद्दे