项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参参考如下pom.xml;其中各个依赖包必须版本协调一致。

   
4.0.0
   
com.test
   
test-kafka
   
jar
   
test-kafka
   
http://maven.apache.org
1.0.0
   
   
log4j
log4j
1.2.14
   
org.apache.kafka
kafka_2.8.0
0.8.0-beta1
log4j
log4j
org.scala-lang
scala-library
2.8.1
com.yammer.metrics
metrics-core
2.2.0
com.101tec
zkclient
0.3
   
   
   
test-kafka-1.0
       
           
               
src/main/resources
               
true
           
       
       
           
               
maven-compiler-plugin
               
2.3.2
               
                   
1.5                    
1.5
                   
gb2312
               
           
           
               
maven-resources-plugin
               
2.2
               
                   
gbk
               
           
       
   

四.Producer端代码

1) producer.properties文件:此文件放在/resources目录下

#partitioner.class= metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093 ##,127.0.0.1:9093 producer.type=sync compression.codec=0 serializer.class=kafka.serializer.StringEncoder ##在producer.type=async时有效 #batch.num.messages=100

2) LogProducer.java代码样例

package com.test.kafka; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class LogProducer {
private Producer
inner; public LogProducer() throws Exception{
Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); ProducerConfig config = new ProducerConfig(properties); inner = new Producer
(config); } public void send(String topicName,String message) {
if(topicName == null || message == null){
return; } KeyedMessage
km = new KeyedMessage
(topicName,message); inner.send(km); } public void send(String topicName,Collection
messages) {
if(topicName == null || messages == null){
return; } if(messages.isEmpty()){
return; } List
> kms = new ArrayList
>(); for(String entry : messages){ KeyedMessage
km = new KeyedMessage
(topicName,entry); kms.add(km); } inner.send(kms); } public void close(){ inner.close(); } /** * @param args */ public static void main(String[] args) { LogProducer producer = null; try{ producer = new LogProducer(); int i=0; while(true){ producer.send("test-topic", "this is a sample" + i); i++; Thread.sleep(2000); } }catch(Exception e){ e.printStackTrace(); }finally{ if(producer != null){ producer.close(); } } } }

五.Consumer端

1) consumer.properties:文件位于/resources目录下

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 ##,127.0.0.1:2182,127.0.0.1:2183 # timeout in ms for connecting to zookeeper zookeeper.connectiontimeout.ms=1000000 #consumer group id group.id=test-group #consumer timeout #consumer.timeout.ms=5000

2) LogConsumer.java代码样例

package com.test.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class LogConsumer {
private ConsumerConfig config; private String topic; private int partitionsNum; private MessageExecutor executor; private ConsumerConnector connector; private ExecutorService threadPool; public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties")); config = new ConsumerConfig(properties); this.topic = topic; this.partitionsNum = partitionsNum; this.executor = executor; } public void start() throws Exception{
connector = Consumer.createJavaConsumerConnector(config); Map
topics = new HashMap
(); topics.put(topic, partitionsNum); Map
>> streams = connector.createMessageStreams(topics); List
> partitions = streams.get(topic); threadPool = Executors.newFixedThreadPool(partitionsNum); for(KafkaStream
partition : partitions){ threadPool.execute(new MessageRunner(partition)); } }     public void close(){ try{ threadPool.shutdownNow(); }catch(Exception e){ // }finally{ connector.shutdown(); } } class MessageRunner implements Runnable{ private KafkaStream
partition; MessageRunner(KafkaStream
partition) { this.partition = partition; } public void run(){ ConsumerIterator
it = partition.iterator(); while(it.hasNext()){ MessageAndMetadata
item = it.next(); System.out.println("partiton:" + item.partition()); System.out.println("offset:" + item.offset()); executor.execute(new String(item.message()));//UTF-8 } } } interface MessageExecutor { public void execute(String message); } /** * @param args */ public static void main(String[] args) { LogConsumer consumer = null; try{ MessageExecutor executor = new MessageExecutor() { public void execute(String message) { System.out.println(message); } }; consumer = new LogConsumer("test-topic", 2, executor); consumer.start(); }catch(Exception e){ e.printStackTrace(); }finally{ // if(consumer != null){ // consumer.close(); // } } } }

在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。