2015-04-21 13 views
18

मैं स्कैला 2.11.2 पर अपाचे स्पार्क 1.3.1 चला रहा हूं, और जब बड़े पैमाने पर पर्याप्त डेटा वाले एचपीसी क्लस्टर पर चल रहा हूं, तो मुझे अपनी पोस्ट के नीचे की तरह कई त्रुटियां मिलती हैं (प्रति सेकंड कई बार दोहराया जाता है, जब तक कि समय के साथ नौकरी की मौत न हो जाए)। त्रुटियों के आधार पर, निष्पादक अन्य नोड्स से डेटा को घुमाने का प्रयास कर रहा है लेकिन ऐसा करने में असमर्थ है।अपाचे स्पार्क: निष्पादकों के बीच नेटवर्क त्रुटियां

यह वही प्रोग्राम या तो (ए) डेटा की एक छोटी राशि, या (बी) स्थानीय-केवल मोड में ठीक से निष्पादित करता है, इसलिए नेटवर्क पर भेजे गए डेटा के साथ इसका कुछ संबंध होता है (और ट्रिगर नहीं किया जाता है) बहुत कम मात्रा में डेटा के साथ)।

val partitioned_data = data // data was read as sc.textFile(inputFile) 
    .zipWithIndex.map(x => (x._2, x._1)) 
    .partitionBy(partitioner) // A custom partitioner 
    .map(_._2) 

// Force previous lazy operations to be evaluated. Presumably adds some 
// overhead, but hopefully the minimum possible... 
// Suggested on Spark user list: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-RDD-computation-with-something-else-than-count-td707.html 
sc.runJob(partitioned_data, (iter: Iterator[_]) => {}) 

एक बग के इस संकेत है, या वहाँ कुछ मैं गलत कर रहा हूँ है:

कोड उस समय ऐसा होता है चारों ओर निष्पादित किया जा रहा है इस प्रकार है?

15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=26501223, length=6227612}} to /10.0.0.5:41160; closing connection 
java.io.IOException: Resource temporarily unavailable 
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method) 
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415) 
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516) 
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96) 
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237) 
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707) 
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669) 
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741) 
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895) 
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240) 
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147) 
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119) 
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:619) 
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/27/shuffle_0_5_0.data, offset=3792987, length=2862285}} to /10.0.0.5:41160; closing connection 
java.nio.channels.ClosedChannelException 
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593002, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=0, length=10993212}} to /10.0.0.6:42426; closing connection 
java.io.IOException: Resource temporarily unavailable 
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method) 
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415) 
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516) 
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96) 
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237) 
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233) 
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707) 
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669) 
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688) 
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706) 
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741) 
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895) 
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240) 
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147) 
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119) 
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:619) 
15/04/21 14:59:28 WARN TransportChannelHandler: Exception in connection from node5.someuniversity.edu/10.0.0.5:60089 
java.io.IOException: Connection reset by peer 
    at sun.nio.ch.FileDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:206) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236) 
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:234) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:619) 
15/04/21 14:59:28 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from node5.someuniversity.edu/10.0.0.5:60089 is closed 
15/04/21 14:59:28 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms 

उत्तर

15

यह Netty नेटवर्किंग प्रणाली (ब्लॉक हस्तांतरण सेवा) से संबंधित एक बग प्रतीत होता है, कहा:

यहाँ (पूर्ण लॉग here है) निष्पादकों में से एक की stderr लॉग का एक छोटा सा टुकड़ा है Spark 1.2 में। मेरे स्पार्ककॉन्फ़ को .set("spark.shuffle.blockTransferService", "nio") जोड़कर बग तय किया गया, इसलिए अब सब कुछ पूरी तरह से काम करता है।

मुझे a post on the spark-user mailing list किसी ऐसे व्यक्ति से मिला जो समान त्रुटियों में चल रहा था, और उन्होंने Netty के बजाय nio को आजमाने का सुझाव दिया।

SPARK-5085 समान है, Netty से nio में बदलकर यह समस्या ठीक हो गई है; हालांकि, वे कुछ नेटवर्किंग सेटिंग्स को बदलकर इस मुद्दे को ठीक करने में भी सक्षम थे। (मैंने अभी तक यह कोशिश नहीं की है, क्योंकि मुझे यकीन नहीं है कि क्लस्टर पर ऐसा करने के लिए मेरे पास सही पहुंच विशेषाधिकार हैं।)

+1

मैं स्पार्क संस्करण 1.4.1 का उपयोग कर रहा हूं और समस्या को सुलझाने के लिए स्विच कर रहा हूं। – firemonkey

+0

एचडीपी 2.3 के साथ स्पार्क 1.3.1 का उपयोग करके, हमें एक ही समस्या थी। निओ को स्विच करने से समस्या हल हो गई। –

+8

एनओओ पर स्विचिंग स्पार्क 1.5.1 में समस्या का समाधान नहीं किया, कोई विचार? –

0

यह भी संभव है कि आपकी मेवेन कॉन्फ़िगरेशन आपके स्पार्क सर्वर स्थापना से अलग हो।

उदाहरण के लिए अपने एक ब्लॉग पोस्ट ट्यूटोरियल

<dependencies> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_1.3</artifactId> 
     <version>1.3</version> 
    </dependency> 

</dependencies> 

से एक pom.xml उठाया लेकिन तुम अपाचे स्पार्क वेबसाइट पर नवीनतम 2.3 संस्करण को डाउनलोड किया जा सकता था।

संबंधित मुद्दे