2016-12-18 10 views
5

मुझे लगता है कि निम्नलिखित करता अपाचे Flink का उपयोग कर एक नमूना आवेदन का निर्माण करने की कोशिश कर रहा हूँ एक स्ट्रीमिंग डेटा स्रोत बनाने के लिए:अपाचे Flink एक डेटा स्ट्रीम से उपयोग मान डायनामिक

  1. स्टॉक प्रतीक की एक धारा पढ़ता है (उदाहरण के लिए एक काफ्का कतार से 'सीएससीओ', 'एफबी')।
  2. प्रत्येक प्रतीक के लिए वर्तमान कीमतों का रीयल-टाइम लुकअप करता है और डाउनस्ट्रीम प्रोसेसिंग के मानों को स्ट्रीम करता है।

* मूल पोस्ट *

को अद्यतन मैं एक अलग वर्ग में नक्शा समारोह चले गए और रन-टाइम त्रुटि संदेश "MapFunction के कार्यान्वयन किसी भी अधिक serializable नहीं है नहीं मिलता है। ऑब्जेक्ट में शायद गैर धारावाहिक फ़ील्ड "शामिल या संदर्भित है।

अब जिस मुद्दे का सामना कर रहा हूं वह यह है कि काफ्का विषय "स्टॉकप्रिसेस" मैं कीमतों को लिखने की कोशिश कर रहा हूं, उन्हें प्राप्त नहीं कर रहा है। मैं परेशानी शूट करने की कोशिश कर रहा हूं और कोई अपडेट पोस्ट करूंगा।

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "stocks"); 

     DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

     DataStream<String> stockPrice = 
      streamOfStockSymbols 
      //get unique keys 
      .keyBy(new KeySelector<String, String>() { 
       @Override 
       public String getKey(String trend) throws Exception { 
        return trend; 
       } 
       }) 
      //collect events over a window 
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
      //return the last event from the window...all elements are the same "Symbol" 
      .apply(new WindowFunction<String, String, String, TimeWindow>() { 
       @Override 
       public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
        out.collect(input.iterator().next().toString()); 
       } 
      }) 
      .map(new StockSymbolToPriceMapFunction()); 

     streamExecEnv.execute("Retrieve Stock Prices"); 
    } 
} 

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> { 
    @Override 
    public String map(String stockSymbol) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
     System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol); 

     DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); 
     stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema())); 

     return "100000"; 
    } 

    private static class CustomKeySelector implements KeySelector<String, String> { 
     @Override 
     public String getKey(String arg0) throws Exception { 
      return arg0.trim(); 
     } 
    } 
} 


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
      stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
      isRunning = true; 
    } 


    @Override 
    public void cancel() { 
      isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
        throws Exception { 
      String stockPrice = "0"; 
      while (isRunning) { 
       //TODO: query Google Finance API 
       stockPrice = Integer.toString((new Random()).nextInt(100)+1); 
       ctx.collect(stockPrice); 
       Thread.sleep(10000); 
      } 
    } 
} 

उत्तर

4

StreamExecutionEnvironment स्ट्रीमिंग एप्लिकेशन के ऑपरेटरों के अंदर उपयोग करने के लिए इंडेंट नहीं किया गया है। इरादा नहीं है, यह परीक्षण और प्रोत्साहित नहीं किया जाता है। यह काम कर सकता है और कुछ कर सकता है, लेकिन अधिकतर संभवतः व्यवहार नहीं करेगा और शायद आपके आवेदन को मार डालेगा।

आपके प्रोग्राम में StockSymbolToPriceMapFunction प्रत्येक आने वाले रिकॉर्ड के लिए एक पूरी तरह से नया और स्वतंत्र नया स्ट्रीमिंग एप्लिकेशन निर्दिष्ट करता है। हालांकि, चूंकि आप streamExecEnv.execute() पर कॉल नहीं करते हैं, इसलिए प्रोग्राम शुरू नहीं होते हैं और map विधि कुछ भी किए बिना लौटाता है।

आप होगा कॉल streamExecEnv.execute(), समारोह कार्यकर्ताओं JVM में एक नया स्थानीय Flink क्लस्टर शुरू करने और इस स्थानीय Flink क्लस्टर पर आवेदन शुरू होगा। स्थानीय फ्लिंक इंस्टेंस में बहुत सारी ढेर जगह लगेगी और कुछ क्लस्टर शुरू होने के बाद, शायद OutOfMemoryError के कारण कर्मचारी मर जाएगा जो आप नहीं करना चाहते हैं।

+0

क्या आने वाले डेटा के जवाब में गतिशील रूप से धाराएं बनाना संभव है? –

+0

आप 'FlatMapFunction' को कार्यान्वित कर सकते हैं जो पहुंचने वाले रिकॉर्ड के आधार पर डेटा को गतिशील रूप से पढ़ता और निकालता है। उदाहरण के लिए, यदि आपके पास फ़ाइल नामों वाला एक स्ट्रीम है, तो 'FlatMapFunction' आप उन फ़ाइलों को खोल सकते हैं और अपना डेटा उत्सर्जित कर सकते हैं। हालांकि, सभी रिकॉर्ड के आउटपुट प्रकार समान होना चाहिए। साथ ही, ईवेंट-टाइम प्रोसेसिंग सेमेन्टिक्स को सही चुनना चुनौतीपूर्ण हो सकता है, लेकिन यह गतिशील रूप से जोड़े गए स्रोतों की एक सामान्य समस्या है। –

+0

@ फ़ैबियन ह्यूसेके मैं एक समान उपयोग केस को हल कर रहा हूं। तो अगर मुझे FlatMapFunction का उपयोग करना है तो हमें सामान्य फ़ाइल एपीआई का उपयोग स्कैला/जावा से फ़ाइल को पढ़ना होगा और फ्लिंक के रीडटेक्स्टफाइल का उपयोग नहीं करना होगा। कारण हम flatMap के अंदर StreamExecutionEnvironment का उपयोग नहीं कर सकते हैं। क्या मेरी समझ सही है? –

संबंधित मुद्दे