2016-11-05 12 views
7
में

मैं एक स्पार्क क्लस्टर एक जावा अनुप्रयोग के भीतर से आईपी 10.20.30.50 और बंदरगाह 7077 साथ एक आभासी मशीन के भीतर चल उदाहरण कनेक्ट और गिनती शब्द को चलाने के लिए कोशिश कर रहा हूँ फेंक दिया:स्पार्क और जावा: अपवाद awaitResult

SparkConf conf = new SparkConf().setMaster("spark://10.20.30.50:7077").setAppName("wordCount"); 
JavaSparkContext sc = new JavaSparkContext(conf); 
JavaRDD<String> textFile = sc.textFile("hdfs://localhost:8020/README.md"); 
String result = Long.toString(textFile.count()); 
JavaRDD<String> words = textFile.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator()); 
JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)); 
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b); 
counts.saveAsTextFile("hdfs://localhost:8020/tmp/output"); 
sc.stop(); 
return result; 

जावा एप्लिकेशन के पास निम्न स्टैक ट्रेस दिखाता है:

Running Spark version 2.0.1 
Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
Changing view acls to: lii5ka 
Changing modify acls to: lii5ka 
Changing view acls groups to: 
Changing modify acls groups to: 
SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(lii5ka); groups with view permissions: Set(); users with modify permissions: Set(lii5ka); groups with modify permissions: Set() 
Successfully started service 'sparkDriver' on port 61267. 
Registering MapOutputTracker 
Registering BlockManagerMaster 
Created local directory at /private/var/folders/4k/h0sl02993_99bzt0dzv759000000gn/T/blockmgr-51de868d-3ba7-40be-8c53-f881f97ced63 
MemoryStore started with capacity 2004.6 MB 
Registering OutputCommitCoordinator 
Logging initialized @48403ms 
jetty-9.2.z-SNAPSHOT 
Started [email protected]{/jobs,null,AVAILABLE} 
Started [email protected]{/jobs/json,null,AVAILABLE} 
Started [email protected]{/jobs/job,null,AVAILABLE} 
Started [email protected]{/jobs/job/json,null,AVAILABLE} 
Started [email protected]{/stages,null,AVAILABLE} 
Started [email protected]{/stages/json,null,AVAILABLE} 
Started [email protected]{/stages/stage,null,AVAILABLE} 
Started [email protected]{/stages/stage/json,null,AVAILABLE} 
Started [email protected]{/stages/pool,null,AVAILABLE} 
Started [email protected]{/stages/pool/json,null,AVAILABLE} 
Started [email protected]{/storage,null,AVAILABLE} 
Started [email protected]{/storage/json,null,AVAILABLE} 
Started [email protected]{/storage/rdd,null,AVAILABLE} 
Started [email protected]{/storage/rdd/json,null,AVAILABLE} 
Started [email protected]{/environment,null,AVAILABLE} 
Started [email protected]{/environment/json,null,AVAILABLE} 
Started [email protected]{/executors,null,AVAILABLE} 
Started [email protected]{/executors/json,null,AVAILABLE} 
Started [email protected]{/executors/threadDump,null,AVAILABLE} 
Started [email protected]{/executors/threadDump/json,null,AVAILABLE} 
Started [email protected]{/static,null,AVAILABLE} 
Started [email protected]{/,null,AVAILABLE} 
Started [email protected]{/api,null,AVAILABLE} 
Started [email protected]{/stages/stage/kill,null,AVAILABLE} 
Started [email protected]{HTTP/1.1}{0.0.0.0:4040} 
Started @48698ms 
Successfully started service 'SparkUI' on port 4040. 
Bound SparkUI to 0.0.0.0, and started at http://192.168.0.104:4040 
Connecting to master spark://10.20.30.50:7077... 
Successfully created connection to /10.20.30.50:7077 after 25 ms (0 ms spent in bootstraps) 
Connecting to master spark://10.20.30.50:7077... 
Still have 2 requests outstanding when connection from /10.20.30.50:7077 is closed 
Failed to connect to master 10.20.30.50:7077 

org.apache.spark.SparkException: Exception thrown in awaitResult 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.8.jar:na] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) ~[scala-library-2.11.8.jar:na] 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102] 
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102] 
Caused by: java.io.IOException: Connection from /10.20.30.50:7077 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     ... 1 common frames omitted 

स्पार्क मास्टर लॉग में 10.20.30.50 पर, मैं निम्न त्रुटि संदेश:

16/11/05 14:47:20 ERROR OneForOneStrategy: Error while decoding incoming Akka PDU of length: 1298 
akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 1298 
Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed. 
    at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167) 
    at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:580) 
    at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:375) 
    at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:343) 
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
    at akka.actor.FSM$class.processEvent(FSM.scala:604) 
    at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:269) 
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598) 
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
    at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:269) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero). 
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89) 
    at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108) 
    at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6643) 
    at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607) 
    at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6703) 
    at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6698) 
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) 
    at akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821) 
    at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168) 
    ... 19 more 

अतिरिक्त सूचना

  • उदाहरण ठीक काम करता है जब मैं new SparkConf().setMaster("local") बजाय का उपयोग
  • मैं बहुत ही मशीन पर spark-shell --master spark://10.20.30.50:7077 साथ स्पार्क मास्टर से कनेक्ट कर सकते
+0

आप इस आईपी '10.20.30.50: 7077' – pamu

+0

के साथ स्थानीय मशीन पर नोड से कनेक्ट नहीं कर सकते हैं क्यों नहीं? स्पार्क मेरे होस्ट पर वर्चुअल मशीन में चल रहा है जो इस आईपी के माध्यम से सुलभ है - इसलिए मुझे नहीं पता कि मुझे इससे कनेक्ट क्यों नहीं होना चाहिए? क्या स्पार्क में यह कोई विशेष प्रतिबंध है? –

+0

आपने मुझे कभी नहीं बताया कि – pamu

उत्तर

7

नेटवर्क की तरह लग रहा स्पार्क के संस्करण विसंगति के छिपाने में पहली जगह में त्रुटि (लेकिन वास्तव में नहीं)। आप ज्यादातर विधानसभा जार स्पार्क जार के सही संस्करण को इंगित कर सकते हैं।

यह समस्या प्रोटोबफर का उपयोग करके हाडोप आरपीसी कॉल में संस्करण मिस मैच के कारण हो सकती है।

when a protocol message being parsed is invalid in some way, e.g. it contains a malformed varint or a negative byte length.

  • Protobuf साथ मेरा अनुभव, InvalidProtocolBufferException भी हो सकता है, जब संदेश पार्स करने के लिए सक्षम नहीं था (प्रोग्राम के लिए यदि आप Protobuf संदेश को पार्स कर रहे हैं, हो सकता है संदेश legth शून्य है या संदेश दूषित है। ..)।

  • स्पार्क संदेश मास्टर/चालक और श्रमिक और Internally akka uses googles protobuf to communicate. see method below from AkkaPduCodec.scala)

    override def decodePdu(raw: ByteString): AkkaPdu = { 
        try { 
         val pdu = AkkaProtocolMessage.parseFrom(raw.toArray) 
         if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer())) 
         else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction) 
         else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null) 
        } catch { 
         case e: InvalidProtocolBufferException ⇒ throw new PduCodecException("Decoding PDU failed.", e) 
        } 
        } 
    

लेकिन आपके मामले में बीच पासिंग के लिए अक्का अभिनेता का उपयोग करता है, क्योंकि इसकी संस्करण बेमेल, नई Protobuf संस्करण संदेश पुराने संस्करण से पार्स किया जा नहीं कर सकते पार्सर ... या कुछ ...

यदि आप मेवेन अन्य निर्भरताओं का उपयोग कर रहे हैं, कृपया। समीक्षा।

+1

मेरे लिए यह स्कैला संस्करण (पैच) था। धन्यवाद! – combinatorist

2

यह पता चला कि मेरे पास वर्चुअल मशीन में स्पार्क संस्करण 1.5.2 चल रहा था और जावा में स्पार्क लाइब्रेरी का संस्करण 2.0.1 था। मैं अपने pom.xml में उपयुक्त स्पार्क पुस्तकालय संस्करण का उपयोग करके समस्या का समाधान हो गया है जो

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.5.2</version> 
</dependency> 

एक और समस्या (जो बाद में हुआ) था, कि मैं भी स्काला संस्करण है जो साथ पुस्तकालय का निर्माण था पिन करने के लिए किया था। ArtifactId में यह _2.10 प्रत्यय है।

मूल रूप से @ रामप्रसाद के जवाब ने मुझे सही दिशा में इंगित किया लेकिन मेरी समस्या को ठीक करने के लिए मुझे क्या करने की ज़रूरत है, यह स्पष्ट सलाह नहीं दी।

वैसे: मैं वर्चुअल मशीन में स्पार्क को अपडेट नहीं कर सका, क्योंकि यह मुझे हॉर्टनवर्क्स वितरण द्वारा लाया गया था ...

+0

"यदि आप मेवेन अन्य निर्भरताओं का उपयोग कर रहे हैं तो कृपया समीक्षा करें।" ... मुझे यह भी पता नहीं था कि आप मेवेन का उपयोग कर रहे हैं और मेरे उत्तर में ऊपर सुझाए गए हैं। –

संबंधित मुद्दे