2016-09-05 11 views
5

मैं एक जावा एप्लिकेशन के लिए बहुत सारे काफ्का दस्तावेज देख रहा हूं जिस पर मैं काम कर रहा हूं। मैंने जावा 8 में पेश किए गए लैम्ब्डा सिंटैक्स में आने का प्रयास किया है, लेकिन मैं उस जमीन पर थोड़ा स्केची हूं और मुझे बहुत भरोसा नहीं है कि यह अभी तक उपयोग किया जाना चाहिए।प्रिंट कफका स्ट्रीम इनपुट कंसोल के लिए बाहर?

मेरे पास कोई कफका/जुकीपर सेवा बिना किसी परेशानी के चल रही है, और मैं जो करना चाहता हूं वह एक छोटा सा उदाहरण प्रोग्राम लिखता है जो इनपुट के आधार पर इसे लिखता है, लेकिन शब्दकोष नहीं करता क्योंकि कई उदाहरण हैं पहले से ही

उदाहरण डेटा

This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC. 

प्रश्न

मैं 3 पत्र कीवर्ड निकालें और उन्हें मुद्रित करने के लिए सक्षम होना चाहते हैं:

नमूना डेटा के लिए के रूप में मैं निम्नलिखित संरचना के एक स्ट्रिंग हो रही होगी System.out.println के साथ। मैं इनपुट युक्त एक स्ट्रिंग चर कैसे प्राप्त करूं? मुझे पता है कि नियमित अभिव्यक्तियों को कैसे लागू किया जाए या यहां तक ​​कि केवल कीवर्ड प्राप्त करने के लिए स्ट्रिंग के माध्यम से खोजना।

कोड

public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id"); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092"); 
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181"); 
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

    final Serde<String> stringSerde = Serdes.String(); 

    KStreamBuilder builder = new KStreamBuilder(); 

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream"); 

    KafkaStreams streams = new KafkaStreams(builder, props); 
    streams.start(); 

    //How do I assign the input from in-stream to the following variable? 
    String variable = ? 
} 

मैं चिड़ियाघर संचालक, काफ्का, उत्पादक और उपभोक्ता सभी एक ही विषय को झुका चल रहा है, इसलिए मैं मूल रूप से एक ही String उदाहरणों (निर्माता, उपभोक्ता और धारा के सभी पर प्रकट देखना चाहते हैं)।

उत्तर

11

यदि आप काफ्का स्ट्रीम का उपयोग करते हैं, तो आपको अपने डेटा स्ट्रीम पर फ़ंक्शंस/ऑपरेटरों को लागू करने की आवश्यकता है। आपके मामले में, आप KStream ऑब्जेक्ट बनाते हैं, इस प्रकार, आप source पर ऑपरेटर को लागू करना चाहते हैं।

आप जो करना चाहते हैं उसके आधार पर ऑपरेटरों को स्वतंत्र रूप से स्ट्रीम में प्रत्येक रिकॉर्ड में एक फ़ंक्शन लागू होता है (उदाहरण के लिए map()), या अन्य ऑपरेटरों जो एक साथ कई रिकॉर्ड में फ़ंक्शन लागू करते हैं (उदाहरण के लिए aggregateByKey())। आप प्रलेखन में एक देखो होना चाहिए: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl और उदाहरण https://github.com/confluentinc/examples/tree/kafka-0.10.0.0-cp-3.0.0/kafka-streams

इस प्रकार, आप कभी नहीं काफ्का धाराओं का इस्तेमाल करके स्थानीय चर बनाने के रूप में आप ऊपर अपने उदाहरण में दिखाने के लिए, बल्कि ऑपरेटरों/कार्य है कि एक साथ श्रृंखलित हो में सब कुछ को एम्बेड।

उदाहरण के लिए, यदि आप stdout में सभी इनपुट रिकॉर्ड प्रिंट करना चाहते हैं, तो आप

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream"); 
source.foreach(new ForeachAction<String, String>() { 
    void apply(String key, String value) { 
     System.out.println(key + ": " + value); 
    } 
}); 

इस प्रकार यह होगा उपभोक्ता आप से रिकॉर्ड इनपुट विषय और प्रत्येक के लिए कर सकता है, के बाद आप streams.start() के माध्यम से अपने आवेदन शुरू, आपके विषय का रिकॉर्ड, apply(...) पर कॉल किया गया है, जो स्टडआउट पर रिकॉर्ड प्रिंट करता है।

बेशक, कंसोल के लिए धारा मुद्रण के लिए एक अधिक देशी तरीका source.print() उपयोग करने के लिए किया जाएगा (जो आंतरिक रूप से मूल रूप से एक पहले से ही दिए गए ForeachAction के साथ दिखाया गया foreach() ऑपरेटर के रूप में एक ही है।)

के साथ अपने उदाहरण के लिए स्थानीय चर के लिए स्ट्रिंग को असाइन करना, आपको अपना कोड apply(...) में रखना होगा और "रेगेक्स-सामान इत्यादि" को "3 अक्षर कीवर्ड निकालने" के लिए करना होगा।

इसे व्यक्त करने का सबसे अच्छा तरीका, हालांकि flatMapValues() और print() (यानी source.flatMapValues(...).print()) के संयोजन के माध्यम से होगा।प्रत्येक इनपुट रिकॉर्ड के लिए flatMapValues() कहा जाता है (आपके मामले में, मुझे लगता है कि कुंजी null होगी ताकि आप इसे अनदेखा कर सकें)। अपने flatMapValue फ़ंक्शन के भीतर, आप अपना रेगेक्स लागू करते हैं और प्रत्येक मैच के लिए, आप मैच को उन मूल्यों की सूची में जोड़ते हैं जिन्हें आप अंततः वापस करते हैं।

source.flatMapValues(new ValueMapper<String, Iterable<String>>() { 
    @Override 
    public Iterable<String> apply(String value) { 
     ArrayList<String> keywords = new ArrayList<String>(); 

     // apply regex to value and for each match add it to keywords 

     return keywords; 
    } 
} 

flatMapValues के उत्पादन में एक KStream फिर से किया जाएगा, प्रत्येक पाया कीवर्ड (यानी, उत्पादन धारा सभी सूचियों ValueMapper#apply() में अपनी वापसी पर एक "यूनियन" है) के लिए एक रिकॉर्ड से युक्त। अंत में, आप print() के माध्यम से अपने परिणाम को कंसोल पर प्रिंट करें। (बेशक, आप flatMapValue + print के बजाय एक foreach का भी उपयोग कर सकते हैं लेकिन यह कम मॉड्यूलर होगा।)

+0

वाह। महान उत्तर साथी। यह वही है जिसे मैं देख रहा था! – Zeliax

+0

वाईडब्ल्यू। फॉरच लूप के अंत में लापता होने के लिए स्वतंत्र महसूस करें :) –

+1

')। – asitm9

संबंधित मुद्दे