के प्रत्येक विभाजन में कफका विषय के प्रत्येक विभाजन में कमेट्स और वर्तमान ऑफसेट की संख्या कैसे प्राप्त करें। मैं काफ्का का उपयोग कर रहा v0.8.1.1एक कफका विषय के प्रत्येक विभाजन में काम और ऑफसेट की संख्या
उत्तर
यह अपने प्रश्न, ऑफसेट की किस तरह आप में रुचि रखते हैं से स्पष्ट नहीं है वास्तव में ऑफसेट के तीन प्रकार हैं:।
- ऑफसेट विषय के विभाजन में पहले उपलब्ध संदेश का। GetOffsetShell टूल
- विषय के विभाजन में अंतिम उपलब्ध संदेश का ऑफ़सेट -(जल्द से जल्द) का उपयोग करें। -1 (नवीनतम) का उपयोग करें - समय पैरामीटर।
- अंतिम पढ़ने/संसाधित संदेश ऑफसेट कफका उपभोक्ता द्वारा बनाए रखा गया। उच्च स्तरीय उपभोक्ता जुकीपर (अलग-अलग उपभोक्ता समूह के लिए अलग-अलग) में स्टोर करता है और पर ध्यान देता है जब आप प्रतिबद्ध() या जब ऑटो-प्रतिबद्ध सेटिंग को सत्य पर सेट करते हैं, तो इसे अद्यतित रखते हैं। सरल उपभोक्ता के लिए, आपके कोड को ऑफसेट प्रबंधित करने के बारे में देखभाल करना है।
कमांड लाइन उपयोगिता के अतिरिक्त, # 1 और # 2 के लिए ऑफ़सेट जानकारी SimpleConsumer.earliestOrLatestOffset() के माध्यम से भी उपलब्ध है।
यदि संदेशों की संख्या बहुत बड़ी नहीं है, तो आप GetOffsetShell पर एक बड़े - ऑफसेट पैरामीटर निर्दिष्ट कर सकते हैं और फिर टूल द्वारा लौटाई गई लाइनों की संख्या गिन सकते हैं। अन्यथा, आप स्कैला/जावा में एक साधारण पाश लिख सकते हैं जो सभी उपलब्ध ऑफसेट को जल्द से जल्द शुरू कर देगा।
Get Offset Shell
get offsets for a topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required argument [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count> number of offsets returned (default: 1)
--partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default)
--time <Long: timestamp in milliseconds/-1(latest)/-2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore >
--topic <topic> REQUIRED: The topic to get offsets from.
विषय की भरपाई और विभाजन आप kafka.tools.GetOffsetShell उपयोग कर सकते हैं के बारे में। उदाहरण इन कमांड (मैं विषय games
है) का उपयोग कर के लिए:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1
मैं games:0:47841
जिसका अर्थ है विषय games
और 0
विभाजन के लिए मैं नवीनतम प्रयोग नहीं किया 47841
(नवीनतम उपलब्ध संदेश) ऑफसेट है कि मिल जाएगा।
आप पहले उपलब्ध संदेश को देखने के लिए -2
का उपयोग कर सकते हैं।
एक चेतावनी: लॉग कॉम्पैक्शन चालू होने पर यह सटीक नहीं होगा। – Meredith
क्या यह वही मूल्य प्रोग्रामेटिक रूप से प्राप्त करने का कोई तरीका है? –
यह जानकारी किसी विषय के लिए विभाजन (कमांड लाइन से) पर संदेशों की संख्या देखने के लिए स्क्रिप्ट बनाने में भी सहायक थी। जबकि काफ्का-वेब-कंसोल जैसे उपकरण अच्छे हैं, हम में से कुछ गैर-जीयूआई दुनिया में रहते हैं।
यहाँ स्क्रिप्ट ... इस्तेमाल होता है और अपने जोखिम पर संशोधित :-)
#!/bin/bash
topic=$1
if [[ -z "${topic}" ]] ; then
echo "Usage: ${0} <topic>"
exit 1
fi
if [[ -z "${KAFKA_HOME}" ]] ; then
# $KAFKA_HOME not set, using default /kafka
KAFKA_HOME="/kafka"
fi
if [ ! -d ${KAFKA_HOME} ] ; then
echo "\$KAFKA_HOME does not point to a valid directory [$KAFKA_HOME]"
exit 1
fi
cd $KAFKA_HOME
echo
echo "Topic: ${topic}: "
#
printf "Partition Count\n"
printf "~~~~~~~~~~ ~~~~~~~~~~~~\n"
idx=0
for msg in `bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ${topic} --broker-list localhost:9092 --time -1` ; do
name=`echo ${msg} | awk -F ":" '{print $1}'`
partition=`echo ${msg} | awk -F ":" '{print $2}'`
total=`echo ${msg} | awk -F ":" '{print $3}'`
printf "%10d %12d\n" ${partition} ${total}
idx=$((idx + 1))
done
if [ ${idx} -eq 0 ] ; then
echo "Topic name not found!"
exit 1
fi
echo
exit ${rc}
प्रत्येक विभाजन का नवीनतम ऑफसेट जरूरी नहीं है कि लॉग प्रतिधारण के कारण प्रत्येक विभाजन में कितने संदेश वर्तमान में हैं। अगर लॉग कॉम्पैक्शन चालू नहीं है, तो आप नवीनतम से जल्द से जल्द ऑफसेट को घटाकर उस गिनती को प्राप्त कर सकते हैं। – Meredith
संस्करण 0.9.0.x शुरू आप kafka.admin.ConsumerGroupCommand उपकरण का उपयोग करने शुरू कर देना चाहिए ।नीचे तर्क है कि यह उपकरण एक consumerGroup_Y के लिए एक Topic_X के लिए
List all consumer groups, describe a consumer group, or delete consumer group info.
Option Description
------ -----------
--bootstrap-server <server to connect REQUIRED (only when using new-
to> consumer): The server to connect to.
--command-config <command config Property file containing configs to be
property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2
Pass in groups with a single topic to
just delete the given topic's
partition offsets and ownership
information for the given consumer
groups. For instance --group g1 --
group g2 --topic t1
Pass in just a topic to delete the
given topic's partition offsets and
ownership information for every
consumer group. For instance --topic
t1
WARNING: Group deletion only works for
old ZK-based consumer groups, and
one has to use it carefully to only
delete groups that are not active.
--describe Describe consumer group and list
offset lag related to given group.
--group <consumer group> The consumer group we wish to act on.
--list List all consumer groups.
--new-consumer Use new consumer.
--topic <topic> The topic whose consumer group
information should be deleted.
--zookeeper <urls> REQUIRED (unless new-consumer is
used): The connection string for the
zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
ऑफसेट प्राप्त करने के लिए लेने के रूप में
नीचेbin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper <zookeeper urls> --describe --group consumerGroup_Y
रिस्पांस
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumerGroup, Topic_X, 0, 3030460, 3168412, 137952, none
consumerGroup, Topic_X, 1, 3030903, 3168884, 137981, none
consumerGroup, Topic_X, 2, 801564, 939540, 137976, none
consumerGroup, Topic_X, 3, 737290, 875262, 137972, none
consumerGroup, Topic_X, 4, 737288, 875254, 137966, none
consumerGroup, Topic_X, 5, 737276, 875241, 137965, none
consumerGroup, Topic_X, 6, 737290, 875251, 137961, none
consumerGroup, Topic_X, 7, 737290, 875248, 137958, none
consumerGroup, Topic_X, 8, 737288, 875246, 137958, none
consumerGroup, Topic_X, 9, 737293, 875251, 137958, none
consumerGroup, Topic_X, 10, 737289, 875244, 137955, none
consumerGroup, Topic_X, 11, 737273, 875226, 137953, none
क्या किसी ने अभी तक 'बिन/कफका-उपभोक्ता-समूह.एसएच' का उपयोग करके ऑफसेट को संशोधित करने के लिए नए कफका 0.10.एक्स के साथ '--command-config' के साथ संशोधित किया है? –
- 1. मुझे कफका विषय का नवीनतम ऑफसेट कैसे मिल सकता है?
- 2. काफ्का टॉपिक बनाम विभाजन विषय
- 3. कफका विषय के लिए विभाजन के लिए नवीनतम ऑफ़सेट कैसे प्राप्त करें?
- 4. क्यों मेरे कफका में एक विभाजन में संदेश है?
- 5. एचडीएफएस में कफका विषय कैसे लोड करें?
- 6. कैसे कफका विभाजन विभाजन को संतुलित करता है?
- 7. क्या कफका 0.8.2
- 8. कफका
- 9. कफका
- 10. एक काफ्का विषय से दूसरे काफ्का विषय
- 11. प्रत्येक पंक्ति की संख्या।
- 12. कफका
- 13. कफका
- 14. रूबी - प्रत्येक प्रारंभ ऑफसेट
- 15. एक समूह में कफका एकल उपभोक्ता विफलता
- 16. कफका पुनर्विक्रय क्या है?
- 17. कफका में दलालों पर विभाजन पुन: संतुलन 0.8
- 18. MYSQL - प्रत्येक तालिका में पंक्तियों की संख्या
- 19. सीटीई पंक्ति संख्या विभाजन
- 20. स्पार्क में विभाजन कैसे काम करता है?
- 21. स्पार्क: बिना किसी शफल के विभाजन की संख्या बढ़ाएं?
- 22. सूची में प्रत्येक आइटम की घटनाओं की संख्या की गणना
- 23. MapReduce में विभाजन कैसे काम करता है?
- 24. उद्देश्य क्षेत्र में समय क्षेत्र ऑफसेट संख्या?
- 25. कफका उपभोक्ता ऑफसेट्स सीमा से बाहर विभाजन के लिए कोई कॉन्फ़िगर रीसेट नीति नहीं
- 26. क्या एक उपभोक्ता के साथ एक कफका विषय के विभाजन के बीच संदेश क्रम को बनाए रखने का कोई तरीका है?
- 27. Plinq की रेंज विभाजन बनाम खंड विभाजन?
- 28. खोजें कितनी बार एन और एम के बीच प्रत्येक संख्या अभाज्य संख्या की एक जोड़ी की राशि
- 29. कफका घटना प्रणाली के रूप में
- 30. ऑफसेट शीर्ष काम नहीं कर में आईओएस
कैसा दिखेगा आदेश का उपयोग मैं दो कार्यक्षमता की जरूरत है 1. प्रत्येक विभाजन के लिए अंतराल की निगरानी करें 2. पूर्ण सिस्टम रीबूट (ज़ूकीपर, ब्रोकर, निर्माता और उपभोक्ता) के मामले में, मैं अंतिम रीसा से कैसे फिर से शुरू कर सकता हूं डी/संसाधित संदेश ऑफसेट हाई लेवल उपभोक्ता –