मुझे लगता है कि निम्नलिखित करता अपाचे Flink का उपयोग कर एक नमूना आवेदन का निर्माण करने की कोशिश कर रहा हूँ एक स्ट्रीमिंग डेटा स्रोत बनाने के लिए:अपाचे Flink एक डेटा स्ट्रीम से उपयोग मान डायनामिक
- स्टॉक प्रतीक की एक धारा पढ़ता है (उदाहरण के लिए एक काफ्का कतार से 'सीएससीओ', 'एफबी')।
- प्रत्येक प्रतीक के लिए वर्तमान कीमतों का रीयल-टाइम लुकअप करता है और डाउनस्ट्रीम प्रोसेसिंग के मानों को स्ट्रीम करता है।
* मूल पोस्ट *
को अद्यतन मैं एक अलग वर्ग में नक्शा समारोह चले गए और रन-टाइम त्रुटि संदेश "MapFunction के कार्यान्वयन किसी भी अधिक serializable नहीं है नहीं मिलता है। ऑब्जेक्ट में शायद गैर धारावाहिक फ़ील्ड "शामिल या संदर्भित है।
अब जिस मुद्दे का सामना कर रहा हूं वह यह है कि काफ्का विषय "स्टॉकप्रिसेस" मैं कीमतों को लिखने की कोशिश कर रहा हूं, उन्हें प्राप्त नहीं कर रहा है। मैं परेशानी शूट करने की कोशिश कर रहा हूं और कोई अपडेट पोस्ट करूंगा।
public class RetrieveStockPrices {
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "stocks");
DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));
DataStream<String> stockPrice =
streamOfStockSymbols
//get unique keys
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String trend) throws Exception {
return trend;
}
})
//collect events over a window
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
//return the last event from the window...all elements are the same "Symbol"
.apply(new WindowFunction<String, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
out.collect(input.iterator().next().toString());
}
})
.map(new StockSymbolToPriceMapFunction());
streamExecEnv.execute("Retrieve Stock Prices");
}
}
public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
@Override
public String map(String stockSymbol) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);
DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));
return "100000";
}
private static class CustomKeySelector implements KeySelector<String, String> {
@Override
public String getKey(String arg0) throws Exception {
return arg0.trim();
}
}
}
public class LookupStockPrice extends RichSourceFunction<String> {
public String stockSymbol = null;
public boolean isRunning = true;
public LookupStockPrice(String inSymbol) {
stockSymbol = inSymbol;
}
@Override
public void open(Configuration parameters) throws Exception {
isRunning = true;
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void run(SourceFunction.SourceContext<String> ctx)
throws Exception {
String stockPrice = "0";
while (isRunning) {
//TODO: query Google Finance API
stockPrice = Integer.toString((new Random()).nextInt(100)+1);
ctx.collect(stockPrice);
Thread.sleep(10000);
}
}
}
क्या आने वाले डेटा के जवाब में गतिशील रूप से धाराएं बनाना संभव है? –
आप 'FlatMapFunction' को कार्यान्वित कर सकते हैं जो पहुंचने वाले रिकॉर्ड के आधार पर डेटा को गतिशील रूप से पढ़ता और निकालता है। उदाहरण के लिए, यदि आपके पास फ़ाइल नामों वाला एक स्ट्रीम है, तो 'FlatMapFunction' आप उन फ़ाइलों को खोल सकते हैं और अपना डेटा उत्सर्जित कर सकते हैं। हालांकि, सभी रिकॉर्ड के आउटपुट प्रकार समान होना चाहिए। साथ ही, ईवेंट-टाइम प्रोसेसिंग सेमेन्टिक्स को सही चुनना चुनौतीपूर्ण हो सकता है, लेकिन यह गतिशील रूप से जोड़े गए स्रोतों की एक सामान्य समस्या है। –
@ फ़ैबियन ह्यूसेके मैं एक समान उपयोग केस को हल कर रहा हूं। तो अगर मुझे FlatMapFunction का उपयोग करना है तो हमें सामान्य फ़ाइल एपीआई का उपयोग स्कैला/जावा से फ़ाइल को पढ़ना होगा और फ्लिंक के रीडटेक्स्टफाइल का उपयोग नहीं करना होगा। कारण हम flatMap के अंदर StreamExecutionEnvironment का उपयोग नहीं कर सकते हैं। क्या मेरी समझ सही है? –