Java类:team.bangbang.common.queue.QueueProxy
操作队列的代理,用这个类来获取队列的Publisher,登记队列的Subscriber。 这里的队列使用发布/订阅模式,支持RabbitMQ、Kafka两种队列产品。使用RabbitMQ还是Kafka,由项目的配置决定,根据不同的配置,QueueProxy针对相应队列产品创建Publisher、处理Subscriber。
开发人员使用QueueProxy时,除了在配置文件中配置队列产品及参数外,在代码中不需要关注RabbitMQ还是Kafka。
Java类:team.bangbang.common.queue.Publisher
队列发布者,详情参见Java doc文档。
Java类:team.bangbang.common.queue.Message
队列消息体,详情参见Java doc文档。
Java类:team.bangbang.common.queue.Subscriber
队列订阅者,详情参见Java doc文档。
一、配置说明
使用rabbit MQ:
# 队列配置
mq:
# 产品选项,可选:rabbit/kafka
product: "rabbit"
# rabbitmq配置
# amqp://" + username + ":" + password + "@" + hostName + ":" + portNumber
rabbitmq:
uri: "amqp://guest:guest@127.0.0.1:5672"
使用Kafka:
# 队列配置
mq:
# 产品选项,可选:rabbit/kafka
product: "kafka"
# kafka配置
kafka:
bootstrap-servers: "10.76.2.150:9092,10.76.2.150:9093,10.76.2.150:9094"
二、使用示例
发布订阅模式,发布消息:
QueueProxy proxy = new QueueProxy();
// 创建发布者,使用topic:member_register
Publisher p = proxy.createPublisher("member_register");
for(int i = 0; i < 100; i++) {
Message<String> msg = new Message<String>();
// 按照日志规范,填充数据
msg.setSender((i + 1) + ". 书店静默注册");
msg.setType((i + 1) + ". 注册");
msg.setData((i + 1) + ". 爱唱歌的小狐狸");
// 发送消息
p.send(msg);
}
发布订阅模式,订阅消息:
QueueProxy proxy = new QueueProxy();
// 创建订阅者,使用topic:member_register
Subscriber subscriber = new Subscriber("member_register", 1) {
/**
* 实现订阅的回调方法
*
* @param json 消息数据Message对象序列化形成的JSON数据
*
* @return 消费是否成功。队列自动ack,因此此结果暂时没有意义
*/
public boolean consume(JSONObject json) {
try {
logger.info("Subscriber received: " + json);
} catch (Exception ex) {
json.put("ERROR_MESSAGE", ex.getMessage());
logger.error(String.valueOf(json));
}
return true;
}
};
// 订阅消息
proxy.subscribe(subscriber);