RocketMQ学习笔记(二)
RocketMq基本使用
- 导入MQ客户端依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
- 消息发送者步骤
1.创建消息生产者producer,并制定生产者组名
2.指定NameServer地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
- 消息消费者步骤
1.创建消费者Consumer,并指定消费者组名
2.指定NameServer地址
3.订阅主题Topic和Tag
4.设置回调函数,处理信息
5.启动消费者consumer
1. 基本样例
消息发送
1. 发送同步消息
用于相应时间不敏感的场景
package com.example.rocketmq.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class BaseProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("baseMQ","A",("我是BaseMQ消息"+i).getBytes());
//5.发送消息 (同步消息)
SendResult result = producer.send(msg);
System.out.println(result);
}
//6.关闭生产者producer
producer.shutdown();
}
}
2. 发送异步消息
用于对相应时间敏感的场景
package com.example.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroupA");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("baseMQ","B",("我是异步消息"+i).getBytes());
//5.发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.print("发送成功:");
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.print("发送异常:");
System.out.println(throwable);
}
});
//防止发送过快而导致链接失败
Thread.sleep(1000L);
}
//6.关闭生产者producer
producer.shutdown();
}
}
3. 发送单向消息
对发送结果不关心的场景,如发送日志。
package com.example.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class OneWayProducer {
public static void main(String[] args) throws Exception{
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("baseMQ","A",("我是BaseMQ单向消息"+i).getBytes());
//5.发送消息
producer.sendOneway(msg);
}
//6.关闭生产者producer
producer.shutdown();
}
}
消息消费
1. 负载均衡模式(默认)
所有消息的消费均平分给消费者集群中订阅相同的的每一台机器
package com.example.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BaseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("baseMQ","A");
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
String msg = null;
try {
msg = new String(body,"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
}
}
2. 广播模式
所有消费者都能受到所有的消息,设置setMessageModel为MessageModel.BROADCASTING即可开启广播模式。
package com.example.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.io.UnsupportedEncodingException;
public class BroConsumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BaseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("baseMQ","B");
//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
String msg = null;
try {
msg = new String(body,"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
System.out.println("广播模式消费者启动成功!");
}
}
2.顺序消息
顺序消息消费主要是保证局部消息顺序。因为消费者是多线程消费borker中的所有队列,所以要保证消费者消费顺序,生产者保证顺序消息发送到同一个broker队列中(通过业务表示如orderID之类的),消费者保证每个broker队列都用一个线程来消费,这样就可以保证消息的顺序消费。以订单消息为例子:创建、支付、完成。

所以需要将生产者的消息按照业务表示分别发送到broker的同一个队列中去,而消费者对每个队列的消息用单线程去读取。这样就可以实现消息的顺序读取了。

代码实现:
- 创建一个订单流程pojo模拟订单发送流程
package com.example.rocketmq.pojo;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class OrderStep {
private int orderId;
private String desc;
public OrderStep(int orderId, String desc){
this.orderId = orderId;
this.desc = desc;
}
//创建一系列订单模拟数据 直接用订单desc来模拟订单业务
public static List<OrderStep> buildOrders(){
//模拟消息流
List<OrderStep> orderSteps = new ArrayList<>();
//订单1 的流程
OrderStep order1Step1 = new OrderStep(134,"创建");
OrderStep order1Step2 = new OrderStep(134, "支付");
OrderStep order1Step3 = new OrderStep(134, "推送");
OrderStep order1Step4 = new OrderStep(134, "完成");
//订单2 的流程
OrderStep order2Step1 = new OrderStep(135,"创建");
OrderStep order2Step2 = new OrderStep(135, "支付");
OrderStep order2Step3 = new OrderStep(135, "推送");
OrderStep order2Step4 = new OrderStep(135, "完成");
//订单3 的流程
OrderStep order3Step1 = new OrderStep(136,"创建");
OrderStep order3Step2 = new OrderStep(136, "支付");
OrderStep order3Step3 = new OrderStep(136, "推送");
//模拟发送队列中 3条订单消息交叉发送
// 订单1创建
orderSteps.add(order1Step1);
//订单2创建
orderSteps.add(order2Step1);
//订单3创建
orderSteps.add(order3Step1);
//订单1支付
orderSteps.add(order1Step2);
//订单2支付
orderSteps.add(order2Step2);
//订单2推送
orderSteps.add(order2Step3);
//订单1推送
orderSteps.add(order1Step3);
//订单1完成
orderSteps.add(order1Step4);
//订单3支付
orderSteps.add(order3Step2);
//订单2完成
orderSteps.add(order2Step4);
//订单3推送
orderSteps.add(order3Step3);
return orderSteps;
}
}
- 消息生产者发送顺序消息
package com.example.rocketmq.producer;
import com.example.rocketmq.pojo.OrderStep;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("OrderGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
List<OrderStep> orderSteps = OrderStep.buildOrders();
for (OrderStep orderStep : orderSteps) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("Order", "shop", orderStep.toString().getBytes());
//5.发送顺序消息
/**
* 参数msg:消息体
* 参数MessageQueueSelector :消息队列选择器
* 参数arg:业务参数
*/
producer.send(msg, new MessageQueueSelector() {
/**
* @param list 返回的所有的broker消息队列
* @param message 传入的消息对象
* @param arg 传入的业务参数
* @return 选择哪个消息队列来发送
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
//写选择逻辑
OrderStep orderStep = (OrderStep) arg;
//纯数字orderId 可根据余数来选择哪个 队列发送消息
int index = orderStep.getOrderId() % list.size();
System.out.println("orderId:" + orderStep.getOrderId() + "desc:" + orderStep.getDesc() + " 处于队列:" + index);
return list.get(index);
}
}, orderStep);
Thread.sleep(1000L);
}
//6.无消息发送则关闭生产者producer
producer.shutdown();
}
}
- 消息消费者顺序消费消息
package com.example.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws Exception {
//1.创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderGroup");
//2.指定NameServer
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅topic和tag (*为订阅所有tag,tag为*时无法顺序消费消息)
consumer.subscribe("Order", "shop");
//4.设置监听器,处理消息 监听器用这个MessageListenerOrderly保证同一个队列单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt messageExt : list) {
System.out.println("消费者线程【" + Thread.currentThread().getName() + "】消费消息:" + new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.启动消费者
consumer.start();
System.out.println("消费者已启动!");
}
}
3.延时消息
- 延迟消息发送只需要设置延迟级别即可
message.setDelayTimeLevel(3) - 设置延时级别只能在这下面些中选 1级别对应
1s以此类推 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
package com.example.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class DelayProducer {
public static void main(String[] args) throws Exception {
//1.创建生产者
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.链接Nameserver
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建发送消息,并指定topic,tag
Message msg = new Message("delay","tagA",("我是第"+i+"条延迟消息"+" 消息发送时间为: "+System.currentTimeMillis()).getBytes());
//设置延时级别 3 为延迟10s
msg.setDelayTimeLevel(3);
//5.发送消息
producer.send(msg);
}
//6.无消息发送,则关闭生产者
producer.shutdown();
}
}
- 消费者消费
- 测试发现设定延时级别为
10s时,落盘时间 - 发送时间 = 9s,消费者消费的当前时间 - 发送时间 = 10s
package com.example.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("baseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("delay","tagA");
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
byte[] body = messageExt.getBody();
String msg = null;
try {
msg = new String(body,"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(msg+ " 落盘时间为:"+(messageExt.getStoreTimestamp())+" 时间差:"+ (System.currentTimeMillis() - Long.parseLong(msg.substring(msg.length()-13)))/1000);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
System.out.println("启动成功!");
}
}
- 输出结果

4.批量消息
- 生产者将发送消息保存到 List
中,直接发送List即可 - 一般来说批量消息发送 消息大小不大于4m,大于的时候可以对消息分割后再分别发送
package com.example.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("baseGroup");
//2.指定NameServer地址
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动producer
producer.start();
List<Message> msgs = new ArrayList<>();
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg1 = new Message("batch","A",("我是批量消息A").getBytes());
Message msg2 = new Message("batch","A",("我是批量消息B").getBytes());
Message msg3 = new Message("batch","A",("我是批量消息C").getBytes());
msgs.add(msg1);
msgs.add(msg2);
msgs.add(msg3);
//5.发送消息 批量发送消息
SendResult result = producer.send(msgs);
System.out.println(result);
//6.关闭生产者producer
producer.shutdown();
}
}
- 消息消费和普通一样消费即可
5.过滤消息
消息消费的时候可以对消息进行过滤来,消费特定的消息。

在订阅时,
subExpression就时过滤条件,一般可以通过tags来过滤,MessageSelector.byTagsconsumer.subscribe("batch","tagA || tagB || tagC"); //或者 consumer.subscribe("batch",MessageSelector.byTags("tagA || tagB || tagC"));通过
sql语法来过滤特定信息,MessageSelector.bySql- 注意:报错:The broker does not support consumer to filter message by SQL92 时,请修改
broker.conf配置文件为enablePropertyFilter=true
consumer.subscribe("baseMQ", MessageSelector.bySql("i is not null and i < 3 and name = 'tangseng'"));- 注意:报错:The broker does not support consumer to filter message by SQL92 时,请修改
生产者发送消息时 在消息中设置过滤参数
for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 Message msg = new Message("baseMQ",("我是BaseMQ消息"+i).getBytes()); msg.putUserProperty("i",i+""); //设置过滤参数 if(i < 5){ msg.putUserProperty("name","tangseng"); } //5.发送消息 SendResult result = producer.send(msg); System.out.println(result); }
6.事务消息
- 事务消息流程

- 生产者模拟发送 三条消息
package com.example.rocketmq.producer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TranProducer {
public static void main(String[] args) throws Exception {
//1.创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("Transaction");
//2.设置NameServer
producer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.启动生产者
producer.start();
//设置回调监听者
producer.setTransactionListener(new TransactionListener() {
/**
* 本地事务处理
* @param message 发送的消息体
* @param o 发送消息传入的参数
* @return 消息事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//模拟 事务三种状态 Tag A的消息 commit ,Tag B的消息 rollback ,Tag C的消息unknow
if (message.getTags().equals("A")) {
return LocalTransactionState.COMMIT_MESSAGE;
}
if (message.getTags().equals("B")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
/**
* 不明事务状态的回调 查询事务状态
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息事务状态不明,主动检测事务状态");
System.out.println(messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
String[] tags = {"A", "B", "C"};
for (int i = 0; i < 3; i++) {
//4.创建消息 并指定 topic tag
Message message = new Message("Tran", tags[i], ("我是事务消息 " + i).getBytes());
//5.发送消息
producer.sendMessageInTransaction(message, null);
Thread.sleep(4000L);
}
}
}
- 生产者发送结果

- 消费者消费
package com.example.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("baseGroup");
//2.指定NameServer地址
consumer.setNamesrvAddr("192.168.2.107:9876;192.168.2.105:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("Tran", "*");
//4.设置回调函数,处理信息
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt messageExt : list) {
System.out.println(new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//5.启动消费者consumer
consumer.start();
System.out.println("启动成功!");
}
}
- 消费者消费结果

因为【我是事务消息 1】是Tag B,在生产者事务监听器中模拟 tagB 为事务回滚,所以 tag B的消息 由于rollback而被删除。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 tangseng233的日常!
评论
