2015-11-16 7 views
5

के साथ लंबे समय तक रहने वाला राज्य बस प्रोग्रामिंग मॉडल के आसपास अपना सिर प्राप्त करने का प्रयास कर रहा है। परिदृश्य है कि मैं एक वेब फोरम के लिए उपकरण विश्लेषिकी के लिए पब/उप + डेटाफ्लो का उपयोग कर रहा हूं। मैं पब से आने वाले डेटा की एक धारा/उप कि तरह दिखता है:Google डेटाफ्लो

ID | TS | EventType 
1 | 1 | Create 
1 | 2 | Comment 
2 | 2 | Create 
1 | 4 | Comment 

और मैं ऐसा दिखता है जैसे एक धारा Dataflow से आने के साथ खत्म करना चाहते हैं:

ID | TS | num_comments 
1 | 1 | 0 
1 | 2 | 1 
2 | 2 | 0 
1 | 4 | 2 

मैं काम करना चाहता है कि क्या यह रोलअप स्ट्रीम प्रक्रिया के रूप में चलाने के लिए करता है, नई घटनाओं के रूप में नई संख्याओं के रूप में आबादी आ रही है। मेरा सवाल यह है कि, वर्तमान विषय आईडी और टिप्पणी की गणना के लिए राज्य को स्टोर करने के लिए नौकरी के लिए बेवकूफ जगह कहां है? यह मानते हुए कि विषय वर्षों से जी सकते हैं। वर्तमान विचार कर रहे हैं:

  • बिगटेबल के लिए और एक DoFn क्वेरी क्या विषय आईडी के लिए वर्तमान टिप्पणियों की संख्या में आ रहा है में विषय आईडी के लिए एक 'वर्तमान' प्रविष्टि लिखें यहां तक ​​कि के रूप में मैं यह लिख मैं नहीं एक हूँ। पंखा।
  • किसी भी तरह से साइड इनपुट का उपयोग करें? ऐसा लगता है कि यह जवाब है, लेकिन यदि ऐसा है तो मैं पूरी तरह समझ नहीं पा रहा हूं।
  • एक वैश्विक विंडो के साथ एक स्ट्रीमिंग नौकरी सेट करें, हर बार जब यह रिकॉर्ड हो जाता है तो एक ट्रिगर होता है, और पूरे फलक इतिहास को कहीं भी रखने के लिए डेटाफ्लो पर भरोसा करता है। (असीम भंडारण आवश्यकता?)

संपादित करें: बस, स्पष्ट करने के लिए मैं किसी भी मुसीबत इन तीन रणनीतियों, या यह करने का एक लाख विभिन्न अन्य तरीकों में से किसी को लागू नहीं होगा, मैं क्या कर रहा हूँ में अधिक रुचि डेटाफ्लो के साथ ऐसा करने के लिए सर्वोत्तम तरीका। विफलता के लिए सबसे लचीला क्या होगा, बैकफिल आदि के लिए इतिहास को पुन: संसाधित करना आदि।

EDIT2: वर्तमान में डेटाफ्लो सेवा के साथ एक बग है जहां फ़्लैटन रूपांतरण में इनपुट जोड़ने पर अपडेट विफल हो जाते हैं, जिसका अर्थ होगा यदि आप नौकरी में बदलाव करते हैं तो आपको नौकरी में अर्जित किसी भी राज्य को त्यागना और पुनर्निर्माण करना होगा जिसमें फ़्लैटन ऑपरेशन में कुछ जोड़ना शामिल है।

उत्तर

7

आप इसे पूरा करने के लिए ट्रिगर और एक गठबंधन का उपयोग करने में सक्षम होना चाहिए।

PCollection<ID> comments = /* IDs from the source */; 
PCollection<KV<ID, Long>> commentCounts = comments 
    // Produce speculative results by triggering as data comes in. 
    // Note that this won't trigger after *every* element, but it will 
    // trigger relatively quickly (as the system divides incoming data 
    // into work units). You could also throttle this with something 
    // like: 
    // AfterProcessingTime.pastFirstElementInPane() 
    //  .plusDelayOf(Duration.standardMinutes(5)) 
    // which will produce output every 5 minutes 
    .apply(Window.triggering(
      Repeatedly.forever(AfterPane.elementCountAtLeast(1))) 
     .accumulatingFiredPanes()) 
    // Count the occurrences of each ID 
    .apply(Count.perElement()); 

// Produce an output String -- in your use case you'd want to produce 
// a row and write it to the appropriate source 
commentCounts.apply(new DoFn<KV<ID, Long>, String>() { 
    public void processElement(ProcessContext c) { 
    KV<ID, Long> element = c.element(); 
    // This includes details about the pane of the window being 
    // processed, and including a strictly increasing index of the 
    // number of panes that have been produced for the key.   
    PaneInfo pane = c.pane(); 
    return element.key() + " | " + pane.getIndex() + " | " + element.value(); 
    } 
}); 

अपने डेटा के आधार पर, आप भी पूरे टिप्पणियां स्रोत से, पढ़ सकता है आईडी प्राप्त है, और फिर Count.perKey() का उपयोग प्रत्येक आईडी के लिए मायने रखता है पाने के लिए। यदि आप एक अधिक जटिल संयोजन चाहते हैं, तो आप कस्टम CombineFn को परिभाषित करने और Combine.perKey का उपयोग करके देख सकते हैं।

+0

सही, तो यह संभावित कार्यान्वयन की मेरी सूची में नंबर 3 है। मेरा सवाल है, क्या यह एक अच्छा विचार * है? यहां राज्य को डेटाफ्लो द्वारा निहित रूप से बनाए रखा जा रहा है। अगर मुझे नौकरी को फिर से शुरू करने की ज़रूरत है तो क्या होगा? ऐतिहासिक बैकफिल कैसे कार्यान्वित किया जाएगा? – bfabry

+1

आपके द्वारा किए गए परिवर्तनों के आधार पर, आप [मौजूदा पाइपलाइन अपडेट करें] (https://cloud.google.com/dataflow/pipelines/updating-a-pipeline) सक्षम कर सकते हैं। यदि परिवर्तन अधिक महत्वपूर्ण हैं, तो दृष्टिकोण का उल्लेख किया गया है यदि आप एक कस्टम स्रोत का उपयोग कर रहे हैं जो सभी पुराने डेटा को पढ़ने की अनुमति देता है। –

+0

बैकफिल आदि से निपटने के तरीके के रूप में एक कस्टम स्रोत एक दिलचस्प विचार है। ऐसा लगता है कि उस सवाल को हल करना प्रतीत होता है। क्या यह एक अच्छा विचार है कि वह राज्य है जो हमेशा के लिए बढ़ता है? क्या होगा यदि एक फोरम विषय बंद किया जा सकता है, क्या कहने का कोई तरीका है "अब ऐसी घटनाएं नहीं रहेंगी जिनकी हम इस आईडी के बारे में परवाह करते हैं" ताकि इसे त्याग दिया जा सके? – bfabry

2

के बाद से BigQuery अधिलेखन पंक्तियों का समर्थन नहीं करता, एक तरह से इस बारे में जाने के लिए COUNT का उपयोग कर डेटा BigQuery में घटनाओं लिखते हैं, और क्वेरी करने के लिए है:

चयन आईडी, COUNT टेबल ग्रुप से (NUM_COMMENTS) आईडी के आधार पर;

आप BigQuery में प्रविष्टियों को लिखने से पहले डेटाफ्लो के भीतर num_comments के प्रति-विंडो समेकन भी कर सकते हैं; ऊपर दी गई क्वेरी काम जारी रखेगी।

+0

इसे एक शॉट देने के लिए धन्यवाद :-)। समस्या के प्रयोजनों के लिए आप अनदेखा कर सकते हैं कि गंतव्य बीक्यू है। गंतव्य हालांकि ही संलग्न होना चाहिए। वास्तविक जीवन में गणना केवल एक योग की तुलना में अधिक जटिल है, और हम बीक्यू प्रश्नों के जटिल (और चलाने के लिए महंगा) की बजाय डेटाफ्लो में हमारे ईटीएल में व्यक्त करना पसंद करेंगे। साथ ही, यह समाधान हमें किसी विषय पर टिप्पणियों की संख्या का समय श्रृंखला इतिहास नहीं देगा। – bfabry

संबंधित मुद्दे