2013-10-17 15 views
5

में सॉर्ट करते समय NotSerializableException मैं एक साधारण स्ट्रीम प्रोसेसिंग स्पार्क जॉब लिखने की कोशिश कर रहा हूं जो संदेशों की एक सूची (जेएसओएन-स्वरूपित) ले लेगा, प्रत्येक उपयोगकर्ता से संबंधित है, प्रत्येक उपयोगकर्ता के संदेशों की गणना करेगा और शीर्ष दस प्रिंट करें उपयोगकर्ताओं।स्पार्क

हालांकि, जब मैं तुलनात्मक परिभाषित करता हूं> कम गणना को सॉर्ट करने के लिए पूरी चीज java.io.NotSerializableException फेंक दिया जा रहा है।

मेरे स्पार्क के लिए Maven निर्भरता:

<groupId>org.apache.spark</groupId> 
<artifactId>spark-core_2.9.3</artifactId> 
<version>0.8.0-incubating</version> 

मैं उपयोग कर रहा हूँ जावा कोड:

public static void main(String[] args) { 

    JavaSparkContext sc = new JavaSparkContext("local", "spark"); 

    JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache(); 

    JavaPairRDD<String, Long> words = lines 
     .map(new Function<String, JsonElement>() { 
      // parse line into JSON 
      @Override 
      public JsonElement call(String t) throws Exception { 
       return (new JsonParser()).parse(t); 
      } 

     }).map(new Function<JsonElement, String>() { 
      // read User ID from JSON 
      @Override 
      public String call(JsonElement json) throws Exception { 
       return json.getAsJsonObject().get("userId").toString(); 
      } 

     }).map(new PairFunction<String, String, Long>() { 
      // count each line 
      @Override 
      public Tuple2<String, Long> call(String arg0) throws Exception { 
       return new Tuple2(arg0, 1L); 
      } 

     }).reduceByKey(new Function2<Long, Long, Long>() { 
      // count messages for every user 
      @Override 
      public Long call(Long arg0, Long arg1) throws Exception { 
       return arg0 + arg1; 
      } 

     }); 

    // sort result in a descending order and take 10 users with highest message count 
    // This causes the exception 
    List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>>(){ 

     @Override 
     public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) { 
      return -1 * o1._2().compareTo(o2._2()); 
     } 

    }); 

    // print result 
    for (Tuple2<String, Long> tuple : sorted) { 
     System.out.println(tuple._1() + ": " + tuple._2()); 
    } 

} 

जिसके परिणामस्वरूप स्टैक ट्रेस: ​​

java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:601) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297) 
    at java.lang.Thread.run(Thread.java:722) 
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668) 
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) 
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) 

मैं स्पार्क एपीआई के माध्यम से चला गया प्रलेखन लेकिन मुझे कुछ भी नहीं मिला जो मुझे सही दिशा बताएगा। क्या मैं कुछ गलत कर रहा हूं या स्पार्क में यह एक बग है? किसी भी मदद की खुशी से सराहना की जाएगी।

+0

अद्यतन: जाहिर है, यह सब तुलनात्मक वस्तु को उबालता है जिसे * टेकऑर्डर्ड() * के दूसरे तर्क के रूप में पारित किया जा रहा है। के रूप में तुलनाकारी इंटरफ़ेस आदेश में इस काम आप एक 'serializable' तुलनित्र बनाने की जरूरत बनाने के लिए Serializable विस्तार नहीं करता: 'सार्वजनिक इंटरफ़ेस SerializableComparator तुलनाकारी फैली हुई है, Serializable {}' बाद में, एक वस्तु गुजर इस इंटरफेस को लागू करता है जो क्योंकि तुलनित्र मूल अपवाद को रोकता है। अनुमोदित, यह शायद इस समस्या का सबसे सुंदर समाधान नहीं है और मैं निश्चित रूप से कुछ सुझावों का स्वागत करता हूं :) –

उत्तर

2

के रूप में @ vanco.anton उल्लेख, आप निम्न का उपयोग कर जावा 8 कार्यात्मक इंटरफेस की तरह कुछ कर सकते हैं:

public interface SerializableComparator<T> extends Comparator<T>, Serializable { 

    static <T> SerializableComparator<T> serialize(SerializableComparator<T> comparator) { 
    return comparator; 
    } 

} 

और फिर अपने कोड में:

import static SerializableComparator.serialize; 
... 
rdd.top(10, serialize((a, b) -> -a._2.compareTo(b._2)));