2013-05-15 15 views
8

मैं स्पार्क स्ट्रीमिंग के साथ प्रोग्रामिंग कर रहा हूं लेकिन स्कैला के साथ कुछ परेशानी है। मैं समारोह StreamingContext.fileStreamस्पार्क स्ट्रीमिंग फ़ाइलस्ट्रीम

इस समारोह की परिभाषा का उपयोग करने के कोशिश कर रहा हूँ इस तरह है:

def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)] 

एक इनपुट धारा है कि नई फ़ाइलों के लिए एक Hadoop संगत फाइल सिस्टम पर नज़र रखता है और का उपयोग कर उन्हें पढ़ता बनाएं दिया गया कुंजी-मूल्य प्रकार और इनपुट प्रारूप। फ़ाइल नाम से शुरू हो रहा है। अनदेखा कर रहे हैं। कश्मीर पढ़ने के लिए कुंजी प्रकार HDFS पढ़ने HDFS निर्देशिका नई फ़ाइल

के लिए नजर रखने के लिए दायर HDFS निर्देशिका के लिए HDFS फ़ाइल एफ इनपुट प्रारूप को पढ़ने के लिए फ़ाइल वी मूल्य प्रकार मैं पारित करने के लिए पता नहीं कैसे कुंजी और मूल्य का प्रकार। चिंगारी स्ट्रीमिंग में मेरे कोड:

public class MyDriver { 

private static final String[] DATA = { "One, two, buckle my shoe", 
     "Three, four, shut the door", "Five, six, pick up sticks", 
     "Seven, eight, lay them straight", "Nine, ten, a big fat hen" }; 

public static void main(String[] args) throws IOException { 
    String uri = args[0]; 
    Configuration conf = new Configuration(); 
    FileSystem fs = FileSystem.get(URI.create(uri), conf); 
    Path path = new Path(uri); 
    IntWritable key = new IntWritable(); 
    Text value = new Text(); 
    SequenceFile.Writer writer = null; 
    try { 
     writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), 
       value.getClass()); 
     for (int i = 0; i < 100; i++) { 
      key.set(100 - i); 
      value.set(DATA[i % DATA.length]); 
      System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, 
        value); 
      writer.append(key, value); 
     } 
    } finally { 
     IOUtils.closeStream(writer); 
    } 
} 

}

+0

आप क्या मुद्दों देख रहे हैं? क्या आप संकलन त्रुटियां प्राप्त कर रहे हैं? यदि ऐसा है, तो वो क्या हैं? जब आप अपना कोड चलाते हैं तो क्या आपको त्रुटियां/अप्रत्याशित व्यवहार मिल रहा है? यदि आप अधिक संदर्भ प्रदान करते हैं कि आप कौन सी त्रुटियों/अप्रत्याशित व्यवहार देख रहे हैं तो आपको उपयोगी उत्तर प्राप्त होने की अधिक संभावना है। – cmbaxter

उत्तर

5

आप fileStream का उपयोग करना चाहते हैं, तो आप सभी की आपूर्ति करने के लिए जा रहे हैं:

val ssc = new StreamingContext(args(0), "StreamingReceiver", Seconds(1), 
    System.getenv("SPARK_HOME"), Seq("/home/mesos/StreamingReceiver.jar")) 

// Create a NetworkInputDStream on target ip:port and count the 
val lines = ssc.fileStream("/home/sequenceFile") 

जावा कोड Hadoop फ़ाइल में लिखने का इसे कॉल करते समय इसे 3 प्रकार के पैरा। आपको यह जानने की जरूरत है कि Key, Value और InputFormat प्रकार इसे कॉल करने से पहले क्या हैं। यदि आपका प्रकार LongWritable, Text और TextInputFormat थे, तो आप तो जैसे fileStream कहेंगे:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequenceFile") 

उन 3 प्रकार के अपने प्रकार के होने के लिए होता है, तो आप के बजाय textFileStream उपयोग करने के लिए के रूप में यह किसी भी प्रकार की आवश्यकता नहीं है चाहते हो सकता है पैरामीटर और प्रतिनिधि fileStream पर उन 3 प्रकारों का उपयोग करते हुए मैंने उल्लेख किया। कि इस प्रकार दिखाई देगा का उपयोग करना:

val lines = ssc.textFileStream("/home/sequenceFile") 
+0

अरे, मैं ऐसा करने की कोशिश कर रहा हूं लेकिन बाइनरी फाइलों के साथ, मैंने यहां निर्देश का पालन किया है, दुर्भाग्य से यह काम नहीं करता है। क्या आप कुछ सुझाव दे सकते हैं? https://stackoverflow.com/questions/45778016/reading-binaryfile-with-spark-streaming – MaatDeamon

2
val filterF = new Function[Path, Boolean] { 
    def apply(x: Path): Boolean = { 
     val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis) true else false 
     return flag 
    } 
} 

val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_input",filterF,false).map(_._2.toString).map(u => u.split('\t')) 
संबंधित मुद्दे