2013-08-27 35 views
9

मैं अभी उठ गया और कफका 0.8 बीटा के साथ दौड़ रहा हूं 1. मेरे पास वास्तव में एक सरल उदाहरण है और चल रहा है, समस्या यह है कि मैं केवल एक संदेश उपभोक्ता को काम करने के लिए प्राप्त कर सकता हूं, कई नहीं। यही है, runSingleWorker() विधि वर्क्स। रन() विधि काम नहीं करता:काफ्का: एकाधिक स्ट्रीम उपभोक्ता बनाएं

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerConfig; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.Map; 
import java.util.List; 
import java.util.HashMap; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ExecutorService; 

import org.springframework.context.ApplicationContext; 
import org.springframework.context.annotation.AnnotationConfigApplicationContext; 

import com.truecar.inventory.worker.core.application.config.AppConfig; 

public class ConsumerThreadPool { 

    private final ConsumerConnector consumer; 
    private final String topic; 

    private ExecutorService executor; 
    private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class); 

    public ConsumerThreadPool(String topic) { 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig")); 
     this.topic = topic; 
    } 

    public void shutdown() { 
     if (consumer != null) consumer.shutdown(); 
     if (executor != null) executor.shutdown(); 
    } 

    public void run(Integer numThreads) { 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 

     topicCountMap.put(topic, numThreads); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> topicListeners = consumerMap.get(topic); 

     executor = Executors.newFixedThreadPool(numThreads); 

     for(Integer i = 0; i < numThreads; i++){ 
      KafkaStream<byte[], byte[]> stream = topicListeners.get(i); 
      executor.submit(new Consumer(stream, i)); 
     } 
    } 


    public void runSingleWorker(Integer numThreads) { 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 

     topicCountMap.put(topic, new Integer(1)); 

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 

     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
     ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
     while(true) { 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      while(it.hasNext()){ 
       System.out.println(new String(it.next().message())); 

      } 
     } 
    } 
} 

और मेरे खिलौना उपभोक्ता के अंदर:

Created iterator empty iterator thread number 3 
Created iterator empty iterator thread number 6 
Created iterator empty iterator thread number 9 
Created iterator empty iterator thread number 7 
Created iterator empty iterator thread number 0 
Created iterator empty iterator thread number 0 
Created iterator empty iterator thread number 8 
Created iterator empty iterator thread number 3 
etc... 

:

import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerIterator; 

public class Consumer implements Runnable { 

    private KafkaStream kafkaStream; 
    private Integer threadNumber; 

    public Consumer(KafkaStream kafkaStream, Integer threadNumber) { 
     this.threadNumber = threadNumber; 
     this.kafkaStream = kafkaStream; 
    } 

    public void run() { 
     ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); 
     System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber); 
     while(true) { 

      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       break; 
      } 

      while(it.hasNext()) { 
       System.out.println("Thread " + threadNumber + ": " + new String(it.next().message())); 
      } 
     } 
     System.out.println("Shutting down Thread: " + threadNumber); 
    } 
} 

समस्या है, श्रमिकों के पूल संदेशों लेने नहीं करता है जब मैं उपज कमांड लाइन के माध्यम से संदेश जोड़ता हूं, तो संदेशों को एकल थ्रेडेड वर्कर संस्करण के तहत मुद्रित किया जाता है, लेकिन बहु-स्ट्रीम स्थिति के तहत संदेशों को मुद्रित नहीं किया जाता है। यहाँ क्या चल रहा है? मैं इसे कैसे ठीक करूं?

बीटीडब्ल्यू, kafka 0.8 के लिए pom.xml एक मान्य पोम नहीं है और निर्भरता प्राप्त नहीं करेगा, इसलिए यहां पूर्ण निर्भरताओं वाला एक पोम है।

<?xml version="1.0" encoding="UTF-8"?> 
<project 
xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation=" 
    http://maven.apache.org/POM/4.0.0 
    http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 
<groupId>group1</groupId> 
<artifactId>artifact1</artifactId> 
<version>0.1.0</version> 
<packaging>jar</packaging> 
<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <org.springframework.version>3.2.4.RELEASE</org.springframework.version> 
</properties> 
<dependencies> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-core</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-context</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.8.0-beta1</version> 
    </dependency> 
    <dependency> 
     <groupId>javax.inject</groupId> 
     <artifactId>javax.inject</artifactId> 
     <version>1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.9.2</version> 
    </dependency> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.yammer.metrics</groupId> 
     <artifactId>metrics-core</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
</dependencies> 
<build> 
    <finalName>inventory-core</finalName> 
    <plugins> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-compiler-plugin</artifactId> 
      <version>3.0</version> 
      <configuration> 
       <source>1.7</source> 
       <target>1.7</target> 
      </configuration> 
     </plugin> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-jar-plugin</artifactId> 
      <configuration> 
       <archive> 
        <manifest> 
         <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass> 
        </manifest> 
       </archive> 
      </configuration> 
     </plugin> 
     <plugin> 
      <groupId>org.dstovall</groupId> 
      <artifactId>onejar-maven-plugin</artifactId> 
      <version>1.4.4</version> 
      <executions> 
       <execution> 
        <configuration> 
         <onejarVersion>0.97</onejarVersion> 
         <classifier>onejar</classifier> 
        </configuration> 
        <goals> 
         <goal>one-jar</goal> 
        </goals> 
       </execution> 
      </executions> 
     </plugin> 
    </plugins> 
</build> 
<pluginRepositories> 
    <pluginRepository> 
     <id>onejar-maven-plugin.googlecode.com</id> 
     <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url> 
    </pluginRepository> 
</pluginRepositories> 
</project> 
+0

आपके पास कितने विभाजन हैं? यदि आपके पास विषय के लिए केवल '1' विभाजन है .. आपके पास एक ही विभाजन से पढ़ने के लिए एकाधिक धागे नहीं हो सकते हैं .. यदि आप अधिक कार्यकर्ता धागे चाहते हैं, तो आपको अधिक विभाजन की आवश्यकता होगी .. –

उत्तर

1

शायद प्रश्नकर्ता के लिए बहुत देर हो चुकी है लेकिन अन्य डेवलपर्स के लिए उपयोगी हो सकती है। लगता है कि आपने उपभोक्ताओं के लिए केवल एक विभाजन का उपयोग किया है - यह गलत है। Documentation से उद्धरण:

यह अभी भी कई उपभोक्ता उदाहरणों से अधिक भार को संतुलित करता है के बाद से वहाँ कई विभाजन कर रहे हैं। ध्यान दें कि विभाजन से अधिक उपभोक्ता उदाहरण नहीं हो सकते हैं।

तो जब आप उपभोक्ताओं के बारे में सोचते हैं तो आपको विभाजनों द्वारा संदेशों को विभाजित करने के बारे में सोचना चाहिए। ज्यादातर मामलों में आपको कुछ उच्च स्तरीय समूहिंग का उपयोग करना चाहिए या इसे डिफ़ॉल्ट रूप से यादृच्छिक भी होना चाहिए।

संबंधित मुद्दे