spring boot 集成 rabbitmq 、 redis 、 mqtt(mosquitto)、activemq
【摘要】 spring boot 集成rabbitmq、 redis 、 和 mqtt(mosquitto)
一、添加依赖,在 pom.xml 中编写依赖
<!--添加 r...
spring boot 集成rabbitmq、 redis 、 和 mqtt(mosquitto)
一、添加依赖,在 pom.xml 中编写依赖
-
-
<!--添加 rabbitmq 的依赖-->
-
<dependency>
-
<groupId>com.rabbitmq</groupId>
-
<artifactId>http-client</artifactId>
-
<version>${rabbitmq.http-client}</version>
-
</dependency>
-
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
-
-
<!-- 引入redis依赖 -->
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-data-redis</artifactId>
-
</dependency>
-
<!-- 缓存的依赖 -->
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-cache</artifactId>
-
</dependency>
-
<!--mqtt依赖-->
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-integration</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.integration</groupId>
-
<artifactId>spring-integration-stream</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.integration</groupId>
-
<artifactId>spring-integration-mqtt</artifactId>
-
</dependency>
-
-
<!--添加activemq的依赖-->
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-activemq</artifactId>
-
</dependency>
-
-
<dependency>
-
<groupId>org.apache.activemq</groupId>
-
<artifactId>activemq-pool</artifactId>
-
<version>5.15.8</version>
-
</dependency>
二、配置自动配置类
1.编写 RabbitConfig 配置类
-
package com.devframe.common.config;
-
-
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpManagementOperations;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitManagementTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-
-
/**
-
* @ClassName:
-
* @Description:
-
* @author DuanZhaoXu
-
* @data 2019年1月5日上午10:52:32
-
*/
-
@Configuration
-
public class RabbitConfig {
-
-
@Bean
-
@ConfigurationProperties(prefix = "spring.rabbitmq")
-
public ConnectionFactory connectionFactory() {
-
return new CachingConnectionFactory();
-
}
-
-
@Bean
-
public AmqpAdmin AmqpAdmin() {
-
return new RabbitAdmin(connectionFactory());
-
}
-
-
@Bean
-
public RabbitTemplate rabbitTemplate() {
-
RabbitTemplate template = new RabbitTemplate(connectionFactory());
-
template.setMessageConverter(new Jackson2JsonMessageConverter());
-
return template;
-
}
-
-
@Bean
-
public AmqpManagementOperations amqpManagementOperations() {
-
AmqpManagementOperations amqpManagementOperations = new RabbitManagementTemplate(
-
"http://192.168.19.200:15672", "admin", "admin@123");
-
return amqpManagementOperations;
-
}
-
-
-
@Bean
-
public Queue mqttQueue() {
-
return new Queue("mqttQueue", true, false, false);
-
}
-
}
2.编写 RedisConfig配置类
-
package com.devframe.common.config;
-
-
import java.io.Serializable;
-
-
import org.springframework.context.annotation.Bean;
-
import org.springframework.context.annotation.Configuration;
-
import org.springframework.data.redis.connection.RedisConnectionFactory;
-
import org.springframework.data.redis.core.RedisTemplate;
-
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
-
import org.springframework.data.redis.serializer.StringRedisSerializer;
-
-
@Configuration
-
public class RedisConfig {
-
-
@Bean
-
public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
-
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
-
redisTemplate.setKeySerializer(new StringRedisSerializer());
-
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
-
redisTemplate.setConnectionFactory(redisConnectionFactory);
-
return redisTemplate;
-
}
-
}
3.编写 MqttSenderConfig 配置类
https://docs.spring.io/spring-integration/docs/5.1.1.RELEASE/reference/html/mqtt.html#mqtt-inbound
-
package com.devframe.common.config;
-
-
import java.util.Arrays;
-
import java.util.List;
-
-
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-
import org.springframework.beans.factory.annotation.Value;
-
import org.springframework.context.annotation.Bean;
-
import org.springframework.context.annotation.Configuration;
-
import org.springframework.integration.annotation.IntegrationComponentScan;
-
import org.springframework.integration.annotation.ServiceActivator;
-
import org.springframework.integration.channel.DirectChannel;
-
import org.springframework.integration.core.MessageProducer;
-
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
-
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
-
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
-
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
-
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
-
import org.springframework.messaging.Message;
-
import org.springframework.messaging.MessageChannel;
-
import org.springframework.messaging.MessageHandler;
-
import org.springframework.messaging.MessagingException;
-
-
-
@Configuration
-
@IntegrationComponentScan
-
public class MqttSenderConfig {
-
-
-
@Value("${spring.mqtt.username}")
-
private String username;
-
-
-
-
@Value("${spring.mqtt.password}")
-
private String password;
-
-
-
-
@Value("${spring.mqtt.url}")
-
private String hostUrl;
-
-
-
-
//@Value("${spring.mqtt.client.id}")
-
private String clientId = String.valueOf(System.currentTimeMillis());
-
-
-
-
@Value("${spring.mqtt.default.topic}")
-
private String defaultTopic;
-
-
@Value("#{'${spring.mqtt.topics}'.split(',')}")
-
private List<String> topics ;
-
-
@Value("#{'${spring.mqtt.qosValues}'.split(',')}")
-
private List<Integer> qosValues;
-
-
-
@Bean
-
public MqttConnectOptions getMqttConnectOptions(){
-
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
-
mqttConnectOptions.setUserName(username);
-
mqttConnectOptions.setCleanSession(true);
-
mqttConnectOptions.setPassword(password.toCharArray());
-
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
-
mqttConnectOptions.setKeepAliveInterval(2);
-
// 设置超时时间 单位为秒
-
mqttConnectOptions.setConnectionTimeout(10);
-
mqttConnectOptions.setMaxInflight(100000000);
-
-
return mqttConnectOptions;
-
-
}
-
-
@Bean
-
public MqttPahoClientFactory mqttClientFactory() {
-
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
-
factory.setConnectionOptions(getMqttConnectOptions());
-
return factory;
-
}
-
-
@Bean
-
@ServiceActivator(inputChannel = "mqttOutboundChannel")
-
public MessageHandler mqttOutbound() {
-
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
-
messageHandler.setAsync(true);
-
messageHandler.setDefaultTopic(defaultTopic);
-
messageHandler.setDefaultRetained(false);
-
return messageHandler;
-
}
-
-
@Bean
-
public MessageChannel mqttOutboundChannel() {
-
return new DirectChannel();
-
}
-
//接收通道
-
@Bean
-
public MessageChannel mqttInputChannel() {
-
return new DirectChannel();
-
}
-
-
-
//配置client,监听的topic
-
@Bean
-
public MessageProducer inbound() {
-
String[] strings = new String[topics.size()];
-
Integer[] ints = new Integer[qosValues.size()];
-
topics.toArray(strings);
-
qosValues.toArray(ints);
-
-
int[] its= Arrays.stream(ints).mapToInt(Integer::valueOf).toArray();
-
MqttPahoMessageDrivenChannelAdapter adapter =
-
new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),strings);
-
-
adapter.setCompletionTimeout(3000);
-
adapter.setConverter(new DefaultPahoMessageConverter());
-
adapter.setOutputChannel(mqttInputChannel());
-
return adapter;
-
}
-
-
//通过通道获取数据
-
@Bean
-
@ServiceActivator(inputChannel = "mqttInputChannel")
-
public MessageHandler handler() {
-
return new MessageHandler() {
-
@Override
-
public void handleMessage(Message<?> message) throws MessagingException {
-
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
-
String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
-
// System.out.println(topic+"|"+message.getPayload().toString());
-
}
-
};
-
}
-
-
}
4.编写 MqttGateway 发送消息的接口
-
package com.devframe.common.config;
-
-
import org.springframework.integration.annotation.MessagingGateway;
-
import org.springframework.integration.mqtt.support.MqttHeaders;
-
import org.springframework.messaging.handler.annotation.Header;
-
import org.springframework.stereotype.Component;
-
@Component
-
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
-
public interface MqttGateway {
-
-
void sendToMqtt(String data);
-
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
-
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
-
}
在其他需要用到 mqtt 发送消息的地方,直接 @Autowired 该接口即可进行消息的发送
5、编写 activemq 的 配置类
-
package com.devframe.common.config;
-
-
import javax.jms.Queue;
-
import javax.jms.Topic;
-
-
import org.apache.activemq.ActiveMQConnectionFactory;
-
import org.apache.activemq.command.ActiveMQQueue;
-
import org.apache.activemq.command.ActiveMQTopic;
-
import org.apache.activemq.pool.PooledConnectionFactory;
-
import org.springframework.beans.factory.annotation.Value;
-
import org.springframework.context.annotation.Bean;
-
import org.springframework.context.annotation.Configuration;
-
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
-
import org.springframework.jms.config.JmsListenerContainerFactory;
-
-
@Configuration
-
public class ActivemqConfig {
-
-
// #配置 农机上报的消息转发到的topic名称
-
// queueName: mqttQueue
-
// topicName: mqttTopic
-
//@Value("${spring.queueName}")
-
public static final String queueName ="mqttQueue";
-
-
//@Value("${spring.topicName}")
-
public static final String topicName ="mqttTopic";
-
-
@Value("${spring.activemq.user}")
-
private String usrName;
-
-
@Value("${spring.activemq.password}")
-
private String password;
-
-
@Value("${spring.activemq.broker-url}")
-
private String brokerUrl;
-
-
-
@Bean
-
public Queue queue(){
-
return new ActiveMQQueue(queueName);
-
}
-
-
@Bean
-
public Topic topic(){
-
return new ActiveMQTopic(topicName);
-
}
-
-
//配置activemq连接工厂
-
// @Bean
-
// public ActiveMQConnectionFactory activeMQConnectionFactory() {
-
// return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
-
// }
-
-
//配置连接池工厂(在高并发的情况下需要使用池连接工厂,不然当向activemq发送过多的消息时候会报错)
-
@Bean
-
public PooledConnectionFactory pooledConnectionFactory() {
-
return new PooledConnectionFactory(new ActiveMQConnectionFactory(usrName, password, brokerUrl));
-
}
-
-
-
//配置JmsListenerContainerFactory
-
@Bean
-
public JmsListenerContainerFactory<?> jmsListenerContainerQueue( ){
-
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
-
bean.setConnectionFactory(pooledConnectionFactory());
-
return bean;
-
}
-
-
//配置发布订阅模式的JmsListenerContainerFactory,用于在消费者方指定
-
@Bean
-
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(){
-
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
-
//设置为发布订阅方式, 默认情况下使用的生产消费者方式
-
bean.setPubSubDomain(true);
-
bean.setConnectionFactory(pooledConnectionFactory());
-
return bean;
-
}
-
}
三、AmqpTemplate、RedisTemplate、MqttGateway、JmsMessagingTemplate 的使用
1.AmqpTemplate 的使用
-
package com.devframe.controller;
-
-
import java.io.UnsupportedEncodingException;
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.Map.Entry;
-
import java.util.UUID;
-
-
import org.springframework.amqp.core.AmqpAdmin;
-
import org.springframework.amqp.core.AmqpManagementOperations;
-
import org.springframework.amqp.core.AmqpTemplate;
-
import org.springframework.amqp.core.Binding;
-
import org.springframework.amqp.core.BindingBuilder;
-
import org.springframework.amqp.core.Exchange;
-
import org.springframework.amqp.core.ExchangeBuilder;
-
import org.springframework.amqp.core.Message;
-
import org.springframework.amqp.core.MessageProperties;
-
import org.springframework.amqp.core.Queue;
-
import org.springframework.amqp.core.QueueBuilder;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.ui.Model;
-
import org.springframework.web.bind.annotation.RequestMapping;
-
import org.springframework.web.bind.annotation.RequestMethod;
-
import org.springframework.web.bind.annotation.RequestParam;
-
import org.springframework.web.bind.annotation.RestController;
-
-
import com.devframe.entity.OperatEntity;
-
-
import io.swagger.annotations.Api;
-
import io.swagger.annotations.ApiOperation;
-
-
@RestController
-
@Api(tags = "RabbitServerController rabbitmq测试controller")
-
public class RabbitServerController {
-
-
@Autowired
-
private AmqpTemplate amqpTemplate;
-
-
@Autowired
-
private AmqpManagementOperations amqpManagementOperations;
-
-
@Autowired
-
private AmqpAdmin amqpAdmin;
-
-
@RequestMapping(value = "/sendMsg", method = RequestMethod.POST)
-
public String sendAmqbMsg(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
-
if (model != null && !"".equals(msg)) {
-
amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", msg);
-
} else {
-
amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", "hello world");
-
}
-
amqpManagementOperations.getQueues().forEach(x -> {
-
System.out.println(x.getName());
-
});
-
;
-
-
return "success";
-
}
-
-
@RequestMapping(value = "/sendMsg2", method = RequestMethod.POST)
-
public String sendAmqbMsg2(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
-
if (model != null && !"".equals(msg)) {
-
amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙!!!");
-
} else {
-
amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙");
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/sendMsg3", method = RequestMethod.POST)
-
public String sendAmqbMsg3(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
-
if (model != null && !"".equals(msg)) {
-
amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界!!!");
-
} else {
-
amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界");
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/helloWorld", method = RequestMethod.POST)
-
@ApiOperation("1对1 or 1对多 无交换机模式")
-
public String helloWorld(Model model, @RequestParam(value = "msg", defaultValue = "1对1,无交换机模式!") String msg) {
-
if (model != null && !"".equals(msg)) {
-
amqpTemplate.convertAndSend("mqttQueue", msg);
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/pubSub", method = RequestMethod.POST)
-
@ApiOperation("广播发布/订阅模式")
-
public String pubSub(Model model, @RequestParam(value = "msg", defaultValue = "广播发布/订阅模式") String msg) {
-
if (model != null && !"".equals(msg)) {
-
// 广播模式对于路由无效,所有的消费者都可以获取都消息
-
amqpTemplate.convertAndSend("pubSubExchange", "", msg);
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/routing", method = RequestMethod.POST)
-
@ApiOperation("路由消息模式")
-
public String routing(Model model, @RequestParam(value = "msg", defaultValue = "路由消息模式") String msg) {
-
if (model != null && !"".equals(msg)) {
-
String[] infoTyp = new String[] { "info", "warn", "error" };
-
for (String routing : infoTyp) {
-
amqpTemplate.convertAndSend("routingExchange", routing, msg);
-
}
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/topicMatch", method = RequestMethod.POST)
-
@ApiOperation("主题模式")
-
public String topicModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) {
-
if (model != null && !"".equals(msg)) {
-
String[] infoTyp = new String[] { "P.123.asdasd", "P.456.JQBE", "P.789.WBD", "P.ASBDJBAS" };
-
for (String routing : infoTyp) {
-
amqpTemplate.convertAndSend("topicExchange", routing, msg);
-
}
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/headerModal", method = RequestMethod.POST)
-
@ApiOperation("header模式")
-
public String headerModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg)
-
throws UnsupportedEncodingException {
-
if (model != null && !"".equals(msg)) {
-
Map<String, Object> map = new HashMap<String, Object>();
-
map.put("abc", "nb");
-
map.put("def", "pl");
-
map.put("jabs", "aksd");
-
for (Entry<String, Object> entry : map.entrySet()) {
-
MessageProperties messageProperties = new MessageProperties();
-
messageProperties.setHeader(entry.getKey(), entry.getValue());
-
Message message = new Message(msg.getBytes("utf-8"), messageProperties);
-
amqpTemplate.convertAndSend("headerExchange", null, message);
-
}
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/createTaskQueue", method = RequestMethod.POST)
-
@ApiOperation("自动创建队列并发送消息到队列")
-
public String createTaskQueue(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg,
-
@RequestParam(name = "queueName", defaultValue = "ff008") String queueName)
-
throws UnsupportedEncodingException {
-
if (model != null && !"".equals(msg)) {
-
amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
-
OperatEntity operatEntity = new OperatEntity();
-
operatEntity.setId("123456");
-
operatEntity.setIndepeid("askdnad");
-
operatEntity.setRecordid(msg);
-
operatEntity.setTablename(msg);
-
amqpTemplate.convertAndSend(queueName, operatEntity);
-
// Object object = amqpTemplate.receiveAndConvert(queueName);
-
// if(object instanceof OperatEntity){
-
// OperatEntity operatEntity2 = (OperatEntity)object;
-
// System.out.println("从队列"+queueName+"收到了一个【"+operatEntity2.toString()+"】");
-
// System.out.println(operatEntity.getId());
-
// System.out.println(operatEntity.getIndepeid());
-
// System.out.println(operatEntity.getRecordid());
-
// System.out.println(operatEntity.getTablename());
-
// }else{
-
// String result = (String)object;
-
// System.out.println("从队列"+queueName+"收到了一个【"+result+"】");
-
// }
-
}
-
return "success";
-
}
-
-
/**
-
* 删除以 任务名称为前缀的队列
-
*
-
* @param queueNamePre
-
* @return String
-
*/
-
@RequestMapping(value = "/deleteQueue", method = RequestMethod.POST)
-
@ApiOperation("删除以 任务名称为前缀的队列")
-
public String deleteQueueWithPre(String queueNamePre) {
-
List<Queue> queues = amqpManagementOperations.getQueues();
-
for (Queue queue : queues) {
-
if (queue.getName().startsWith(queueNamePre)) {
-
amqpManagementOperations.deleteQueue(queue);
-
}
-
}
-
return "success";
-
}
-
-
@RequestMapping(value = "/createExchangeBindTaskQueue", method = RequestMethod.POST)
-
@ApiOperation("自动创建交换机并绑定队列")
-
public String createExchangeBindTaskQueue(Model model,
-
@RequestParam(value = "msg", defaultValue = "主题模式") String msg,
-
@RequestParam(name = "exchangeName", defaultValue = "ff008") String exchangeName)
-
throws UnsupportedEncodingException {
-
if (model != null && !"".equals(msg)) {
-
// 查询交换机是否存在
-
// Exchange exchange =
-
// amqpManagementOperations.getExchange(exchangeName);
-
// if(exchange==null){ //如果不存在 ,则声明该交换机
-
String randomNum = UUID.randomUUID().toString().substring(0, 8);
-
Exchange exchange = ExchangeBuilder.directExchange(exchangeName).durable(true).build();
-
Queue queue = QueueBuilder.durable(exchangeName + "-" + randomNum).build();
-
amqpAdmin.declareExchange(exchange);
-
amqpAdmin.declareQueue(queue);
-
// }
-
// 否则直接 将 该队列绑定到 交换机上面,routingkey 为 生成的8位随机数
-
amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(randomNum).noargs());
-
amqpTemplate.convertAndSend(exchangeName, randomNum, msg);
-
String result = (String) amqpTemplate.receiveAndConvert(queue.getName());
-
System.out.println(result);
-
}
-
return "success";
-
}
-
-
/**
-
* 删除以 任务名称的交换机
-
*
-
* @param queueNamePre
-
* @return String
-
*/
-
@RequestMapping(value = "/deleteExchange", method = RequestMethod.POST)
-
@ApiOperation("删除以 任务名称的交换机")
-
public String deleteExchange(String exchangeName) {
-
// Map<String, Object> map =
-
// amqpManagementOperations.getExchange(exchangeName).getArguments();
-
List<Binding> bindings = amqpManagementOperations.getBindingsForExchange("/", exchangeName);
-
for (Binding binding : bindings) {
-
String routingkey = binding.getRoutingKey();
-
amqpAdmin.deleteQueue(exchangeName + "-" + routingkey);
-
}
-
amqpAdmin.deleteExchange(exchangeName);
-
return "success";
-
}
-
// rpc调用未实现
-
}
2. RedisTemplate 的使用
-
package com.devframe.common.util;
-
-
import java.util.List;
-
import java.util.Set;
-
-
import org.apache.commons.lang3.StringUtils;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.data.redis.core.StringRedisTemplate;
-
import org.springframework.stereotype.Component;
-
-
@Component
-
public class RedisTemplateUtils {
-
-
@Autowired
-
private StringRedisTemplate stringRedisTemplate;
-
-
// @Autowired
-
// private RedisTemplate<String, Serializable> redisTemplate;
-
-
public static final int LOCK_TIMEOUT = 4;
-
-
-
// ---------------value 为String 的 操作
-
/**
-
* set 字符串的 key,value
-
*
-
* @param key
-
* @param value
-
*/
-
public void set(String key, String value) {
-
stringRedisTemplate.opsForValue().set(key, value);
-
}
-
-
/**
-
* 根据key获取字符串的 value
-
*
-
* @param key
-
*/
-
public String get(String key) {
-
return stringRedisTemplate.opsForValue().get(key);
-
}
-
-
/**
-
* 根据 key 删除
-
*
-
* @param key
-
*/
-
public void del(String key) {
-
stringRedisTemplate.opsForValue().getOperations().delete(key);
-
}
-
-
-
// ---------------value 为 List 的操作
-
-
/**
-
* push 元素到 list中
-
*/
-
public void lpush(String key,String value) {
-
stringRedisTemplate.opsForList().leftPush(key, value);
-
}
-
-
/**
-
* 获取 list中 某个下标的元素
-
* @param key
-
* @param index
-
*/
-
public String lindex(String key,long index) {
-
return stringRedisTemplate.opsForList().index(key, index);
-
}
-
-
-
/**
-
* 根据key 获取 list的长度
-
* @param key
-
* @return
-
*/
-
public long llen(String key) {
-
return stringRedisTemplate.opsForList().size(key);
-
}
-
-
-
/**
-
* 根据 key,start,end 获取某一个区间的 list数据集
-
* @param key
-
* @param start
-
* @param end
-
* @return
-
*/
-
public List<String> lrange(String key, long start, long end){
-
return stringRedisTemplate.opsForList().range(key, start, end);
-
}
-
-
-
/**
-
* 根据 key值 pattern查询所有匹配的值,比如login*
-
* @param key
-
* @return
-
*/
-
public Set<String> keys(String key){
-
return stringRedisTemplate.keys(key);
-
}
-
-
-
-
-
/**
-
* 加锁
-
*
-
* @param key productId - 商品的唯一标志
-
* @param value 当前时间+超时时间 也就是时间戳
-
* @return
-
*/
-
public boolean lock(String key,String value) {
-
if (stringRedisTemplate.opsForValue().setIfAbsent(key, value)) {// 对应setnx命令
-
// 可以成功设置,也就是key不存在
-
return true;
-
}
-
-
// 判断锁超时 - 防止原来的操作异常,没有运行解锁操作 防止死锁
-
String currentValue = stringRedisTemplate.opsForValue().get(key);
-
// 如果锁过期
-
if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {// currentValue不为空且小于当前时间
-
// 获取上一个锁的时间value
-
String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);// 对应getset,如果key存在
-
-
// 假设两个线程同时进来这里,因为key被占用了,而且锁过期了。获取的值currentValue=A(get取的旧的值肯定是一样的),两个线程的value都是B,key都是K.锁时间已经过期了。
-
// 而这里面的getAndSet一次只会一个执行,也就是一个执行之后,上一个的value已经变成了B。只有一个线程获取的上一个值会是A,另一个线程拿到的值是B。
-
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
-
// oldValue不为空且oldValue等于currentValue,也就是校验是不是上个对应的商品时间戳,也是防止并发
-
return true;
-
}
-
}
-
return false;
-
}
-
-
/**
-
* 解锁
-
*
-
* @param key
-
* @param value
-
*/
-
public void unlock(String key,String value) {
-
try {
-
String currentValue = stringRedisTemplate.opsForValue().get(key);
-
if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
-
stringRedisTemplate.opsForValue().getOperations().delete(key);// 删除key
-
}
-
-
} catch (Exception e) {
-
Log.error(e);
-
}
-
}
-
-
}
3.MqttGateway 的使用
-
@Autowired
-
private MqttGateway mqttGateway;
-
-
@RequestMapping("/sendMsg")
-
public String sendMsg(String sendData,String topic){
-
try {
-
mqttGateway.sendToMqtt(sendData,topic);
-
mqttGateway.sendToMqtt(topic, 0, sendData);
-
return "发送成功";
-
}catch (Exception e) {
-
// TODO: handle exception
-
Log.error(e);
-
return "发送失败";
-
}
-
}
4、jmsMessagingTemplate 的使用
-
@@Autowired
-
private JmsMessagingTemplate jmsMessagingTemplate;
-
jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(ActivemqConfig.topicName), payload);
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/86233738
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)