2015-11-17 11 views
9

मेरे पास कोड पर कुछ जुनीट परीक्षण हैं जो एक कफका विषय का उपयोग करते हैं। नकली कफका विषयों मैंने कोशिश की है काम नहीं करते हैं और ऑनलाइन पाए गए उदाहरण बहुत पुराने हैं इसलिए वे 0.8.2.1 के साथ भी काम नहीं करते हैं। मैं 0.8.2.1 का उपयोग कर नकली कफका विषय कैसे बना सकता हूं?मैं जूनिट परीक्षणों के लिए नकली काफ्का विषय कैसे चालू कर सकता हूं?

स्पष्टीकरण के लिए: मैं मॉकिटो में हाथ से मजाक करने के बजाय वास्तविक उदाहरण के साथ परीक्षण करने के लिए विषय के वास्तविक एम्बेडेड उदाहरण का उपयोग करना चुन रहा हूं। ऐसा इसलिए है कि मैं परीक्षण कर सकता हूं कि मेरे कस्टम एन्कोडर्स और डिकोडर्स वास्तव में काम करते हैं और जब मैं वास्तविक कफका उदाहरण का उपयोग करने के लिए जाता हूं तो यह असफल नहीं होता है।

उत्तर

6

https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

यह उदाहरण नई 0.8.2.2 संस्करण में काम करने के लिए अद्यतन किया गया था। यहाँ Maven निर्भरता के साथ कोड SNIPPIT है:

pom.xml:

<dependencies> 
<dependency> 
    <groupId>junit</groupId> 
    <artifactId>junit</artifactId> 
    <version>4.12</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
</dependencies> 

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import org.I0Itec.zkclient.ZkClient; 
import org.junit.Test; 
import kafka.admin.TopicCommand; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.producer.KeyedMessage; 
import kafka.producer.Producer; 
import kafka.producer.ProducerConfig; 
import kafka.server.KafkaConfig; 
import kafka.server.KafkaServer; 
import kafka.utils.MockTime; 
import kafka.utils.TestUtils; 
import kafka.utils.TestZKUtils; 
import kafka.utils.Time; 
import kafka.utils.ZKStringSerializer$; 
import kafka.zk.EmbeddedZookeeper; 
import static org.junit.Assert.*; 

/** 
* For online documentation 
* see 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
*/ 
public class KafkaProducerTest { 

    private int brokerId = 0; 
    private String topic = "test"; 

    @Test 
    public void producerTest() throws InterruptedException { 

     // setup Zookeeper 
     String zkConnect = TestZKUtils.zookeeperConnect(); 
     EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect); 
     ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$); 

     // setup Broker 
     int port = TestUtils.choosePort(); 
     Properties props = TestUtils.createBrokerConfig(brokerId, port, true); 

     KafkaConfig config = new KafkaConfig(props); 
     Time mock = new MockTime(); 
     KafkaServer kafkaServer = TestUtils.createServer(config, mock); 

     String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"}; 
     // create topic 
     TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments)); 

     List<KafkaServer> servers = new ArrayList<KafkaServer>(); 
     servers.add(kafkaServer); 
     TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000); 

     // setup producer 
     Properties properties = TestUtils.getProducerConfig("localhost:" + port); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     Producer producer = new Producer(producerConfig); 

     // setup simple consumer 
     Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1); 
     ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)); 

     // send message 
     KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8)); 

     List<KeyedMessage> messages = new ArrayList<KeyedMessage>(); 
     messages.add(data); 

     producer.send(scala.collection.JavaConversions.asScalaBuffer(messages)); 
     producer.close(); 

     // deleting zookeeper information to make sure the consumer starts from the beginning 
     // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka 
     zkClient.delete("/consumers/group0"); 

     // starting consumer 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, 1); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
     ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 

     if(iterator.hasNext()) { 
      String msg = new String(iterator.next().message(), StandardCharsets.UTF_8); 
      System.out.println(msg); 
      assertEquals("test-message", msg); 
     } else { 
      fail(); 
     } 

     // cleanup 
     consumer.shutdown(); 
     kafkaServer.shutdown(); 
     zkClient.close(); 
     zkServer.shutdown(); 
    } 
} 

अपने mvn निर्भरता को देखना न भूलें: किसी भी विरोधी पुस्तकालयों के लिए पेड़।

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

एक अन्य विकल्प मैं में अपाचे क्यूरेटर उपयोग कर रहा है देख रहा हूँ: मैं SLF और log4j के लिए बहिष्करण जोड़ने के लिए किया था Is it possible to start a zookeeper server instance in process, say for unit tests?

<dependency> 
    <groupId>org.apache.curator</groupId> 
    <artifactId>curator-test</artifactId> 
    <version>2.2.0-incubating</version> 
    <scope>test</scope> 
</dependency> 

TestingServer zkTestServer; 

@Before 
public void startZookeeper() throws Exception { 
    zkTestServer = new TestingServer(2181); 
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000)); 
} 

@After 
public void stopZookeeper() throws IOException { 
    cli.close(); 
    zkTestServer.stop(); 
} 
+0

क्या आप संस्करण 0.11.0.2 के लिए काम कर रहे कोड प्रदान कर सकते हैं। उपरोक्त कोड काम नहीं कर रहा है – dhroove

2

क्या आपने मॉकिटो जैसे मॉकिंग फ्रेमवर्क का उपयोग करके कफका उपभोक्ता वस्तुओं का मज़ाक उड़ाया है?

+0

मैं नहीं बल्कि का एक नकली संस्करण होगा काफ्का इसलिए मुझे पता है निर्माता और उपभोक्ता इसके साथ काम कर रहे हैं। यहां कुछ और उदाहरण हैं (उदा: https://ransilberman.wordpress.com/2013/07/19/how-to-unit-test-kafka)। हालांकि, वे पुराने संस्करणों के लिए हैं, इसलिए यह अब 0.8.2.1 के साथ काम नहीं करता है। – Chip

संबंधित मुद्दे