2014-12-30 18 views
9

में KafkaUtils क्लास नहीं मिला है मैंने अभी स्पार्क स्ट्रीमिंग के साथ शुरुआत की है और मैं एक नमूना एप्लिकेशन बनाने की कोशिश कर रहा हूं जो कफका धारा से शब्दों की गणना करता है। हालांकि यह sbt package के साथ संकलित करता है, जब मैं इसे चलाता हूं, तो मुझे NoClassDefFoundError मिलता है। यह post एक ही समस्या प्रतीत होता है, लेकिन समाधान मैवेन के लिए है और मैं इसे एसबीटी के साथ पुन: उत्पन्न करने में सक्षम नहीं हूं।स्पार्क स्ट्रीमिंग

KafkaApp.scala:

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 

object KafkaApp { 
    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]") 
    val ssc = new StreamingContext(conf, Seconds(1)) 
    val kafkaParams = Map(
     "zookeeper.connect" -> "localhost:2181", 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "group.id" -> "sparkGroup" 
    ) 

    val topics = Map(
     "test" -> 1 
    ) 

    // stream of (topic, ImpressionLog) 
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK) 
    println(s"Number of words: %{messages.count()}") 
    } 
} 

build.sbt:

name := "Simple Project" 

version := "1.1" 

scalaVersion := "2.10.4" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1", 
    "org.apache.spark" %% "spark-streaming" % "1.1.1", 
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1" 
) 

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" 

और मैं के साथ जमा करें:

bin/spark-submit \ 
    --class "KafkaApp" \ 
    --master local[4] \ 
    target/scala-2.10/simple-project_2.10-1.1.jar 

त्रुटि:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver 
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ 
    at KafkaApp$.main(KafkaApp.scala:28) 
    at KafkaApp.main(KafkaApp.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$ 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 

उत्तर

14

स्पार्क-सबमिट स्वचालित रूप से पैकेज को KafkaUtils में नहीं डालता है। आपको अपनी परियोजना जार में होना चाहिए। इसके लिए आपको sbt assembly का उपयोग करके, सभी समावेशी uber-jar बनाने की आवश्यकता है। यहां एक उदाहरण build.sbt है।

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

आप स्पष्ट रूप से भी एसबीटी के लिए विधानसभा प्लगइन जोड़ने की जरूरत है।

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project

+0

मैं भी यही मुद्दा हो रही है, जबकि मैं कर रहा हूँ मेवेन का उपयोग करना उसके बाद मैंने अपने pom.xml में "org.apache.maven.plugins" शामिल किया लेकिन समस्या हल नहीं हुई है। मुझे कोई अन्य पैरामीटर जांचना है? परिवर्तन के साथ –

+0

, अगर मैं stb पैकेज चलाता हूं, तो मुझे त्रुटि मिली। : त्रुटि: नहीं मिला: ऑब्जेक्ट असेंबलीकेस आयात असेंबलीKeys._ ^ [त्रुटि] अभिव्यक्ति में त्रुटि टाइप करें – johnsam

+0

@johnsam बस पहली आयात लाइन और "असेंबली सेटिंग्स" लाइन छोड़ दें, मेरे लिए काम करता है। – pederpansen

6

जबकि आवेदन जमा करने सभी निर्भरता जार शामिल करके प्रयास करें:

./spark-submit --name "SampleApp" --deploy-mode client--master spark://host:7077 --class com.stackexchange.SampleApp --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar spark-example-1.0-SNAPSHOT.jar

2

build.sbt मेरे लिए काम किया गया। यह आपको निर्देशिका के तहत फ़ाइल में sbt-assembly प्लगइन भी डालने की आवश्यकता है।

build.sbt

name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt 

libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10" % "1.4.1", 
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1",   // kafka 
    "org.apache.hbase" % "hbase" % "0.92.1", 
    "org.apache.hadoop" % "hadoop-core" % "1.0.2", 
    "org.apache.spark" % "spark-mllib_2.10" % "1.3.0" 
) 

mergeStrategy in assembly := { 
    case m if m.toLowerCase.endsWith("manifest.mf")   => MergeStrategy.discard 
    case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => MergeStrategy.discard 
    case "log4j.properties"         => MergeStrategy.discard 
    case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines 
    case "reference.conf"         => MergeStrategy.concat 
    case _             => MergeStrategy.first 
} 

परियोजना/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

0

एक ही समस्या से मिलने, मैं इसे से निर्भरता के साथ जार निर्माण को हल किया। > गुण -

<build> 
    <sourceDirectory>src/main/java</sourceDirectory> 
    <testSourceDirectory>src/test/java</testSourceDirectory> 
    <plugins> 
     <!-- 
        Bind the maven-assembly-plugin to the package phase 
     this will create a jar file without the storm dependencies 
     suitable for deployment to a cluster. 
     --> 
     <plugin> 
     <artifactId>maven-assembly-plugin</artifactId> 
     <configuration> 
      <descriptorRefs> 
      <descriptorRef>jar-with-dependencies</descriptorRef> 
      </descriptorRefs> 
      <archive> 
      <manifest> 
       <mainClass></mainClass> 
      </manifest> 
      </archive> 
     </configuration> 
     <executions> 
      <execution> 
      <id>make-assembly</id> 
      <phase>package</phase> 
      <goals> 
       <goal>single</goal> 
      </goals> 
      </execution> 
     </executions> 
     </plugin> 
    </plugins> 
</build>  

mvn पैकेज "उदाहरण-जार-साथ-dependencies.jar"

0

जोड़ा गया निर्भरता बाह्य, परियोजना प्रस्तुत pom.xml करने के लिए नीचे दिए गए कोड को जोड़ने -> जावा बिल्ड पथ -> पुस्तकालय -> बाहरी जार जोड़ें और आवश्यक जार जोड़ें।

इसने मेरी समस्या हल की।

0

का उपयोग करते हुए स्पार्क 1.6 इतने सारे बाहरी जार से निपटने की परेशानी के बिना मेरे लिए काम करते हैं ... काफी प्रबंधन करने के लिए मुश्किल हो सकता है ...

0

तुम भी जार फ़ाइल डाउनलोड करने और स्पार्क में रख सकता है lib फ़ोल्डर, क्योंकि यह काम करने के लिए एसबीटी build.sbt शर्त लगाने की कोशिश कर अपने सिर को मारने के बजाय स्पार्क के साथ स्थापित नहीं है।

http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.10/2.1.1/spark-streaming-kafka-0-10_2.10-2.1.1.jar

में कॉपी:

/usr/local/spark/spark-2.1.0-bin-hadoop2.6/jars/

संबंधित मुद्दे