2014-12-16 9 views
12

के प्रत्येक विभाजन में कफका विषय के प्रत्येक विभाजन में कमेट्स और वर्तमान ऑफसेट की संख्या कैसे प्राप्त करें। मैं काफ्का का उपयोग कर रहा v0.8.1.1एक कफका विषय के प्रत्येक विभाजन में काम और ऑफसेट की संख्या

उत्तर

8

यह अपने प्रश्न, ऑफसेट की किस तरह आप में रुचि रखते हैं से स्पष्ट नहीं है वास्तव में ऑफसेट के तीन प्रकार हैं:।

  1. ऑफसेट विषय के विभाजन में पहले उपलब्ध संदेश का। GetOffsetShell टूल
  2. विषय के विभाजन में अंतिम उपलब्ध संदेश का ऑफ़सेट -(जल्द से जल्द) का उपयोग करें। -1 (नवीनतम) का उपयोग करें - समय पैरामीटर।
  3. अंतिम पढ़ने/संसाधित संदेश ऑफसेट कफका उपभोक्ता द्वारा बनाए रखा गया। उच्च स्तरीय उपभोक्ता जुकीपर (अलग-अलग उपभोक्ता समूह के लिए अलग-अलग) में स्टोर करता है और पर ध्यान देता है जब आप प्रतिबद्ध() या जब ऑटो-प्रतिबद्ध सेटिंग को सत्य पर सेट करते हैं, तो इसे अद्यतित रखते हैं। सरल उपभोक्ता के लिए, आपके कोड को ऑफसेट प्रबंधित करने के बारे में देखभाल करना है।

कमांड लाइन उपयोगिता के अतिरिक्त, # 1 और # 2 के लिए ऑफ़सेट जानकारी SimpleConsumer.earliestOrLatestOffset() के माध्यम से भी उपलब्ध है।

यदि संदेशों की संख्या बहुत बड़ी नहीं है, तो आप GetOffsetShell पर एक बड़े - ऑफसेट पैरामीटर निर्दिष्ट कर सकते हैं और फिर टूल द्वारा लौटाई गई लाइनों की संख्या गिन सकते हैं। अन्यथा, आप स्कैला/जावा में एक साधारण पाश लिख सकते हैं जो सभी उपलब्ध ऑफसेट को जल्द से जल्द शुरू कर देगा।

From Kafka documentation:

Get Offset Shell 
get offsets for a topic 
bin/kafka-run-class.sh kafka.tools.GetOffsetShell 

required argument [broker-list], [topic] 
Option Description 
------ ----------- 
--broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to. 
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000) 
--offsets <Integer: count> number of offsets returned (default: 1) 
--partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default) 
--time <Long: timestamp in milliseconds/-1(latest)/-2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore > 
--topic <topic> REQUIRED: The topic to get offsets from. 
+0

कैसा दिखेगा आदेश का उपयोग मैं दो कार्यक्षमता की जरूरत है 1. प्रत्येक विभाजन के लिए अंतराल की निगरानी करें 2. पूर्ण सिस्टम रीबूट (ज़ूकीपर, ब्रोकर, निर्माता और उपभोक्ता) के मामले में, मैं अंतिम रीसा से कैसे फिर से शुरू कर सकता हूं डी/संसाधित संदेश ऑफसेट हाई लेवल उपभोक्ता –

7

विषय की भरपाई और विभाजन आप kafka.tools.GetOffsetShell उपयोग कर सकते हैं के बारे में। उदाहरण इन कमांड (मैं विषय games है) का उपयोग कर के लिए:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1 

मैं games:0:47841 जिसका अर्थ है विषय games और 0 विभाजन के लिए मैं नवीनतम प्रयोग नहीं किया 47841 (नवीनतम उपलब्ध संदेश) ऑफसेट है कि मिल जाएगा।

आप पहले उपलब्ध संदेश को देखने के लिए -2 का उपयोग कर सकते हैं।

+0

एक चेतावनी: लॉग कॉम्पैक्शन चालू होने पर यह सटीक नहीं होगा। – Meredith

+0

क्या यह वही मूल्य प्रोग्रामेटिक रूप से प्राप्त करने का कोई तरीका है? –

1

यह जानकारी किसी विषय के लिए विभाजन (कमांड लाइन से) पर संदेशों की संख्या देखने के लिए स्क्रिप्ट बनाने में भी सहायक थी। जबकि काफ्का-वेब-कंसोल जैसे उपकरण अच्छे हैं, हम में से कुछ गैर-जीयूआई दुनिया में रहते हैं।

यहाँ स्क्रिप्ट ... इस्तेमाल होता है और अपने जोखिम पर संशोधित :-)

#!/bin/bash 

topic=$1 

if [[ -z "${topic}" ]] ; then 

    echo "Usage: ${0} <topic>" 
    exit 1 

fi 


if [[ -z "${KAFKA_HOME}" ]] ; then 

    # $KAFKA_HOME not set, using default /kafka 
    KAFKA_HOME="/kafka" 

fi 

if [ ! -d ${KAFKA_HOME} ] ; then 

    echo "\$KAFKA_HOME does not point to a valid directory [$KAFKA_HOME]" 
    exit 1 

fi 

cd $KAFKA_HOME 

echo 
echo "Topic: ${topic}: " 

# 
printf "Partition Count\n" 
printf "~~~~~~~~~~ ~~~~~~~~~~~~\n" 

idx=0 
for msg in `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ${topic} --broker-list localhost:9092 --time -1` ; do 

    name=`echo ${msg} | awk -F ":" '{print $1}'` 
    partition=`echo ${msg} | awk -F ":" '{print $2}'` 
    total=`echo ${msg} | awk -F ":" '{print $3}'` 

    printf "%10d %12d\n" ${partition} ${total} 
    idx=$((idx + 1)) 

done 

if [ ${idx} -eq 0 ] ; then 

    echo "Topic name not found!" 
    exit 1 

fi 

echo 
exit ${rc} 
+0

प्रत्येक विभाजन का नवीनतम ऑफसेट जरूरी नहीं है कि लॉग प्रतिधारण के कारण प्रत्येक विभाजन में कितने संदेश वर्तमान में हैं। अगर लॉग कॉम्पैक्शन चालू नहीं है, तो आप नवीनतम से जल्द से जल्द ऑफसेट को घटाकर उस गिनती को प्राप्त कर सकते हैं। – Meredith

3

संस्करण 0.9.0.x शुरू आप kafka.admin.ConsumerGroupCommand उपकरण का उपयोग करने शुरू कर देना चाहिए ।नीचे तर्क है कि यह उपकरण एक consumerGroup_Y के लिए एक Topic_X के लिए

List all consumer groups, describe a consumer group, or delete consumer group info. 
Option         Description 
------         ----------- 
--bootstrap-server <server to connect REQUIRED (only when using new- 
    to>          consumer): The server to connect to. 
--command-config <command config  Property file containing configs to be 
    property file>       passed to Admin Client and Consumer. 
--delete        Pass in groups to delete topic 
              partition offsets and ownership 
              information over the entire consumer 
              group. For instance --group g1 -- 
              group g2 
             Pass in groups with a single topic to 
              just delete the given topic's 
              partition offsets and ownership 
              information for the given consumer 
              groups. For instance --group g1 -- 
              group g2 --topic t1 
             Pass in just a topic to delete the 
              given topic's partition offsets and 
              ownership information for every 
              consumer group. For instance --topic 
              t1 
             WARNING: Group deletion only works for 
              old ZK-based consumer groups, and 
              one has to use it carefully to only 
              delete groups that are not active. 
--describe        Describe consumer group and list 
              offset lag related to given group. 
--group <consumer group>    The consumer group we wish to act on. 
--list         List all consumer groups. 
--new-consumer       Use new consumer. 
--topic <topic>       The topic whose consumer group 
              information should be deleted. 
--zookeeper <urls>      REQUIRED (unless new-consumer is 
              used): The connection string for the 
              zookeeper connection in the form 
              host:port. Multiple URLS can be 
              given to allow fail-over. 

ऑफसेट प्राप्त करने के लिए लेने के रूप में

नीचे
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper <zookeeper urls> --describe --group consumerGroup_Y 

रिस्पांस

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER 
consumerGroup, Topic_X, 0, 3030460, 3168412, 137952, none 
consumerGroup, Topic_X, 1, 3030903, 3168884, 137981, none 
consumerGroup, Topic_X, 2, 801564, 939540, 137976, none 
consumerGroup, Topic_X, 3, 737290, 875262, 137972, none 
consumerGroup, Topic_X, 4, 737288, 875254, 137966, none 
consumerGroup, Topic_X, 5, 737276, 875241, 137965, none 
consumerGroup, Topic_X, 6, 737290, 875251, 137961, none 
consumerGroup, Topic_X, 7, 737290, 875248, 137958, none 
consumerGroup, Topic_X, 8, 737288, 875246, 137958, none 
consumerGroup, Topic_X, 9, 737293, 875251, 137958, none 
consumerGroup, Topic_X, 10, 737289, 875244, 137955, none 
consumerGroup, Topic_X, 11, 737273, 875226, 137953, none 
+0

क्या किसी ने अभी तक 'बिन/कफका-उपभोक्ता-समूह.एसएच' का उपयोग करके ऑफसेट को संशोधित करने के लिए नए कफका 0.10.एक्स के साथ '--command-config' के साथ संशोधित किया है? –

संबंधित मुद्दे