2015-08-22 11 views
5

से डेटा मैं एक चिंगारी संदर्भ बनाने के लिए एक JSON फ़ाइल, अमेज़न S3 से पढ़ा है, और डेटा की प्रक्रिया करने के लिए इसका इस्तेमाल करने की कोशिश कर रहा हूँ।कनेक्ट S3 के लिए PySpark

स्पार्क मूल रूप से एक डॉकर कंटेनर में है। इसलिए डॉकर पथ में फाइल डालना भी पिटा है। इसलिए इसे एस 3 पर धक्का दिया।

नीचे दिया गया कोड शेष सामग्री बताता है।

Py4JJavaError        Traceback (most recent call last) 
<ipython-input-2-b94543fb0e8e> in <module>() 
     9      'org.apache.hadoop.io.Text', 
    10      'org.apache.hadoop.io.LongWritable', 
---> 11      conf=config_dict) 
    12 

/usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize) 
    558   jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, 
    559            valueClass, keyConverter, valueConverter, 
--> 560            jconf, batchSize) 
    561   return RDD(jrdd, self) 
    562 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile. 
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). 
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70) 
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103) 
    at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source) 
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272) 
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397) 
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) 
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) 
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) 
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) 
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) 
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1060) 
    at org.apache.spark.rdd.RDD.first(RDD.scala:1093) 
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) 
    at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543) 
    at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:744) 

मैं स्पष्ट रूप से aswSecretAccessKey और awsAccessId प्रदान की है -

from pyspark import SparkContext, SparkConf 
conf = SparkConf().setAppName("first") 
sc = SparkContext(conf=conf) 

config_dict = {"fs.s3n.awsAccessKeyId":"**", 
       "fs.s3n.awsSecretAccessKey":"**"} 

bucket = "nonamecpp" 
prefix = "dataset.json" 
filename = "s3n://{}/{}".format(bucket, prefix) 
rdd = sc.hadoopFile(filename, 
        'org.apache.hadoop.mapred.TextInputFormat', 
        'org.apache.hadoop.io.Text', 
        'org.apache.hadoop.io.LongWritable', 
        conf=config_dict) 

मैं निम्नलिखित त्रुटि मिलती है। क्या गलत हो रहा है?

उत्तर

12

मैं में --packages org.apache.hadoop:hadoop-aws:2.7.1 जोड़ने समाधान कर लिया है आदेश चिंगारी सबमिट करें।

यह सब Hadoop लापता संकुल है कि आप एस 3 के साथ चिंगारी नौकरियों पर अमल करने की अनुमति देगा डाउनलोड करेगा।

फिर अपने काम में आप की तरह अपने AWS क्रेडेंशियल निर्धारित करने की आवश्यकता:

#!/usr/bin/env bash 
AWS_ACCESS_KEY_ID='xxxx' 
AWS_SECRET_ACCESS_KEY='xxxx' 

SPARK_WORKER_CORES=1 # to set the number of cores to use on this machine 
SPARK_WORKER_MEMORY=1g # to set how much total memory workers have to give executors (e.g. 1000m, 2g) 
SPARK_EXECUTOR_INSTANCES=10 #, to set the number of worker processes per node 

अधिक:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id) 
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key) 

आपके क्रेडेंशियल्स स्थापना के बारे में अन्य विकल्प उन्हें चिंगारी/conf/चिंगारी env में परिभाषित है जानकारी:

+0

आप मेरा दिन बचाया! – SRC

+0

एस 3 पर किसी फ़ोल्डर से लकड़ी की फ़ाइलों को कैसे पढ़ा जाए? (Pyspark)। उपरोक्त कोड पैराक्वेट्स पढ़ने की कोशिश करते समय मेरे लिए काम नहीं कर रहा है – Viv

+0

क्या आप दिखा सकते हैं कि आप लकड़ी की फाइलों से कैसे पढ़ रहे हैं? – Franzi