Java整合RabbitMQ实现生产消费(7种通讯方式)
【摘要】 环境说明RabbitMQ环境,参考RabbitMQ环境搭建Java版本:JDK1.8Maven版本:apache-maven-3.6.3开发工具:IntelliJ IDEA 工程搭建创建maven项目pom.xml文件引入RabbitMQ依赖 <dependencies> <dependency> <groupId>com.rabbitmq</gr...
环境说明
- RabbitMQ环境,参考RabbitMQ环境搭建
- Java版本:JDK1.8
- Maven版本:apache-maven-3.6.3
- 开发工具:IntelliJ IDEA
工程搭建
- 创建maven项目
- pom.xml文件引入RabbitMQ依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>compile</scope>
</dependency>
</dependencies>
连接RabbitMQ
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConnections {
public static final String RABBITMQ_HOST = "127.0.0.1";
public static final int RABBITMQ_PORT = 5672;
public static final String RABBITMQ_USERNAME = "guest";
public static final String RABBITMQ_PASSWORD = "guest";
public static final String RABBITMQ_VIRTUAL_HOST = "/";
/**
* 构建RabbitMQ连接对象
*
* @return
*/
public static Connection getConnection() throws IOException, TimeoutException {
//1.创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置Rabbitmq连接信息
factory.setHost(RABBITMQ_HOST);
factory.setPort(RABBITMQ_PORT);
factory.setUsername(RABBITMQ_USERNAME);
factory.setPassword(RABBITMQ_PASSWORD);
factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
//3.返回连接对象
return factory.newConnection();
}
}
通讯模式
1.简单通讯
即一个生产者可以向一个队列发送消息,一个消费者可以尝试从一个队列接收数据。如下图:
public final static String HELLO_QUEUE_NAME = "hello";
@Test
public void publish_hello() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null);
//4.发布消息
String msg = "hello,world";
channel.basicPublish("", HELLO_QUEUE_NAME, null, msg.getBytes());
}
@Test
public void consume_hello() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received '" + message + "'");
};
channel.basicConsume(HELLO_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
2.工作队列通讯
与简单通讯一样,当消费能力不足或想要提高吞吐时可添加多个消费者进行处理业务。如下图,队列中的消息会逐条被C1和C2消费。
public final static String WORK_QUEUE_NAME = "work";
@Test
public void publish_work_queue() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//4.发布消息
String msg = "hello,work queue";
channel.basicPublish("", WORK_QUEUE_NAME, null, msg.getBytes());
}
@Test
public void consume_work_queue1() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume1 Received '" + message + "'");
};
channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
@Test
public void consume_work_queue2() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume2 Received '" + message + "'");
};
channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
3.发布/订阅通讯
工作队列背后的假设是,每个任务只交付给一个消费者做同一件事。如果要交付给多个消费者做不同的事,需要引入交换机实现一个完整的消息传递模型,这种模式被称为“发布/订阅”。如下图,消息会发布到交换机中,交换机向绑定的队列同时发送消息,最终C1和C2会同时消费此条消息。
public final static String PUB_EXCHANGE_NAME = "pub-ex";
public final static String PUB1_QUEUE_NAME = "pub-que1";
public final static String PUB2_QUEUE_NAME = "pub-que2";
@Test
public void publish_pub_sub() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建交换机
channel.exchangeDeclare(PUB_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//4.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null);
channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null);
//5.绑定队列
channel.queueBind(PUB1_QUEUE_NAME, PUB_EXCHANGE_NAME, "");
channel.queueBind(PUB2_QUEUE_NAME, PUB_EXCHANGE_NAME, "");
//6.发布消息
String msg = "hello,pub/sub";
channel.basicPublish(PUB_EXCHANGE_NAME, "", null, msg.getBytes());
}
@Test
public void consume_pub_sub1() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("pub_sub1 Received '" + message + "'");
};
channel.basicConsume(PUB1_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
@Test
public void consume_pub_sub2() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("pub_sub2 Received '" + message + "'");
};
channel.basicConsume(PUB2_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
4.路由通讯
发布/订阅模式是交换机将一条消息同时路由给多个队列,“路由”模式可以将消息通过交换机指定到某个队列中从而被消费。如下图,交换机将所有类型的日志路由到一个队列中,将error类型的日志路由到另一个队列中。
public final static String ROUT_EXCHANGE_NAME = "rout-ex";
public final static String ROUTALL_QUEUE_NAME = "rout-queall";
public final static String ROUTONE_QUEUE_NAME = "rout-queone";
@Test
public void publish_routing() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建交换机
channel.exchangeDeclare(ROUT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//4.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null);
channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null);
//5.绑定队列
channel.queueBind(ROUTALL_QUEUE_NAME, ROUT_EXCHANGE_NAME, "all");
channel.queueBind(ROUTONE_QUEUE_NAME, ROUT_EXCHANGE_NAME, "one");
//6.发布消息
String msg1 = "hello,1-all";
String msg2 = "hello,2-all";
String msg3 = "hello,1-one";
channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg1.getBytes());
channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg2.getBytes());
channel.basicPublish(ROUT_EXCHANGE_NAME, "one", null, msg3.getBytes());
}
@Test
public void consume_routing_all() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume_routing_all Received '" + message + "'");
};
channel.basicConsume(ROUTALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
@Test
public void consume_routing_one() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume_routing_one Received '" + message + "'");
};
channel.basicConsume(ROUTONE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
5.主题通讯
“路由”模式仍然有局限性——它不能基于多个标准进行路由。主题可以带来很大的灵活性,发送到主题交换的消息不能有任意的routing_key,它必须是一个用点分隔的单词列表,routing_key有两种重要的特殊情况:
- *只能代替一个词。
- #可以替换零个或多个单词。
public final static String TOPIC_EXCHANGE_NAME = "topic-ex";
public final static String TOPICALL_QUEUE_NAME = "topic-queall";
public final static String TOPICONE_QUEUE_NAME = "topic-queone";
@Test
public void publish_topic() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建交换机
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//4.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null);
channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null);
//5.绑定队列
channel.queueBind(TOPICALL_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "*.all.*");
channel.queueBind(TOPICONE_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "#.one");
//6.发布消息
String msg1 = "hello.all.world";
String msg2 = "hello.world.one";
channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.all.world", null, msg1.getBytes());
channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.world.one", null, msg2.getBytes());
}
@Test
public void consume_topic_all() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume_topic_all Received '" + message + "'");
};
channel.basicConsume(TOPICALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
@Test
public void consume_topic_one() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume_topic_one Received '" + message + "'");
};
channel.basicConsume(TOPICONE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
6.RPC通讯
RabbitMQ作为消息中间件可以达到应用解耦效果,如果想达到RPC远程调用同步返回结果,RabbitMQ同样支持,其原理如下:
- 发布者发送消息时指定一个回调队列和唯一id
- 消费者处理完成后将结果发送到回调队列中
- 发布者按照唯一id接收消息并处理
如下图
public final static String RPC_QUEUE_NAME = "rpc-que";
public final static String RPCCALLBACK_QUEUE_NAME = "rpc-callback-que";
@Test
public void publish_rpc() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
// replyTo回调队列
channel.queueDeclare(RPCCALLBACK_QUEUE_NAME, false, false, false, null);
//4.发布消息
String msg = "hello rpc";
String correlationId = UUID.randomUUID().toString();
/*AMQP 协议预先定义了一组与消息一起使用的14个属性。除了以下属性外,大多数属性很少使用:
deliveryMode:将消息标记为持久(值为2)或瞬时(任何其他值)。
contentType:用于描述编码的mime类型。例如,对于常用的JSON编码,最好将此属性设置为:application/JSON。
replyTo:通常用于命名回调队列。
correlationId:用于将RPC响应与请求关联。*/
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(RPCCALLBACK_QUEUE_NAME).correlationId(correlationId).build();
//5.回调响应结果
channel.basicPublish("", RPC_QUEUE_NAME, basicProperties, msg.getBytes());
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String recid = delivery.getProperties().getCorrelationId();
if (correlationId.equalsIgnoreCase(recid)) System.out.println("rpc-callback-que '" + message + "'");
};
channel.basicConsume(RPCCALLBACK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
@Test
public void consume_rpc() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume_rpc Received '" + message + "'");
String correlationId = delivery.getProperties().getCorrelationId();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
String replyTo = delivery.getProperties().getReplyTo();
String callbackmsg = "rpc callback";
channel.basicPublish("", replyTo, basicProperties, callbackmsg.getBytes());
};
channel.basicConsume(RPC_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
System.in.read();
}
7.Publisher确认通讯
Publisher确认是RabbitMQ扩展以实现可靠发布。当在通道上启用发布者确认时,代理将异步确认客户端发布的消息,这意味着它们已在服务器端得到处理。
public final static String CONFIRM_EXCHANGE_NAME = "confirm-ex";
public final static String CONFIRM_QUEUE_NAME = "confirm-que";
@Test
public void publish_confirm() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.开启确认选项
channel.confirmSelect();
//4.构建交换机
channel.exchangeDeclare(CONFIRM_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//5.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null);
//6.绑定队列
String right_routing_key = "confirm";
String error_routing_key = "confirm_err";
channel.queueBind(CONFIRM_QUEUE_NAME, CONFIRM_EXCHANGE_NAME, right_routing_key);
//7.消息到达交换机确认监听
channel.addConfirmListener((sequenceNumber, multiple) -> {
System.out.println("消息成功发送到交换机");
}, (sequenceNumber, multiple) -> {
System.err.println("消息未发送到交换机,补偿操作。");
});
//8.消息到达队列确认监听
channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> {
System.err.format("消息 %s 未路由到指定队列: %s, replyText: %s,replyCode: %d%n", body, routingKey, replyText, replyCode);
});
//设置消息持久化
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
//7.发布消息
String msg = "hello confirm";
channel.basicPublish(CONFIRM_EXCHANGE_NAME, error_routing_key,true, basicProperties, msg.getBytes());
System.in.read();
}
@Test
public void consume_ack() throws IOException, TimeoutException {
//1.获取连接对象
Connection connection = MQConnections.getConnection();
//2.构建Channl
Channel channel = connection.createChannel();
//3.构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null);
//4.监听消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("consume_routing_one Received '" + message + "'");
//消息处理后手动ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// ack为false
channel.basicConsume(CONFIRM_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
System.in.read();
}
代码仓库
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)