2016-10-12 11 views
6

जावा, स्पार्क और कैसंड्रा — java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture को shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture पर कास्ट नहीं किया जा सकता। अपने कैसंड्रा डेटाबेस में डेटा लिखने का प्रयास करते समय मुझे यह त्रुटि मिली।

यहाँ मेरा कोड है: 1) Dictionary.java

package com.chatSparkConnactionTest; 

import java.io.Serializable; 

/**
 * Simple serializable POJO representing one row of the Cassandra
 * "dictionary" table (value_id, d_name, d_value). Used as the bean type
 * mapped by the Spark Cassandra connector's {@code mapToRow(Dictionary.class)}.
 */
public class Dictionary implements Serializable {
    // Explicit serialVersionUID: instances of this class are serialized and
    // shipped to Spark executors, so pin serialization compatibility across
    // recompiles (matches the convention used by JavaDemoRDDWrite).
    private static final long serialVersionUID = 1L;

    private String value_id;
    private String d_name;
    private String d_value;

    /** No-arg constructor required for bean-style row mapping. */
    public Dictionary() {}

    /**
     * Convenience constructor initializing all three columns.
     *
     * @param value_id partition key of the row (see the table's PRIMARY KEY)
     * @param d_name   clustering column (dictionary entry name)
     * @param d_value  payload value for the entry
     */
    public Dictionary(String value_id, String d_name, String d_value) {
        this.setValue_id(value_id);
        this.setD_name(d_name);
        this.setD_value(d_value);
    }

    public String getValue_id() {
        return value_id;
    }

    public void setValue_id(String value_id) {
        this.value_id = value_id;
    }

    public String getD_name() {
        return d_name;
    }

    public void setD_name(String d_name) {
        this.d_name = d_name;
    }

    public String getD_value() {
        return d_value;
    }

    public void setD_value(String d_value) {
        this.d_value = d_value;
    }
}

मेरा मुख्य वर्ग:

package com.chatSparkConnactionTest; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import java.io.Serializable; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 

import com.datastax.spark.connector.japi.CassandraJavaUtil; 
import com.datastax.spark.connector.japi.CassandraRow; 
import com.datastax.spark.connector.japi.SparkContextJavaFunctions; 
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD; 
import com.datastax.driver.core.Session; 
import com.datastax.spark.connector.cql.CassandraConnector; 
import com.datastax.spark.connector.japi.CassandraRow; 
import com.google.common.base.Objects; 

import org.apache.avro.data.Json; 
import org.apache.hadoop.util.StringUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
//import org.apache.spark.sql.SchemaRDD; 
//import org.apache.spark.sql.cassandra.CassandraSQLContext; 

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Date; 
import java.util.List; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; 
import com.datastax.spark.connector.japi.CassandraRow; 
import com.fasterxml.jackson.core.JsonParseException; 
import com.fasterxml.jackson.databind.JsonNode; 
import org.apache.spark.api.java.function.Function; 


/**
 * Demo: builds a small in-memory list of {@link Dictionary} rows,
 * parallelizes it into a Spark RDD and writes it to the Cassandra table
 * {@code chat.dictionary} via the Spark Cassandra connector.
 */
public class JavaDemoRDDWrite implements Serializable {
    private static final long serialVersionUID = 1L;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("chat")
                .setMaster("local")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            List<Dictionary> dictionary = Arrays.asList(
                    new Dictionary("7", "n1", "v1"),
                    new Dictionary("8", "n2", "v2"),
                    new Dictionary("9", "n3", "v3"));

            // Print the sample data before writing it, for manual verification.
            for (Dictionary dictionaryRow : dictionary) {
                System.out.println("id: " + dictionaryRow.getValue_id());
                System.out.println("name: " + dictionaryRow.getD_name());
                System.out.println("value: " + dictionaryRow.getD_value());
            }

            JavaRDD<Dictionary> rdd = sc.parallelize(dictionary);
            // count() reports the size without materializing every row on the
            // driver the way the original collect().size() did.
            System.out.println("Total rdd rows: " + rdd.count());

            // Map the bean's properties to the chat.dictionary columns and save.
            javaFunctions(rdd)
                    .writerBuilder("chat", "dictionary", mapToRow(Dictionary.class))
                    .saveToCassandra();
        } finally {
            // The original never stopped the context (resource leak) and left a
            // stray ';' empty statement after main's closing brace.
            sc.stop();
        }
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>chat_connaction_test</groupId> 
    <artifactId>ChatSparkConnectionTest</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
<dependencies> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-core</artifactId> 
     <version>3.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.11</artifactId> 
     <version>2.0.0-M3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

</dependencies> 
</project> 

और यहाँ त्रुटि पाठ है:

java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 
    at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
16/10/11 17:43:03 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 
    at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

16/10/11 17:43:03 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job 
16/10/11 17:43:03 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/10/11 17:43:03 INFO TaskSchedulerImpl: Cancelling stage 1 
16/10/11 17:43:03 INFO DAGScheduler: ResultStage 1 (runJob at RDDFunctions.scala:37) failed in 0.274 s 
16/10/11 17:43:03 INFO DAGScheduler: Job 1 failed: runJob at RDDFunctions.scala:37, took 0.291592 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 
    at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904) 
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:61) 
    at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:486) 
    at com.chatSparkConnactionTest.JavaDemoRDDWrite.main(JavaDemoRDDWrite.java:69) 
Caused by: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture 
    at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

बात यह है कि सिस्टम आरडीडी से डेटाबेस तालिका में पहला मान डालता है, भले ही मुझे कोई त्रुटि हो। और अन्य 2 पंक्तियों को अभी अनदेखा किया गया।

-- Dictionary table: one entry per (value_id, d_name) pair.
-- value_id is the partition key; d_name is the clustering column,
-- sorted ascending within each partition (see CLUSTERING ORDER BY).
CREATE TABLE dictionary (
    value_id text, 
    d_value  text, 
    d_name text, 
    PRIMARY KEY (value_id, d_name) 
) WITH comment = 'dictionary values' 
AND CLUSTERING ORDER BY (d_name ASC); 

अपडेट किया गया pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>chat_connaction_test</groupId> 
    <artifactId>ChatSparkConnectionTest</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
<dependencies> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.11</artifactId> 
     <version>2.0.0-M3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>2.0.0</version> 
    </dependency> 

</dependencies> 
</project> 
+0

बस जानना चाहता था कि क्या आपको इस समस्या का समाधान मिल गया। मैं स्पार्क 2.1.0 और स्पार्क-कैसंड्रा-कनेक्टर 2.0.1 का उपयोग कर रहा हूँ और मुझे ठीक वही समस्या आ रही है जिसका आपने वर्णन किया है। –

उत्तर

11

अपने pom.xml से नीचे दी गई "cassandra-driver-core" निर्भरता को हटा दें, क्योंकि यही फ़ाइल में समस्या पैदा कर रही है।

कैसंड्रा डीबी के साथ बातचीत करने के लिए आपको स्पार्क निर्भरताओं के साथ केवल "spark-cassandra-connector" निर्भरता की आवश्यकता है।

<dependency> 
    <groupId>com.datastax.cassandra</groupId> 
    <artifactId>cassandra-driver-core</artifactId> 
    <version>3.1.0</version> 
</dependency> 
+0

मैंने कोशिश की, एक ही परिणाम: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture को shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture पर कास्ट नहीं किया जा सकता। –

+0

यह अपडेट किए गए pom.xml के साथ भी होता है। –

+0

वैसे भी, उत्तर के लिए बहुत बहुत धन्यवाद! –

संबंधित मुद्दे