में सॉर्ट करते समय NotSerializableException मैं एक साधारण स्ट्रीम प्रोसेसिंग स्पार्क जॉब लिखने की कोशिश कर रहा हूं जो संदेशों की एक सूची (जेएसओएन-स्वरूपित) ले लेगा, प्रत्येक उपयोगकर्ता से संबंधित है, प्रत्येक उपयोगकर्ता के संदेशों की गणना करेगा और शीर्ष दस प्रिंट करें उपयोगकर्ताओं।स्पार्क
हालांकि, जब मैं तुलनात्मक परिभाषित करता हूं> कम गणना को सॉर्ट करने के लिए पूरी चीज java.io.NotSerializableException फेंक दिया जा रहा है।
मेरे स्पार्क के लिए Maven निर्भरता:
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
<version>0.8.0-incubating</version>
मैं उपयोग कर रहा हूँ जावा कोड:
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext("local", "spark");
JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache();
JavaPairRDD<String, Long> words = lines
.map(new Function<String, JsonElement>() {
// parse line into JSON
@Override
public JsonElement call(String t) throws Exception {
return (new JsonParser()).parse(t);
}
}).map(new Function<JsonElement, String>() {
// read User ID from JSON
@Override
public String call(JsonElement json) throws Exception {
return json.getAsJsonObject().get("userId").toString();
}
}).map(new PairFunction<String, String, Long>() {
// count each line
@Override
public Tuple2<String, Long> call(String arg0) throws Exception {
return new Tuple2(arg0, 1L);
}
}).reduceByKey(new Function2<Long, Long, Long>() {
// count messages for every user
@Override
public Long call(Long arg0, Long arg1) throws Exception {
return arg0 + arg1;
}
});
// sort result in a descending order and take 10 users with highest message count
// This causes the exception
List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>>(){
@Override
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
return -1 * o1._2().compareTo(o2._2());
}
});
// print result
for (Tuple2<String, Long> tuple : sorted) {
System.out.println(tuple._1() + ": " + tuple._2());
}
}
जिसके परिणामस्वरूप स्टैक ट्रेस:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
मैं स्पार्क एपीआई के माध्यम से चला गया प्रलेखन लेकिन मुझे कुछ भी नहीं मिला जो मुझे सही दिशा बताएगा। क्या मैं कुछ गलत कर रहा हूं या स्पार्क में यह एक बग है? किसी भी मदद की खुशी से सराहना की जाएगी।
अद्यतन: जाहिर है, यह सब तुलनात्मक वस्तु को उबालता है जिसे * टेकऑर्डर्ड() * के दूसरे तर्क के रूप में पारित किया जा रहा है। के रूप में तुलनाकारी इंटरफ़ेस आदेश में इस काम आप एक 'serializable' तुलनित्र बनाने की जरूरत बनाने के लिए Serializable विस्तार नहीं करता: 'सार्वजनिक इंटरफ़ेस SerializableComparator तुलनाकारी फैली हुई है, Serializable {}' बाद में, एक वस्तु गुजर इस इंटरफेस को लागू करता है जो क्योंकि तुलनित्र मूल अपवाद को रोकता है। अनुमोदित, यह शायद इस समस्या का सबसे सुंदर समाधान नहीं है और मैं निश्चित रूप से कुछ सुझावों का स्वागत करता हूं :) –