spring boot 集成 rabbitmq 、 redis 、 mqtt(mosquitto)、activemq

举报
小米粒-biubiubiu 发表于 2020/12/03 00:31:06 2020/12/03
【摘要】                      spring boot  集成rabbitmq、  redis 、 和  mqtt(mosquitto) 一、 添加依赖,编写 application.xml 依赖 <!--添加 r...

                     spring boot  集成rabbitmq、  redis 、 和  mqtt(mosquitto)

一、添加依赖:在 pom.xml 中加入以下依赖


  
  <!-- RabbitMQ: HTTP (management API) client; its version is taken from the
       rabbitmq.http-client property, which must be defined elsewhere in the POM (not shown here) -->
  <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>http-client</artifactId>
      <version>${rabbitmq.http-client}</version>
  </dependency>
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

  <!-- Redis -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>

  <!-- Spring cache abstraction -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-cache</artifactId>
  </dependency>

  <!-- MQTT via Spring Integration -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
  </dependency>

  <!-- ActiveMQ, plus the connection pool used by ActivemqConfig -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>
  <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.15.8</version>
  </dependency>

二、配置自动配置类

1.编写 RabbitConfig 配置类


  
  1. package com.devframe.common.config;
  2. import org.springframework.amqp.core.AmqpAdmin;
  3. import org.springframework.amqp.core.AmqpManagementOperations;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  8. import org.springframework.amqp.rabbit.core.RabbitManagementTemplate;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  11. import org.springframework.boot.context.properties.ConfigurationProperties;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.context.annotation.Configuration;
  14. /**
  15. * @ClassName:
  16. * @Description:
  17. * @author DuanZhaoXu
  18. * @data 2019年1月5日上午10:52:32
  19. */
  20. @Configuration
  21. public class RabbitConfig {
  22. @Bean
  23. @ConfigurationProperties(prefix = "spring.rabbitmq")
  24. public ConnectionFactory connectionFactory() {
  25. return new CachingConnectionFactory();
  26. }
  27. @Bean
  28. public AmqpAdmin AmqpAdmin() {
  29. return new RabbitAdmin(connectionFactory());
  30. }
  31. @Bean
  32. public RabbitTemplate rabbitTemplate() {
  33. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  34. template.setMessageConverter(new Jackson2JsonMessageConverter());
  35. return template;
  36. }
  37. @Bean
  38. public AmqpManagementOperations amqpManagementOperations() {
  39. AmqpManagementOperations amqpManagementOperations = new RabbitManagementTemplate(
  40. "http://192.168.19.200:15672", "admin", "admin@123");
  41. return amqpManagementOperations;
  42. }
  43. @Bean
  44. public Queue mqttQueue() {
  45. return new Queue("mqttQueue", true, false, false);
  46. }
  47. }

 2.编写 RedisConfig配置类


  
  1. package com.devframe.common.config;
  2. import java.io.Serializable;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.data.redis.connection.RedisConnectionFactory;
  6. import org.springframework.data.redis.core.RedisTemplate;
  7. import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
  8. import org.springframework.data.redis.serializer.StringRedisSerializer;
  9. @Configuration
  10. public class RedisConfig {
  11. @Bean
  12. public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  13. RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
  14. redisTemplate.setKeySerializer(new StringRedisSerializer());
  15. redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  16. redisTemplate.setConnectionFactory(redisConnectionFactory);
  17. return redisTemplate;
  18. }
  19. }

 3.编写 MqttSenderConfig 配置类 

https://docs.spring.io/spring-integration/docs/5.1.1.RELEASE/reference/html/mqtt.html#mqtt-inbound


  
  1. package com.devframe.common.config;
  2. import java.util.Arrays;
  3. import java.util.List;
  4. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.integration.annotation.IntegrationComponentScan;
  9. import org.springframework.integration.annotation.ServiceActivator;
  10. import org.springframework.integration.channel.DirectChannel;
  11. import org.springframework.integration.core.MessageProducer;
  12. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  13. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  14. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  15. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  16. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  17. import org.springframework.messaging.Message;
  18. import org.springframework.messaging.MessageChannel;
  19. import org.springframework.messaging.MessageHandler;
  20. import org.springframework.messaging.MessagingException;
  21. @Configuration
  22. @IntegrationComponentScan
  23. public class MqttSenderConfig {
  24. @Value("${spring.mqtt.username}")
  25. private String username;
  26. @Value("${spring.mqtt.password}")
  27. private String password;
  28. @Value("${spring.mqtt.url}")
  29. private String hostUrl;
  30. //@Value("${spring.mqtt.client.id}")
  31. private String clientId = String.valueOf(System.currentTimeMillis());
  32. @Value("${spring.mqtt.default.topic}")
  33. private String defaultTopic;
  34. @Value("#{'${spring.mqtt.topics}'.split(',')}")
  35. private List<String> topics ;
  36. @Value("#{'${spring.mqtt.qosValues}'.split(',')}")
  37. private List<Integer> qosValues;
  38. @Bean
  39. public MqttConnectOptions getMqttConnectOptions(){
  40. MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
  41. mqttConnectOptions.setUserName(username);
  42. mqttConnectOptions.setCleanSession(true);
  43. mqttConnectOptions.setPassword(password.toCharArray());
  44. mqttConnectOptions.setServerURIs(new String[]{hostUrl});
  45. mqttConnectOptions.setKeepAliveInterval(2);
  46. // 设置超时时间 单位为秒
  47. mqttConnectOptions.setConnectionTimeout(10);
  48. mqttConnectOptions.setMaxInflight(100000000);
  49. return mqttConnectOptions;
  50. }
  51. @Bean
  52. public MqttPahoClientFactory mqttClientFactory() {
  53. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  54. factory.setConnectionOptions(getMqttConnectOptions());
  55. return factory;
  56. }
  57. @Bean
  58. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  59. public MessageHandler mqttOutbound() {
  60. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
  61. messageHandler.setAsync(true);
  62. messageHandler.setDefaultTopic(defaultTopic);
  63. messageHandler.setDefaultRetained(false);
  64. return messageHandler;
  65. }
  66. @Bean
  67. public MessageChannel mqttOutboundChannel() {
  68. return new DirectChannel();
  69. }
  70. //接收通道
  71. @Bean
  72. public MessageChannel mqttInputChannel() {
  73. return new DirectChannel();
  74. }
  75. //配置client,监听的topic
  76. @Bean
  77. public MessageProducer inbound() {
  78. String[] strings = new String[topics.size()];
  79. Integer[] ints = new Integer[qosValues.size()];
  80. topics.toArray(strings);
  81. qosValues.toArray(ints);
  82. int[] its= Arrays.stream(ints).mapToInt(Integer::valueOf).toArray();
  83. MqttPahoMessageDrivenChannelAdapter adapter =
  84. new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),strings);
  85. adapter.setCompletionTimeout(3000);
  86. adapter.setConverter(new DefaultPahoMessageConverter());
  87. adapter.setOutputChannel(mqttInputChannel());
  88. return adapter;
  89. }
  90. //通过通道获取数据
  91. @Bean
  92. @ServiceActivator(inputChannel = "mqttInputChannel")
  93. public MessageHandler handler() {
  94. return new MessageHandler() {
  95. @Override
  96. public void handleMessage(Message<?> message) throws MessagingException {
  97. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
  98. String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
  99. // System.out.println(topic+"|"+message.getPayload().toString());
  100. }
  101. };
  102. }
  103. }

4.编写  MqttGateway 发送消息的接口


  
  1. package com.devframe.common.config;
  2. import org.springframework.integration.annotation.MessagingGateway;
  3. import org.springframework.integration.mqtt.support.MqttHeaders;
  4. import org.springframework.messaging.handler.annotation.Header;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  8. public interface MqttGateway {
  9. void sendToMqtt(String data);
  10. void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
  11. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
  12. }

在其他需要用 MQTT 发送消息的地方,直接 @Autowired 注入该接口即可进行消息的发送。

5、编写 activemq 的 配置类


  
  1. package com.devframe.common.config;
  2. import javax.jms.Queue;
  3. import javax.jms.Topic;
  4. import org.apache.activemq.ActiveMQConnectionFactory;
  5. import org.apache.activemq.command.ActiveMQQueue;
  6. import org.apache.activemq.command.ActiveMQTopic;
  7. import org.apache.activemq.pool.PooledConnectionFactory;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
  12. import org.springframework.jms.config.JmsListenerContainerFactory;
  13. @Configuration
  14. public class ActivemqConfig {
  15. // #配置 农机上报的消息转发到的topic名称
  16. // queueName: mqttQueue
  17. // topicName: mqttTopic
  18. //@Value("${spring.queueName}")
  19. public static final String queueName ="mqttQueue";
  20. //@Value("${spring.topicName}")
  21. public static final String topicName ="mqttTopic";
  22. @Value("${spring.activemq.user}")
  23. private String usrName;
  24. @Value("${spring.activemq.password}")
  25. private String password;
  26. @Value("${spring.activemq.broker-url}")
  27. private String brokerUrl;
  28. @Bean
  29. public Queue queue(){
  30. return new ActiveMQQueue(queueName);
  31. }
  32. @Bean
  33. public Topic topic(){
  34. return new ActiveMQTopic(topicName);
  35. }
  36. //配置activemq连接工厂
  37. // @Bean
  38. // public ActiveMQConnectionFactory activeMQConnectionFactory() {
  39. // return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
  40. // }
  41. //配置连接池工厂(在高并发的情况下需要使用池连接工厂,不然当向activemq发送过多的消息时候会报错)
  42. @Bean
  43. public PooledConnectionFactory pooledConnectionFactory() {
  44. return new PooledConnectionFactory(new ActiveMQConnectionFactory(usrName, password, brokerUrl));
  45. }
  46. //配置JmsListenerContainerFactory
  47. @Bean
  48. public JmsListenerContainerFactory<?> jmsListenerContainerQueue( ){
  49. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  50. bean.setConnectionFactory(pooledConnectionFactory());
  51. return bean;
  52. }
  53. //配置发布订阅模式的JmsListenerContainerFactory,用于在消费者方指定
  54. @Bean
  55. public JmsListenerContainerFactory<?> jmsListenerContainerTopic(){
  56. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  57. //设置为发布订阅方式, 默认情况下使用的生产消费者方式
  58. bean.setPubSubDomain(true);
  59. bean.setConnectionFactory(pooledConnectionFactory());
  60. return bean;
  61. }
  62. }

 

 

 三、AmqpTemplate、RedisTemplate、MqttGateway、JmsMessagingTemplate 的使用

 1.AmqpTemplate 的使用


  
  1. package com.devframe.controller;
  2. import java.io.UnsupportedEncodingException;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.Map.Entry;
  7. import java.util.UUID;
  8. import org.springframework.amqp.core.AmqpAdmin;
  9. import org.springframework.amqp.core.AmqpManagementOperations;
  10. import org.springframework.amqp.core.AmqpTemplate;
  11. import org.springframework.amqp.core.Binding;
  12. import org.springframework.amqp.core.BindingBuilder;
  13. import org.springframework.amqp.core.Exchange;
  14. import org.springframework.amqp.core.ExchangeBuilder;
  15. import org.springframework.amqp.core.Message;
  16. import org.springframework.amqp.core.MessageProperties;
  17. import org.springframework.amqp.core.Queue;
  18. import org.springframework.amqp.core.QueueBuilder;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.ui.Model;
  21. import org.springframework.web.bind.annotation.RequestMapping;
  22. import org.springframework.web.bind.annotation.RequestMethod;
  23. import org.springframework.web.bind.annotation.RequestParam;
  24. import org.springframework.web.bind.annotation.RestController;
  25. import com.devframe.entity.OperatEntity;
  26. import io.swagger.annotations.Api;
  27. import io.swagger.annotations.ApiOperation;
  28. @RestController
  29. @Api(tags = "RabbitServerController rabbitmq测试controller")
  30. public class RabbitServerController {
  31. @Autowired
  32. private AmqpTemplate amqpTemplate;
  33. @Autowired
  34. private AmqpManagementOperations amqpManagementOperations;
  35. @Autowired
  36. private AmqpAdmin amqpAdmin;
  37. @RequestMapping(value = "/sendMsg", method = RequestMethod.POST)
  38. public String sendAmqbMsg(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
  39. if (model != null && !"".equals(msg)) {
  40. amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", msg);
  41. } else {
  42. amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", "hello world");
  43. }
  44. amqpManagementOperations.getQueues().forEach(x -> {
  45. System.out.println(x.getName());
  46. });
  47. ;
  48. return "success";
  49. }
  50. @RequestMapping(value = "/sendMsg2", method = RequestMethod.POST)
  51. public String sendAmqbMsg2(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
  52. if (model != null && !"".equals(msg)) {
  53. amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙!!!");
  54. } else {
  55. amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙");
  56. }
  57. return "success";
  58. }
  59. @RequestMapping(value = "/sendMsg3", method = RequestMethod.POST)
  60. public String sendAmqbMsg3(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
  61. if (model != null && !"".equals(msg)) {
  62. amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界!!!");
  63. } else {
  64. amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界");
  65. }
  66. return "success";
  67. }
  68. @RequestMapping(value = "/helloWorld", method = RequestMethod.POST)
  69. @ApiOperation("1对1 or 1对多 无交换机模式")
  70. public String helloWorld(Model model, @RequestParam(value = "msg", defaultValue = "1对1,无交换机模式!") String msg) {
  71. if (model != null && !"".equals(msg)) {
  72. amqpTemplate.convertAndSend("mqttQueue", msg);
  73. }
  74. return "success";
  75. }
  76. @RequestMapping(value = "/pubSub", method = RequestMethod.POST)
  77. @ApiOperation("广播发布/订阅模式")
  78. public String pubSub(Model model, @RequestParam(value = "msg", defaultValue = "广播发布/订阅模式") String msg) {
  79. if (model != null && !"".equals(msg)) {
  80. // 广播模式对于路由无效,所有的消费者都可以获取都消息
  81. amqpTemplate.convertAndSend("pubSubExchange", "", msg);
  82. }
  83. return "success";
  84. }
  85. @RequestMapping(value = "/routing", method = RequestMethod.POST)
  86. @ApiOperation("路由消息模式")
  87. public String routing(Model model, @RequestParam(value = "msg", defaultValue = "路由消息模式") String msg) {
  88. if (model != null && !"".equals(msg)) {
  89. String[] infoTyp = new String[] { "info", "warn", "error" };
  90. for (String routing : infoTyp) {
  91. amqpTemplate.convertAndSend("routingExchange", routing, msg);
  92. }
  93. }
  94. return "success";
  95. }
  96. @RequestMapping(value = "/topicMatch", method = RequestMethod.POST)
  97. @ApiOperation("主题模式")
  98. public String topicModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) {
  99. if (model != null && !"".equals(msg)) {
  100. String[] infoTyp = new String[] { "P.123.asdasd", "P.456.JQBE", "P.789.WBD", "P.ASBDJBAS" };
  101. for (String routing : infoTyp) {
  102. amqpTemplate.convertAndSend("topicExchange", routing, msg);
  103. }
  104. }
  105. return "success";
  106. }
  107. @RequestMapping(value = "/headerModal", method = RequestMethod.POST)
  108. @ApiOperation("header模式")
  109. public String headerModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg)
  110. throws UnsupportedEncodingException {
  111. if (model != null && !"".equals(msg)) {
  112. Map<String, Object> map = new HashMap<String, Object>();
  113. map.put("abc", "nb");
  114. map.put("def", "pl");
  115. map.put("jabs", "aksd");
  116. for (Entry<String, Object> entry : map.entrySet()) {
  117. MessageProperties messageProperties = new MessageProperties();
  118. messageProperties.setHeader(entry.getKey(), entry.getValue());
  119. Message message = new Message(msg.getBytes("utf-8"), messageProperties);
  120. amqpTemplate.convertAndSend("headerExchange", null, message);
  121. }
  122. }
  123. return "success";
  124. }
  125. @RequestMapping(value = "/createTaskQueue", method = RequestMethod.POST)
  126. @ApiOperation("自动创建队列并发送消息到队列")
  127. public String createTaskQueue(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg,
  128. @RequestParam(name = "queueName", defaultValue = "ff008") String queueName)
  129. throws UnsupportedEncodingException {
  130. if (model != null && !"".equals(msg)) {
  131. amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
  132. OperatEntity operatEntity = new OperatEntity();
  133. operatEntity.setId("123456");
  134. operatEntity.setIndepeid("askdnad");
  135. operatEntity.setRecordid(msg);
  136. operatEntity.setTablename(msg);
  137. amqpTemplate.convertAndSend(queueName, operatEntity);
  138. // Object object = amqpTemplate.receiveAndConvert(queueName);
  139. // if(object instanceof OperatEntity){
  140. // OperatEntity operatEntity2 = (OperatEntity)object;
  141. // System.out.println("从队列"+queueName+"收到了一个【"+operatEntity2.toString()+"】");
  142. // System.out.println(operatEntity.getId());
  143. // System.out.println(operatEntity.getIndepeid());
  144. // System.out.println(operatEntity.getRecordid());
  145. // System.out.println(operatEntity.getTablename());
  146. // }else{
  147. // String result = (String)object;
  148. // System.out.println("从队列"+queueName+"收到了一个【"+result+"】");
  149. // }
  150. }
  151. return "success";
  152. }
  153. /**
  154. * 删除以 任务名称为前缀的队列
  155. *
  156. * @param queueNamePre
  157. * @return String
  158. */
  159. @RequestMapping(value = "/deleteQueue", method = RequestMethod.POST)
  160. @ApiOperation("删除以 任务名称为前缀的队列")
  161. public String deleteQueueWithPre(String queueNamePre) {
  162. List<Queue> queues = amqpManagementOperations.getQueues();
  163. for (Queue queue : queues) {
  164. if (queue.getName().startsWith(queueNamePre)) {
  165. amqpManagementOperations.deleteQueue(queue);
  166. }
  167. }
  168. return "success";
  169. }
  170. @RequestMapping(value = "/createExchangeBindTaskQueue", method = RequestMethod.POST)
  171. @ApiOperation("自动创建交换机并绑定队列")
  172. public String createExchangeBindTaskQueue(Model model,
  173. @RequestParam(value = "msg", defaultValue = "主题模式") String msg,
  174. @RequestParam(name = "exchangeName", defaultValue = "ff008") String exchangeName)
  175. throws UnsupportedEncodingException {
  176. if (model != null && !"".equals(msg)) {
  177. // 查询交换机是否存在
  178. // Exchange exchange =
  179. // amqpManagementOperations.getExchange(exchangeName);
  180. // if(exchange==null){ //如果不存在 ,则声明该交换机
  181. String randomNum = UUID.randomUUID().toString().substring(0, 8);
  182. Exchange exchange = ExchangeBuilder.directExchange(exchangeName).durable(true).build();
  183. Queue queue = QueueBuilder.durable(exchangeName + "-" + randomNum).build();
  184. amqpAdmin.declareExchange(exchange);
  185. amqpAdmin.declareQueue(queue);
  186. // }
  187. // 否则直接 将 该队列绑定到 交换机上面,routingkey 为 生成的8位随机数
  188. amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(randomNum).noargs());
  189. amqpTemplate.convertAndSend(exchangeName, randomNum, msg);
  190. String result = (String) amqpTemplate.receiveAndConvert(queue.getName());
  191. System.out.println(result);
  192. }
  193. return "success";
  194. }
  195. /**
  196. * 删除以 任务名称的交换机
  197. *
  198. * @param queueNamePre
  199. * @return String
  200. */
  201. @RequestMapping(value = "/deleteExchange", method = RequestMethod.POST)
  202. @ApiOperation("删除以 任务名称的交换机")
  203. public String deleteExchange(String exchangeName) {
  204. // Map<String, Object> map =
  205. // amqpManagementOperations.getExchange(exchangeName).getArguments();
  206. List<Binding> bindings = amqpManagementOperations.getBindingsForExchange("/", exchangeName);
  207. for (Binding binding : bindings) {
  208. String routingkey = binding.getRoutingKey();
  209. amqpAdmin.deleteQueue(exchangeName + "-" + routingkey);
  210. }
  211. amqpAdmin.deleteExchange(exchangeName);
  212. return "success";
  213. }
  214. // rpc调用未实现
  215. }

2. RedisTemplate 的使用

 


  
  1. package com.devframe.common.util;
  2. import java.util.List;
  3. import java.util.Set;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.data.redis.core.StringRedisTemplate;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class RedisTemplateUtils {
  10. @Autowired
  11. private StringRedisTemplate stringRedisTemplate;
  12. // @Autowired
  13. // private RedisTemplate<String, Serializable> redisTemplate;
  14. public static final int LOCK_TIMEOUT = 4;
  15. // ---------------value 为String 的 操作
  16. /**
  17. * set 字符串的 key,value
  18. *
  19. * @param key
  20. * @param value
  21. */
  22. public void set(String key, String value) {
  23. stringRedisTemplate.opsForValue().set(key, value);
  24. }
  25. /**
  26. * 根据key获取字符串的 value
  27. *
  28. * @param key
  29. */
  30. public String get(String key) {
  31. return stringRedisTemplate.opsForValue().get(key);
  32. }
  33. /**
  34. * 根据 key 删除
  35. *
  36. * @param key
  37. */
  38. public void del(String key) {
  39. stringRedisTemplate.opsForValue().getOperations().delete(key);
  40. }
  41. // ---------------value 为 List 的操作
  42. /**
  43. * push 元素到 list中
  44. */
  45. public void lpush(String key,String value) {
  46. stringRedisTemplate.opsForList().leftPush(key, value);
  47. }
  48. /**
  49. * 获取 list中 某个下标的元素
  50. * @param key
  51. * @param index
  52. */
  53. public String lindex(String key,long index) {
  54. return stringRedisTemplate.opsForList().index(key, index);
  55. }
  56. /**
  57. * 根据key 获取 list的长度
  58. * @param key
  59. * @return
  60. */
  61. public long llen(String key) {
  62. return stringRedisTemplate.opsForList().size(key);
  63. }
  64. /**
  65. * 根据 key,start,end 获取某一个区间的 list数据集
  66. * @param key
  67. * @param start
  68. * @param end
  69. * @return
  70. */
  71. public List<String> lrange(String key, long start, long end){
  72. return stringRedisTemplate.opsForList().range(key, start, end);
  73. }
  74. /**
  75. * 根据 key值 pattern查询所有匹配的值,比如login*
  76. * @param key
  77. * @return
  78. */
  79. public Set<String> keys(String key){
  80. return stringRedisTemplate.keys(key);
  81. }
  82. /**
  83. * 加锁
  84. *
  85. * @param key productId - 商品的唯一标志
  86. * @param value 当前时间+超时时间 也就是时间戳
  87. * @return
  88. */
  89. public boolean lock(String key,String value) {
  90. if (stringRedisTemplate.opsForValue().setIfAbsent(key, value)) {// 对应setnx命令
  91. // 可以成功设置,也就是key不存在
  92. return true;
  93. }
  94. // 判断锁超时 - 防止原来的操作异常,没有运行解锁操作 防止死锁
  95. String currentValue = stringRedisTemplate.opsForValue().get(key);
  96. // 如果锁过期
  97. if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {// currentValue不为空且小于当前时间
  98. // 获取上一个锁的时间value
  99. String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);// 对应getset,如果key存在
  100. // 假设两个线程同时进来这里,因为key被占用了,而且锁过期了。获取的值currentValue=A(get取的旧的值肯定是一样的),两个线程的value都是B,key都是K.锁时间已经过期了。
  101. // 而这里面的getAndSet一次只会一个执行,也就是一个执行之后,上一个的value已经变成了B。只有一个线程获取的上一个值会是A,另一个线程拿到的值是B。
  102. if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
  103. // oldValue不为空且oldValue等于currentValue,也就是校验是不是上个对应的商品时间戳,也是防止并发
  104. return true;
  105. }
  106. }
  107. return false;
  108. }
  109. /**
  110. * 解锁
  111. *
  112. * @param key
  113. * @param value
  114. */
  115. public void unlock(String key,String value) {
  116. try {
  117. String currentValue = stringRedisTemplate.opsForValue().get(key);
  118. if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
  119. stringRedisTemplate.opsForValue().getOperations().delete(key);// 删除key
  120. }
  121. } catch (Exception e) {
  122. Log.error(e);
  123. }
  124. }
  125. }

 3.MqttGateway 的使用


  
  1. @Autowired
  2. private MqttGateway mqttGateway;
  3. @RequestMapping("/sendMsg")
  4. public String sendMsg(String sendData,String topic){
  5. try {
  6. mqttGateway.sendToMqtt(sendData,topic);
  7. mqttGateway.sendToMqtt(topic, 0, sendData);
  8. return "发送成功";
  9. }catch (Exception e) {
  10. // TODO: handle exception
  11. Log.error(e);
  12. return "发送失败";
  13. }
  14. }

4、jmsMessagingTemplate 的使用


  
  1. @@Autowired
  2. private JmsMessagingTemplate jmsMessagingTemplate;
  3. jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(ActivemqConfig.topicName), payload);

 

 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/86233738

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。