से डेटा मैं एक चिंगारी संदर्भ बनाने के लिए एक JSON फ़ाइल, अमेज़न S3 से पढ़ा है, और डेटा की प्रक्रिया करने के लिए इसका इस्तेमाल करने की कोशिश कर रहा हूँ।कनेक्ट S3 के लिए PySpark
स्पार्क मूल रूप से एक डॉकर कंटेनर में है। इसलिए डॉकर पथ में फाइल डालना भी पिटा है। इसलिए इसे एस 3 पर धक्का दिया।
नीचे दिया गया कोड शेष सामग्री बताता है।
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-b94543fb0e8e> in <module>()
9 'org.apache.hadoop.io.Text',
10 'org.apache.hadoop.io.LongWritable',
---> 11 conf=config_dict)
12
/usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
558 jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
559 valueClass, keyConverter, valueConverter,
--> 560 jconf, batchSize)
561 return RDD(jrdd, self)
562
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543)
at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:744)
मैं स्पष्ट रूप से aswSecretAccessKey और awsAccessId प्रदान की है -
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("first")
sc = SparkContext(conf=conf)
config_dict = {"fs.s3n.awsAccessKeyId":"**",
"fs.s3n.awsSecretAccessKey":"**"}
bucket = "nonamecpp"
prefix = "dataset.json"
filename = "s3n://{}/{}".format(bucket, prefix)
rdd = sc.hadoopFile(filename,
'org.apache.hadoop.mapred.TextInputFormat',
'org.apache.hadoop.io.Text',
'org.apache.hadoop.io.LongWritable',
conf=config_dict)
मैं निम्नलिखित त्रुटि मिलती है। क्या गलत हो रहा है?
आप मेरा दिन बचाया! – SRC
एस 3 पर किसी फ़ोल्डर से लकड़ी की फ़ाइलों को कैसे पढ़ा जाए? (Pyspark)। उपरोक्त कोड पैराक्वेट्स पढ़ने की कोशिश करते समय मेरे लिए काम नहीं कर रहा है – Viv
क्या आप दिखा सकते हैं कि आप लकड़ी की फाइलों से कैसे पढ़ रहे हैं? – Franzi