- Producer
- Consumer
- GroupConsumer
1、MqProducer.java
package com.neohope.kafka.test; import java.util.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MqProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("metadata.broker.list", "localhost:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "com.neohope.kafka.test.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for(int i=0;i<100;i++) { KeyedMessage<String, String> data = new KeyedMessage<String, String>("neoTopic", "key"+i, "value"+i); producer.send(data); } KeyedMessage<String, String> data = new KeyedMessage<String, String>("neoTopic", "-=END=-", "-=END=-"); producer.send(data); producer.close(); } }
2、SimplePartitioner.java
package com.neohope.kafka.test; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; } }
3、测试
#开启zookeeper bin\windows\zookeeper-server-start.bat config\zookeeper.properties #开启kafka-server bin\windows\kafka-server-start.bat config\server.properties #开启一个consumer bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic neoTopic --from-beginning #运行MqProducer.Main即可