में पाइपिंग के माध्यम से विंडोज बैच फ़ाइल चलाना मेरे पास एक आवश्यकता है जिसमें मुझे स्पार्क क्लस्टर के एकाधिक नोड्स पर अपाचे स्पार्क का उपयोग करके विंडोज बैच फ़ाइल चलाने की आवश्यकता है।अपाचे स्पार्क
तो क्या यह अपाचे स्पार्क की पाइपिंग अवधारणा का उपयोग करके ऐसा करना संभव है?
मैंने उबंटू मशीन पर स्पार्क में पाइपिंग का उपयोग करके एक खोल फ़ाइल चलाने से पहले किया है। मेरे नीचे कोड एक ही कर रही है ठीक चलाता है:
data = ["hi","hello","how","are","you"]
distScript = "/home/aawasthi/echo.sh"
distScriptName = "echo.sh"
sc.addFile(distScript)
RDDdata = sc.parallelize(data)
print RDDdata.pipe(SparkFiles.get(distScriptName)).collect()
मैं एक Windows मशीन होने स्पार्क पर एक Windows बैच फ़ाइल स्थापित (Hadoop 2.6 के लिए 1.6 पहले से बनाए गए) को चलाने के लिए एक ही कोड अनुकूलन करने की कोशिश की। लेकिन यह मुझे sc.addFile
चरण पर त्रुटि देता है। कोड के नीचे है:
batchFile = "D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv"
batchFileName = "runOpenCv"
sc.addFile(batchFile)
त्रुटि स्पार्क द्वारा फेंका नीचे है:
Py4JJavaError Traceback (most recent call last)
<ipython-input-11-9e13c265cbae> in <module>()
----> 1 sc.addFile(batchFile)`
Py4JJavaError: An error occurred while calling o160.addFile.
: java.io.FileNotFoundException: Added file D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv does not exist.
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1364)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1340)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
हालांकि बैच फ़ाइल दिए गए स्थान पर मौजूद है।
अद्यतन:
जोड़ा गया .bat
फ़ाइल पथ की शुरुआत में batchFile
& batchFileName
& file:///
में विस्तार के रूप में। संशोधित कोड है:
from pyspark import SparkFiles
from pyspark import SparkContext
sc
batchFile = "file:///D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv.bat"
batchFileName = "runOpenCv.bat"
sc.addFile(batchFile)
RDDdata = sc.parallelize(["hi","hello"])
print SparkFiles.get("runOpenCv.bat")
print RDDdata.pipe(SparkFiles.get(batchFileName)).collect()
अब यह addFile
चरण में त्रुटि दिखा सकते हैं नहीं है, और print SparkFiles.get("runOpenCv.bat")
प्रिंट पथ
C:\Users\abhilash.awasthi\AppData\Local\Temp\spark-c0f383b1-8365-4840-bd0f-e7eb46cc6794\userFiles-69051066-f18c-45dc-9610-59cbde0d77fe\runOpenCv.bat
तो फ़ाइल जोड़ा जाता है। लेकिन कोड के अंतिम चरण में यह नीचे त्रुटि फेंकता है:
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-bf2b8aea3ef0> in <module>()
----> 1 print RDDdata.pipe(SparkFiles.get(batchFileName)).collect()
D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.pyc in collect(self)
769 """
770 with SCCallSiteSync(self.context) as css:
--> 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
772 return list(_load_from_socket(port, self._jrdd_deserializer))
773
D:\spark-1.6.2-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\sql\utils.pyc in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
D:\spark-1.6.2-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process
File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func
return f(iterator)
File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func
shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
File "C:\Anaconda2\lib\subprocess.py", line 710, in __init__
errread, errwrite)
File "C:\Anaconda2\lib\subprocess.py", line 958, in _execute_child
startupinfo)
WindowsError: [Error 2] The system cannot find the file specified
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process
File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func
return f(iterator)
File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func
shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
File "C:\Anaconda2\lib\subprocess.py", line 710, in __init__
errread, errwrite)
File "C:\Anaconda2\lib\subprocess.py", line 958, in _execute_child
startupinfo)
WindowsError: [Error 2] The system cannot find the file specified
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
विंडोज बैच फ़ाइलों में '.cmd' या' .bat' एक्सटेंशन है। क्या आपने इसे शामिल करने की कोशिश की है? –
@MCND ओह मूर्खतापूर्ण .. हाँ एक्सटेंशन नाम में होना चाहिए। 'बैचफाइल' और 'बैचफाइलनाम' में '.bat' जोड़ने के बाद, मुझे नहीं मिलता कि फ़ाइल त्रुटि मौजूद नहीं है। लेकिन अद्यतन अद्यतन में दिखाए गए अनुसार मुझे अलग-अलग त्रुटि मिलती है। –
'योजना के लिए कोई फ़ाइल सिस्टम नहीं: डी', इसलिए' डी: 'को आवश्यकतानुसार संभाला नहीं जा सकता है, शायद (माफ करना अगर यह कुछ बेवकूफ है, तो मुझे बैच फाइलों के बारे में कुछ पता है, लेकिन जावा मेरा क्षेत्र नहीं है) आपको यूआरआई चाहिए जैसे 'फ़ाइल: /// डी:/...' की आवश्यकता है –