Java类:team.bangbang.common.queue.QueueProxy

操作队列的代理,用这个类来获取队列的Publisher,登记队列的Subscriber。 这里的队列使用发布/订阅模式,支持RabbitMQ、Kafka两种队列产品。使用RabbitMQ还是Kafka,由项目的配置决定,根据不同的配置,QueueProxy针对相应队列产品创建Publisher、处理Subscriber。

开发人员使用QueueProxy时,除了在配置文件中配置队列产品及参数外,在代码中不需要关注RabbitMQ还是Kafka。

Java类:team.bangbang.common.queue.Publisher

队列发布者,详情参见Java doc文档。

Java类:team.bangbang.common.queue.Message

队列消息体,详情参见Java doc文档。

Java类:team.bangbang.common.queue.Subscriber

队列订阅者,详情参见Java doc文档。

一、配置说明

使用rabbit MQ:

# 队列配置
mq:
  # 产品选项,可选:rabbit/kafka
  product: "rabbit"
  # rabbitmq配置
  # amqp://" + username + ":" + password + "@" + hostName + ":" + portNumber
  rabbitmq:
    uri: "amqp://guest:guest@127.0.0.1:5672"

使用Kafka:

# 队列配置
mq:
  # 产品选项,可选:rabbit/kafka
  product: "kafka"
  # kafka配置
  kafka:
    bootstrap-servers: "10.76.2.150:9092,10.76.2.150:9093,10.76.2.150:9094"

二、使用示例

发布订阅模式,发布消息:

QueueProxy proxy = new QueueProxy();

// 创建发布者,使用topic:member_register
Publisher p = proxy.createPublisher("member_register");

for(int i = 0; i < 100; i++) {
    Message<String> msg = new Message<String>();

    // 按照日志规范,填充数据
    msg.setSender((i + 1) + ". 书店静默注册");
    msg.setType((i + 1) + ". 注册");
    msg.setData((i + 1) + ". 爱唱歌的小狐狸");

    // 发送消息
    p.send(msg);
}

发布订阅模式,订阅消息:

QueueProxy proxy = new QueueProxy();

// 创建订阅者,使用topic:member_register
Subscriber subscriber = new Subscriber("member_register", 1) {
    /**
     * 实现订阅的回调方法
     *
     * @param json 消息数据Message对象序列化形成的JSON数据
     *
     * @return 消费是否成功。队列自动ack,因此此结果暂时没有意义
     */
    public boolean consume(JSONObject json) {
        try {
            logger.info("Subscriber received: " + json);

        } catch (Exception ex) {
            json.put("ERROR_MESSAGE", ex.getMessage());
            logger.error(String.valueOf(json));
        }

        return true;
    }
};

// 订阅消息
proxy.subscribe(subscriber);