2016-09-15 13 views
6

में पाइपिंग के माध्यम से विंडोज बैच फ़ाइल चलाना मेरे पास एक आवश्यकता है जिसमें मुझे स्पार्क क्लस्टर के एकाधिक नोड्स पर अपाचे स्पार्क का उपयोग करके विंडोज बैच फ़ाइल चलाने की आवश्यकता है।अपाचे स्पार्क

तो क्या यह अपाचे स्पार्क की पाइपिंग अवधारणा का उपयोग करके ऐसा करना संभव है?

मैंने उबंटू मशीन पर स्पार्क में पाइपिंग का उपयोग करके एक खोल फ़ाइल चलाने से पहले किया है। मेरे नीचे कोड एक ही कर रही है ठीक चलाता है:

data = ["hi","hello","how","are","you"] 
distScript = "/home/aawasthi/echo.sh" 
distScriptName = "echo.sh" 
sc.addFile(distScript) 
RDDdata = sc.parallelize(data) 
print RDDdata.pipe(SparkFiles.get(distScriptName)).collect() 

मैं एक Windows मशीन होने स्पार्क पर एक Windows बैच फ़ाइल स्थापित (Hadoop 2.6 के लिए 1.6 पहले से बनाए गए) को चलाने के लिए एक ही कोड अनुकूलन करने की कोशिश की। लेकिन यह मुझे sc.addFile चरण पर त्रुटि देता है। कोड के नीचे है:

batchFile = "D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv" 
batchFileName = "runOpenCv" 
sc.addFile(batchFile) 

त्रुटि स्पार्क द्वारा फेंका नीचे है:

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-11-9e13c265cbae> in <module>() 
----> 1 sc.addFile(batchFile)` 

Py4JJavaError: An error occurred while calling o160.addFile. 
: java.io.FileNotFoundException: Added file D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv does not exist. 
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1364) 
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1340) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 

हालांकि बैच फ़ाइल दिए गए स्थान पर मौजूद है।

अद्यतन:
जोड़ा गया .bat फ़ाइल पथ की शुरुआत में batchFile & batchFileName & file:/// में विस्तार के रूप में। संशोधित कोड है:

from pyspark import SparkFiles 
from pyspark import SparkContext  
sc  
batchFile = "file:///D:/spark-1.6.2-bin-hadoop2.6/data/OpenCV/runOpenCv.bat" 
batchFileName = "runOpenCv.bat" 
sc.addFile(batchFile) 
RDDdata = sc.parallelize(["hi","hello"]) 
print SparkFiles.get("runOpenCv.bat") 
print RDDdata.pipe(SparkFiles.get(batchFileName)).collect() 

अब यह addFile चरण में त्रुटि दिखा सकते हैं नहीं है, और print SparkFiles.get("runOpenCv.bat") प्रिंट पथ
C:\Users\abhilash.awasthi\AppData\Local\Temp\spark-c0f383b1-8365-4840-bd0f-e7eb46cc6794\userFiles-69051066-f18c-45dc-9610-59cbde0d77fe\runOpenCv.bat
तो फ़ाइल जोड़ा जाता है। लेकिन कोड के अंतिम चरण में यह नीचे त्रुटि फेंकता है:

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-6-bf2b8aea3ef0> in <module>() 
----> 1 print RDDdata.pipe(SparkFiles.get(batchFileName)).collect() 

D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.pyc in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

D:\spark-1.6.2-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\sql\utils.pyc in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

D:\spark-1.6.2-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func 
    return f(iterator) 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func 
    shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
    File "C:\Anaconda2\lib\subprocess.py", line 710, in __init__ 
    errread, errwrite) 
    File "C:\Anaconda2\lib\subprocess.py", line 958, in _execute_child 
    startupinfo) 
WindowsError: [Error 2] The system cannot find the file specified 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 317, in func 
    return f(iterator) 
    File "D:\spark-1.6.2-bin-hadoop2.6\python\pyspark\rdd.py", line 715, in func 
    shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
    File "C:\Anaconda2\lib\subprocess.py", line 710, in __init__ 
    errread, errwrite) 
    File "C:\Anaconda2\lib\subprocess.py", line 958, in _execute_child 
    startupinfo) 
WindowsError: [Error 2] The system cannot find the file specified 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 
+2

विंडोज बैच फ़ाइलों में '.cmd' या' .bat' एक्सटेंशन है। क्या आपने इसे शामिल करने की कोशिश की है? –

+0

@MCND ओह मूर्खतापूर्ण .. हाँ एक्सटेंशन नाम में होना चाहिए। 'बैचफाइल' और 'बैचफाइलनाम' में '.bat' जोड़ने के बाद, मुझे नहीं मिलता कि फ़ाइल त्रुटि मौजूद नहीं है। लेकिन अद्यतन अद्यतन में दिखाए गए अनुसार मुझे अलग-अलग त्रुटि मिलती है। –

+0

'योजना के लिए कोई फ़ाइल सिस्टम नहीं: डी', इसलिए' डी: 'को आवश्यकतानुसार संभाला नहीं जा सकता है, शायद (माफ करना अगर यह कुछ बेवकूफ है, तो मुझे बैच फाइलों के बारे में कुछ पता है, लेकिन जावा मेरा क्षेत्र नहीं है) आपको यूआरआई चाहिए जैसे 'फ़ाइल: /// डी:/...' की आवश्यकता है –

उत्तर

0

बचने कृपया/

batchFile = "D://spark-1.6.2-bin-hadoop2.6//data//OpenCV//runOpenCv"

इसके अलावा, जैसा ए.ए. ऊपर का सुझाव दिया, यह .cmd या .bat विस्तार हो सकता है।

+0

बचने का पात्र \ है, इसलिए '/ 'से बचने की कोई आवश्यकता नहीं है –

संबंधित मुद्दे