14

से अधिक है, मैं कुछ सरल मशीन सीखने के कार्य के लिए स्पार्क का उपयोग करने की कोशिश कर रहा हूं। मैंने एक साधारण लॉजिस्टिक रिग्रेशन समस्या करने के लिए pyspark और स्पार्क 1.2.0 का उपयोग किया। मेरे पास प्रशिक्षण के लिए 1.2 मिलियन रिकॉर्ड हैं, और मैंने रिकॉर्ड की विशेषताओं को छोड़ दिया है। जब मैं 1024 के रूप में टुकड़ों में बांटा सुविधाओं की संख्या निर्धारित करते हैं, कार्यक्रम निम्न त्रुटि के साथ ठीक काम करता है, लेकिन जब मैं 16384 के रूप में टुकड़ों में बांटा सुविधाओं की संख्या निर्धारित करते हैं, प्रोग्राम विफल कई बार:स्पार्क जावा त्रुटि: आकार Integer.MAX_VALUE

Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) 
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) 
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) 
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) 
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) 
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:745) 

    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

यह त्रुटि तब होता है जब मैं लेबल किए गए डेटा में डेटा स्थानांतरित करने के बाद LogisticRegressionWithSGD को प्रशिक्षण देता हूं।

क्या किसी को इस पर कोई विचार है?

मेरे कोड (मैं इस के लिए एक IPython नोटबुक का उपयोग कर रहा) इस प्रकार है: जब अंतिम पंक्ति को क्रियान्वित करने

from pyspark.mllib.regression import LabeledPoint 
from pyspark.mllib.classification import LogisticRegressionWithSGD 
from numpy import array 
from sklearn.feature_extraction import FeatureHasher 
from pyspark import SparkContext 
sf = SparkConf().setAppName("test").set("spark.executor.memory", "50g").set("spark.cores.max", 30) 
sc = SparkContext(conf=sf) 
training_file = sc.textFile("train_small.txt") 
def hash_feature(line): 
    values = [0, dict()] 
    for index, x in enumerate(line.strip("\n").split('\t')): 
     if index == 0: 
      values[0] = float(x) 
     else: 
      values[1][str(index)+"_"+x] = 1 
    return values 
n_feature = 2**14 
hasher = FeatureHasher(n_features=n_feature) 
training_file_hashed = training_file.map(lambda line: [hash_feature(line)[0], hasher.transform([hash_feature(line)[1]])]) 
def build_lable_points(line): 
    values = [0.0] * n_feature 
    for index, value in zip(line[1].indices, line[1].data): 
     values[index] = value 
    return LabeledPoint(line[0], values) 
parsed_training_data = training_file_hashed.map(lambda line: build_lable_points(line)) 
model = LogisticRegressionWithSGD.train(parsed_training_data) 

त्रुटि होता है।

+1

क्या आप अपना कोड दिखा सकते हैं? –

+0

कोड को मूल पोस्ट में जोड़ा गया है, धन्यवाद – peng

+0

क्या आप और विभाजन कर सकते हैं? (मुझे लगता है कि विभाजन से कम विभाजन का मतलब कम डेटा है, इसलिए इसे चाल करना चाहिए)। –

उत्तर

1

किसी बिंदु पर, यह सुविधाओं को स्टोर करने का प्रयास करता है और 1.2M * 16384 Integer.MAX_INT से अधिक है, इसलिए आप स्पार्क द्वारा समर्थित सुविधाओं के अधिकतम आकार से अधिक स्टोर करने का प्रयास कर रहे हैं।

आप शायद अपाचे स्पार्क की सीमाओं में चल रहे हैं।

+1

धन्यवाद। क्या आप इसे विस्तार से बताएंगे? मैंने स्पार्क द्वारा समर्थित सुविधाओं के अधिकतम आकार के बारे में कभी नहीं सुना। मुझे पता है कि स्पार्क के लिए ब्लॉक आकार पर सीमा है, https://issues.apache.org/jira/browse/SPARK-1476 देखें, मुझे यकीन नहीं है कि मैं इसे मार रहा हूं, लेकिन अगर मैं इसे मार रहा हूं, तो मुझे आश्चर्य है कि कैसे मैं इससे बच सकता हूं कि सुविधाओं की संख्या और रिकॉर्ड्स की संख्या को कम किए बिना – peng

11

Integer.MAX_INT प्रतिबंध संग्रहीत फ़ाइल के आकार पर है। 1.2 एम पंक्तियां एक बड़ी बात नहीं है, मुझे यकीन नहीं है कि आपकी समस्या "स्पार्क की सीमा" है। अधिक संभावना है कि, आपके काम का कुछ हिस्सा किसी दिए गए निष्पादक द्वारा संभाला जाने वाला कुछ बड़ा बना रहा है।

मैं कोई पायथन कोडर नहीं हूं, लेकिन जब आपने "रिकॉर्ड की विशेषताओं को धोया" तो आप नमूना के लिए रिकॉर्ड का एक बहुत ही अलग सेट ले सकते हैं और एक गैर-स्पैस सरणी बना सकते हैं। इसका मतलब 16384 फीचर्स के लिए बहुत सारी मेमोरी होगी। विशेष रूप से, जब आप zip(line[1].indices, line[1].data) करते हैं। एकमात्र कारण जो आपको स्मृति से बाहर नहीं निकालता है, उसमें शिटलोड होता है जिसे आपने कॉन्फ़िगर किया है (50 जी)।

विभाजन करने के लिए एक और चीज जो मदद कर सकती है वह है। इसलिए यदि आप अपनी पंक्तियों को कम स्मृति का उपयोग नहीं कर सकते हैं, तो कम से कम आप किसी भी दिए गए कार्य पर कम पंक्तियों का प्रयास कर सकते हैं। बनाई जा रही किसी भी अस्थायी फाइलें इस पर निर्भर होने की संभावना है, इसलिए आप फ़ाइल सीमाओं को हिट करने की अधिक संभावना नहीं होगी।


और, पूरी तरह से त्रुटि से संबंधित नहीं है, लेकिन प्रासंगिक है कि आप क्या करने की कोशिश कर रहे हैं के लिए:

16384 वास्तव में सुविधाओं की एक बड़ी संख्या है, आशावादी मामले में जहां हर एक को सिर्फ एक बूलियन विशेषता है, आपके पास सीखने के लिए कुल 2^16384 संभावित क्रमिक क्रम हैं, यह एक बड़ी संख्या है (इसे यहां आज़माएं: https://defuse.ca/big-number-calculator.htm)।

यह बहुत है, बहुत संभव है कि कोई भी एल्गोरिदम केवल 1.2 एम नमूने के साथ निर्णय सीमा सीखने में सक्षम नहीं होगा, आपको शायद इस तरह के फीचर स्पेस पर कम से कम कुछ ट्रिलियन ट्रिलियन उदाहरणों की आवश्यकता होगी। मशीन लर्निंग की इसकी सीमाएं हैं, इसलिए आश्चर्यचकित न हों अगर आपको यादृच्छिक सटीकता से बेहतर न हो।

मैं निश्चित रूप से पहले कुछ आयामी कमी की कोशिश करने की सिफारिश करता हूं !!

+1

धन्यवाद। यह समस्या डेटा लोड करते समय अधिक विभाजन का उपयोग कर तय की जाती है। हम सिर्फ छोटे डेटा सेट पर परीक्षण कर रहे हैं और कुछ विचार हासिल कर रहे हैं, तो हम बहुत शक्तिशाली मशीन के साथ बड़े डेटा सेट पर आवेदन करने जा रहे हैं। – peng

संबंधित मुद्दे