2016-12-08 4 views
6

मेरी स्ट्रीम में 'श्रेणी' नामक एक कॉलम है और मेरे पास प्रत्येक स्टोर में प्रत्येक श्रेणी के लिए अतिरिक्त स्थिर मेटाडेटा है, यह हर दो दिनों में एक बार अपडेट हो जाता है। इस लुकअप को करने का सही तरीका क्या है? दो विकल्प काफ्का काफ्का स्ट्रीम के बाहर स्थिरलुकअप डेटा के साथ केस्ट्रीम को समृद्ध करने का आदर्श तरीका

  1. लोड डेटा धाराओं और बस मेटाडाटा को जोड़ने के लिए KStreams#map() का उपयोग के साथ कर रहे हैं। यह संभव है क्योंकि कफका स्ट्रीम सिर्फ एक पुस्तकालय है।

  2. मेटाडाटा को एक कफका विषय में लोड करें, इसे KTable पर लोड करें और KStreams#leftJoin() करें, यह अधिक प्राकृतिक लगता है और विभाजन को छोड़कर कफका स्ट्रीम में छोड़ देता है। हालांकि, हमें सभी मूल्यों के साथ KTable लोड करने की आवश्यकता है। ध्यान दें कि हमें पूरे लुकअप डेटा को लोड करना होगा, न केवल परिवर्तनों को।

    • उदाहरण के लिए, शुरुआत में केवल एक श्रेणी 'सी 1' थी। कफका स्ट्रीम ऐप को बंद कर दिया गया था, और फिर से शुरू किया गया। पुनरारंभ करने के बाद, एक नई श्रेणी 'सी 2' जोड़ा गया था। मेरी धारणा यह है कि, तालिका = KStreamBuilder() तालिका ('मेटाडाटाटॉपिक') के पास मूल्य 'c2' होगा, क्योंकि यह एकमात्र चीज थी जो ऐप दूसरी बार शुरू होने के बाद बदल गई थी। मैं चाहता हूं कि यह 'सी 1' और 'सी 2' हो।
    • यदि इसमें 'सी 1' भी है, तो डेटा को कभी भी केटीबल से हटा दिया जाएगा (शायद कुंजी = शून्य संदेश भेजकर?)?

ऊपर से कौन सा देखने के लिए मेटाडाटा सही तरीका है?

क्या पुनरारंभ पर शुरुआत से ही पढ़ने के लिए केवल एक स्ट्रीम को मजबूर करना संभव है, ऐसा इसलिए है कि सभी मेटाडेटा को KTable में लोड किया जा सकता है।

क्या स्टोर का उपयोग करने का कोई और तरीका है?

उत्तर

3

आपका समग्र अवलोकन सही है और यह इस बात पर निर्भर करता है कि आपके लिए कौन सा ट्रेडऑफ अधिक महत्वपूर्ण है। यदि आप मेटाडेटा छोटा है, तो विकल्प 1 बेहतर होता है। अगर मेटाडाटा बड़ा है, तो ऐसा लगता है कि विकल्प 2 जाने का तरीका है।

यदि आप map() का उपयोग करते हैं, तो आपको प्रत्येक एप्लिकेशन इंस्टेंस में अपने मेटाडेटा की पूरी प्रति प्राप्त करने की आवश्यकता है (जैसा कि आप नहीं जानते कि स्ट्रीम आपको KStream डेटा कैसे विभाजित करेगा)। इस प्रकार, यदि आपका मेटाडाटा map() का उपयोग करके मुख्य-स्मृति में फिट नहीं होता है तो आसानी से काम नहीं करेगा।

यदि आप KTable का उपयोग करते हैं, तो स्ट्रीम इस बात का ख्याल रखेंगे कि सभी चल रहे एप्लिकेशन उदाहरणों पर मेटाडेटा सही ढंग से शेड किया गया है, जैसे कोई डेटा डुप्लिकेशन आवश्यक नहीं है। इसके अलावा, KTable राज्य स्टोर इंजन के रूप में RocksDB का उपयोग करता है और इस प्रकार डिस्क पर फैल सकता है।

संपादित शुरू

KTable में सभी डेटा होने के बारे में: यदि आप एक ही कुंजी के लिए दो श्रेणियों है, दूसरा मान पहला मान को अधिलेखित कर देगा अगर आप एक KTable के माध्यम से में विषय से डेटा सीधे पढ़ा builder.table(...) (चेंजलॉग अर्थशास्त्र)। हालांकि, आप इस विषय को रिकॉर्ड स्ट्रीम (यानी builder.stream(...)) के रूप में पढ़कर आसानी से काम कर सकते हैं और KTable की गणना करने के लिए एकत्रीकरण लागू कर सकते हैं। आपका एकत्रीकरण प्रत्येक कुंजी के लिए सभी मानों की एक सूची को छोड़ देगा।

हटाने के बारे में: KTable चेंजलॉग अर्थशास्त्र का उपयोग करता है और कुंजी-मूल्य जोड़े को हटाने के लिए कबूतर संदेश को समझता है। इस प्रकार, यदि आप किसी विषय से KTable पढ़ते हैं और विषय में <key:null> संदेश होता है, तो इस कुंजी के साथ KTable में वर्तमान रिकॉर्ड हटा दिया जाएगा। यह प्राप्त करना कठिन होता है जब KTable एकत्रीकरण का परिणाम होता है, क्योंकि null कुंजी या null मान के साथ एकत्रीकरण इनपुट रिकॉर्ड को अनदेखा कर दिया जाएगा और एकत्रीकरण परिणाम अपडेट नहीं किया जाएगा। आपके मामले में, आप इसे एक null-category कह सकते हैं -

वैकल्पिक हल एकत्रीकरण से पहले एक map() कदम जोड़ सकते हैं और एक NULL मूल्य (यानी, एक उपयोगकर्ता परिभाषित "वस्तु" है कि समाधि का पत्थर का प्रतिनिधित्व करता है, लेकिन null नहीं है लागू करने के लिए किया जाएगा)। आपके एकत्रीकरण में, यदि आप इनपुट रिकॉर्ड null-category मान के रूप में हैं तो आप केवल null मान को एजगेशन परिणाम के रूप में वापस कर दें। इसके बाद आपके KTable के लिए एक टॉम्बस्टोन संदेश में अनुवाद किया जाएगा और इस कुंजी के लिए श्रेणियों की वर्तमान सूची हटा दी जाएगी।

संपादित अंत

और निश्चित रूप से आप हमेशा प्रोसेसर एपीआई के माध्यम से एक कस्टम समाधान का निर्माण कर सकते हैं। हालांकि, अगर डीएसएल आपको चाहें तो आपको यह करने का कोई अच्छा कारण नहीं है।

+0

पूरे लुकअप डेटा को लोड करने के लिए उदाहरणों के साथ प्रश्न को अपडेट किया गया। मैंने गलत समझा होगा कि केटीबल क्या रखता है, आपके अपडेट की प्रतीक्षा करेगा)। –

+0

धन्यवाद। मैंने अपना जवाब अपडेट किया। –

6
  1. लोड काफ्का स्ट्रीम की और बाहर स्थिर डेटा बस KStreams # नक्शा() का उपयोग मेटाडाटा जोड़ने के लिए। यह संभव है क्योंकि कफका स्ट्रीम सिर्फ एक पुस्तकालय है।

यह काम करता है। लेकिन आमतौर पर लोग आपके द्वारा सूचीबद्ध अगले विकल्प का चयन करते हैं, क्योंकि इनपुट डेटा को समृद्ध करने के लिए साइड डेटा आमतौर पर पूरी तरह स्थिर नहीं होता है; बल्कि, यह बदल रहा है लेकिन कुछ हद तक बार बार:

  1. लोड एक काफ्का विषय से मेटाडाटा, एक KTable करने के लिए इसे लोड और कर KStreams # leftJoin(), यह और अधिक प्राकृतिक लगता है और कफका स्ट्रीम में विभाजित आदि छोड़ देता है। हालांकि, इसके लिए हमें सभी मूल्यों के साथ लोड किए गए केटीबल को रखने की आवश्यकता है। ध्यान दें कि हमें पूरे लुकअप डेटा को लोड करना होगा, न केवल परिवर्तनों को।

यह सामान्य दृष्टिकोण है, और मैं इसे करने के लिए छड़ी करने के लिए जब तक आप एक विशेष कारण नहीं करने के लिए है की सलाह देते हैं।

हालांकि, इसके लिए हमें सभी मूल्यों के साथ केटीबल लोड करने की आवश्यकता है। ध्यान दें कि हमें पूरे लुकअप डेटा को लोड करना होगा, न केवल परिवर्तनों को।

तो मुझे लगता है कि आप दूसरा विकल्प भी पसंद करते हैं, लेकिन आप इस बारे में चिंतित हैं कि यह कुशल है या नहीं।

संक्षिप्त उत्तर है: हां, केटीबल प्रति कुंजी के सभी (नवीनतम) मानों के साथ लोड किया जाएगा। तालिका में संपूर्ण लुकअप डेटा होगा, लेकिन ध्यान रखें कि केटीबल दृश्यों के पीछे विभाजित है: यदि, उदाहरण के लिए, आपके इनपुट विषय (तालिका के लिए) में विभाजन हैं, तो आप अपने आवेदन के उदाहरण चला सकते हैं , जिनमें से प्रत्येक तालिका के 1 विभाजन प्राप्त कर रहा है (माना जाता है कि डेटा विभाजन में समान रूप से फैलता है, फिर तालिका के प्रत्येक विभाजन/साझा तालिका के डेटा के 1/3 में होगा)।तो अभ्यास में यह "बस काम करता है" की तुलना में अधिक संभावना है।

क्या पुनरारंभ पर शुरुआत से ही पढ़ने के लिए केवल एक स्ट्रीम को मजबूर करना संभव है, ऐसा इसलिए है कि सभी मेटाडेटा को केटेबल में लोड किया जा सकता है।

आपको इसके बारे में चिंता करने की आवश्यकता नहीं है। सीधे शब्दों में कहें, यदि उपलब्ध तालिका का कोई स्थानीय "प्रतिलिपि" नहीं है, तो स्ट्रीम एपीआई स्वचालित रूप से यह सुनिश्चित करेगी कि तालिका का डेटा पूरी तरह से स्क्रैच से पढ़ा जाए। यदि कोई स्थानीय प्रति उपलब्ध है, तो आपका एप्लिकेशन उस प्रतिलिपि का फिर से उपयोग करेगा (और तालिका के इनपुट विषय में जब भी नया डेटा उपलब्ध होता है तो इसकी स्थानीय प्रति अपडेट करें)। अपने KTable के लिए, ध्यान दें कि यह कैसे इनपुट 6 संदेशों के होते हैं: उदाहरण के साथ

लंबे समय तक जवाब

कल्पना कीजिए निम्नलिखित इनपुट डेटा (बदलाव का धारा लगता है):

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22) 

और यहाँ विभिन्न राज्यों है "तार्किक" KTable का परिणाम जो इस इनपुट से होगा, जहां प्रत्येक नए प्राप्त इनपुट संदेश (जैसे (alice, 1)) के परिणामस्वरूप तालिका का एक नया राज्य होगा:

Key  Value 
-------------- 
alice | 1 // (alice, 1) received 

| 
V 

Key  Value 
-------------- 
alice | 1 
bob  | 40 // (bob, 40) received 

| 
V 

Key  Value 
-------------- 
alice | 2 // (alice, 2) received 
bob  | 40 

| 
V 

Key  Value 
-------------- 
alice | 2 
bob  | 40 
charlie | 600 // (charlie, 600) received 

| 
V 

Key  Value 
-------------- 
alice | 5 // (alice, 5) received 
bob  | 40 
charlie | 600 

| 
V 

Key  Value 
-------------- 
alice | 5 
bob  | 22 // (bob, 22) received 
charlie | 600 

आप यहां क्या देख सकते हैं कि, हालांकि इनपुट डेटा में कई संदेश हो सकते हैं (या "परिवर्तन" जैसा आपने कहा था; यहां, हमारे पास 6 है), जिसके परिणामस्वरूप KTable (जो नए प्राप्त इनपुट के आधार पर निरंतर उत्परिवर्तन से गुजर रहा है) में प्रविष्टियों/पंक्तियों की संख्या इनपुट में अद्वितीय कुंजी की संख्या है (यहां: 1 से शुरू होकर, रैंपिंग), जो आम तौर पर संदेशों की संख्या से काफी कम है। इसलिए, यदि इनपुट में संदेशों की संख्या N है और इन संदेशों के लिए अद्वितीय कुंजी की संख्या M है, तो आमतौर पर M << N (MN से काफी छोटा है; इसके अलावा, रिकॉर्ड के लिए, हमारे पास invariant M <= N है)।

यह पहला कारण है कि "हमें सभी मूल्यों के साथ लोड किए गए केटीबल को रखने की आवश्यकता है" आमतौर पर एक मुद्दा नहीं है, क्योंकि केवल नवीनतम मान प्रति कुंजी बनाए रखा जाता है।

दूसरा कारण यह है कि मदद करता है कि, मैथियस जे। सैक्स ने इंगित किया है कि, काफ्का स्ट्रीम इस तरह के तालिकाओं के लिए डिफ़ॉल्ट स्टोरेज इंजन के रूप में रॉक्स डीबी का उपयोग करता है (अधिक सटीक: राज्य एक दुकान को वापस रखता है)। RocksDB आपको उन तालिकाओं को बनाए रखने की अनुमति देता है जो आपके एप्लिकेशन के उपलब्ध मुख्य मेमोरी/जावा हीप स्पेस से बड़े हैं क्योंकि यह स्थानीय डिस्क तक फैल सकता है।

आखिरकार, तीसरा कारण यह है कि KTable विभाजन किया गया है। इसलिए, यदि तालिका के लिए आपका इनपुट विषय विभाजनों के साथ कॉन्फ़िगर किया गया है, तो दृश्यों के पीछे क्या हो रहा है यह है कि KTable स्वयं ही विभाजन (सोचें: sharded) को उसी तरह से विभाजित किया जाता है।उपर्युक्त उदाहरण में, यहां आप क्या समाप्त कर सकते हैं, हालांकि सटीक "विभाजन" इस बात पर निर्भर करता है कि मूल इनपुट डेटा तालिका के इनपुट विषय के विभाजन में कैसे फैलता है:

लॉजिकल केटीबल (आखिरी स्थिति I ऊपर से पता चला है):

Key  Value 
-------------- 
alice | 5 
bob  | 22 
charlie | 600 

वास्तविक KTable, (तालिका के इनपुट विषय के लिए 3 विभाजन, प्लस कुंजी संभालने विभाजित = उपयोगकर्ता नाम विभाजन भर में समान रूप से फैल जा रहा है):

Key  Value 
-------------- 
alice | 5 // Assuming that all data for `alice` is in partition 1 

Key  Value 
-------------- 
bob  | 22 // ...for `bob` is in partition 2 

Key  Value 
-------------- 
charlie | 600 // ...for `charlie` is in partition 3 

अभ्यास में, इस विभाजन inpu टी डेटा - अन्य चीजों के साथ - आपको केटीबल के वास्तविक अभिव्यक्तियों को "आकार" करने की अनुमति देता है।

एक और उदाहरण:

  • अपने KTable के नवीनतम राज्य की कल्पना होगा आम तौर पर है 1 टीबी (फिर से, अनुमानित आकार मेज के इनपुट डेटा में अद्वितीय संदेश कुंजियों की संख्या की एक समारोह है की एक आकार, संबंधित संदेश मान के औसत आकार से गुणा)।
  • यदि तालिका के इनपुट विषय में केवल 1 विभाजन है, तो केटीबल के पास 1 टीबी के आकार के साथ केवल 1 विभाजन भी है। यहां, क्योंकि इनपुट विषय में 1 विभाजन है, तो आप अपने आवेदन को 1 ऐप उदाहरणों के साथ चला सकते हैं (इसलिए वास्तव में समांतरता का एक बहुत कुछ नहीं, हे)।
  • यदि तालिका के इनपुट विषय में 500 विभाजन हैं, तो केटीबल में 500 विभाजन भी हैं, प्रत्येक ~ 2 जीबी के आकार के साथ (मानते हैं कि डेटा समान रूप से विभाजन में फैला हुआ है)। यहां, आप अपने आवेदन को 500 ऐप उदाहरणों के साथ चला सकते हैं। यदि आप वास्तव में 500 उदाहरण चला रहे थे, तो प्रत्येक ऐप इंस्टेंस को तार्किक केटीबल के 1 विभाजन/शेड मिलेगा, इस प्रकार 2 जीबी टेबल डेटा समाप्त हो जाएगा; यदि आप केवल 100 उदाहरणों को चलाने के लिए थे, तो प्रत्येक उदाहरण तालिका के तालिका के 2 GB * 5 = 10 GB के साथ समाप्त होने पर तालिका के विभाजन/shards 500/100 = 5 प्राप्त होगा।
+1

क्या होता है जब इनपुट स्ट्रीम में कई विभाजन होते हैं, मेटाडाटा स्ट्रीम में केवल एक विभाजन होता है, और ऐप के कई उदाहरण होते हैं? क्या ऐप का प्रत्येक उदाहरण मेटाडेटा स्ट्रीम लोड करेगा, या उनमें से एक इसे लोड करेगा और अन्य किसी भी तरह से उस उदाहरण से मूल्य प्राप्त करेंगे? –

संबंधित मुद्दे