Dynamically creating and listening to RabbitMQ queues

2023-09-15 21:08


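Background

When integrating with a third-party salesperson system, queues have to be created from information the third party supplies, and the number of queues is not known in advance. That makes adding listeners with @RabbitListener inconvenient. This article instead creates queues and registers listeners dynamically, at the moment the business needs them. The approach suits cases where each task gets its own group of queues, and you still need to decide what happens to a queue once it is no longer in use.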

Add the dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties

# RabbitMQ settings; for basic use only the following entries are needed
spring.rabbitmq.host=124.222.229.252
spring.rabbitmq.port=15677
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
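A quick way to sanity-check these entries is to parse them with `java.util.Properties`, which uses the same basic `key=value` syntax. The sketch below is only an illustration: the class name `RabbitProps` and the inline text are made up for the example, and Spring Boot performs its own binding in practice. It also shows why a comment in a `.properties` file should start with `#`: a line starting with `//` is read as a property whose key is `//` rather than being skipped.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the original project.
final class RabbitProps {

    // Parses .properties text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader should not fail; rethrow just in case.
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Reads spring.rabbitmq.port as an int.
    static int port(Properties props) {
        return Integer.parseInt(props.getProperty("spring.rabbitmq.port").trim());
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "# RabbitMQ settings",
                "spring.rabbitmq.host=124.222.229.252",
                "spring.rabbitmq.port=15677",
                "spring.rabbitmq.username=guest",
                "spring.rabbitmq.password=guest");
        Properties props = parse(text);
        System.out.println(props.getProperty("spring.rabbitmq.host") + ":" + port(props));

        // A "//" line is not a comment: it becomes a property whose key is "//".
        Properties odd = parse("// not a comment\n");
        System.out.println(odd.containsKey("//"));
    }
}
```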

Producer side

A helper method that creates a queue dynamically

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Override
    public void createQueue(String queueName) {
        // Check whether the queue already exists
        if (!isExistQueue(queueName)) {
            // Create a durable queue
            amqpAdmin.declareQueue(new Queue(queueName, true));
            logger.info("Queue {} created successfully", queueName);
        }
    }

    public boolean isExistQueue(String queueName) {
        if (StringUtils.isBlank(queueName)) {
            throw new RuntimeException("Queue name is empty");
        }
        // getQueueProperties returns null when the queue does not exist
        return amqpAdmin.getQueueProperties(queueName) != null;
    }
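The check-then-create flow can be exercised without a broker. Here is a minimal sketch, assuming a hypothetical `QueueAdmin` interface that stands in for the two `AmqpAdmin` calls used in the article (`getQueueProperties` and `declareQueue`) and a made-up in-memory implementation. `ensureQueue` returns whether it actually created the queue, so the caller can log "created" only when that is true.

```java
import java.util.HashSet;
import java.util.Set;

final class QueueCreator {
    // Creates the queue only if it is missing; returns true when it was created.
    static boolean ensureQueue(QueueAdmin admin, String queueName) {
        if (queueName == null || queueName.trim().isEmpty()) {
            throw new IllegalArgumentException("Queue name is empty");
        }
        if (admin.exists(queueName)) {
            return false;
        }
        admin.declare(queueName);
        return true;
    }

    public static void main(String[] args) {
        QueueAdmin admin = new InMemoryQueueAdmin();
        System.out.println(ensureQueue(admin, "ceshi-1")); // first call creates the queue
        System.out.println(ensureQueue(admin, "ceshi-1")); // second call finds it already there
    }
}

// Hypothetical stand-in for the AmqpAdmin calls used in the article.
interface QueueAdmin {
    boolean exists(String queueName); // models getQueueProperties(name) != null
    void declare(String queueName);   // models declareQueue(new Queue(name, true))
}

// In-memory implementation so the logic can run without RabbitMQ.
final class InMemoryQueueAdmin implements QueueAdmin {
    private final Set<String> queues = new HashSet<>();

    @Override
    public boolean exists(String queueName) {
        return queues.contains(queueName);
    }

    @Override
    public void declare(String queueName) {
        queues.add(queueName);
    }
}
```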

Sending a message to a specific queue

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String queueName, String taskStatus) {
        if (!isExistQueue(queueName)) {
            throw new RuntimeException("Failed to send message: queue does not exist");
        }
        // Build the business message entity
        SignTaskMessageVo messageVo = new SignTaskMessageVo();
        messageVo.setTaskId(queueName);
        messageVo.setStatus(taskStatus);
        messageVo.setSendTime(DateUtils.getCurrentTime());
        // Send to the specified queue; the queue name is used as the routing key
        rabbitTemplate.convertAndSend(queueName, JSONObject.toJSONString(messageVo));
        logger.info("Message sent: {}", messageVo);
    }
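The article does not show `SignTaskMessageVo`, so the sketch below guesses its shape from the three setters used above and treats all fields as strings. It builds the JSON text by hand to stay dependency-free. The class name `SignTaskMessage`, the field types, and the sample status value are assumptions for illustration, and the field order may differ from what `JSONObject.toJSONString` produces.

```java
// Assumed shape of the message entity; the String field types are guesses
// based on the setters that appear in the article.
final class SignTaskMessage {
    private final String taskId;
    private final String status;
    private final String sendTime;

    SignTaskMessage(String taskId, String status, String sendTime) {
        this.taskId = taskId;
        this.status = status;
        this.sendTime = sendTime;
    }

    // Escapes the characters that must be escaped inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Serializes the message to a JSON object string.
    String toJson() {
        return "{\"taskId\":\"" + escape(taskId)
                + "\",\"status\":\"" + escape(status)
                + "\",\"sendTime\":\"" + escape(sendTime) + "\"}";
    }

    public static void main(String[] args) {
        // "DONE" is a made-up status value.
        SignTaskMessage msg = new SignTaskMessage("ceshi-1", "DONE", "2023-09-15 21:08:00");
        System.out.println(msg.toJson());
    }
}
```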

Consumer side

Adding listened queues dynamically in Java

Configuration class

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        return container;
    }
}

Message listener class

@Component
public class MessageListenerImpl implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MessageListenerImpl.class);

    @Override
    public void onMessage(Message message) {
        // java.nio.charset.StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException
        String mes = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info("Message received: {}", mes);
        // TODO business processing
    }
}

Adding a listener dynamically

    @Autowired
    private SimpleMessageListenerContainer container;

    @Autowired
    private MessageListenerImpl message;

    @Override
    public void addListener(String queueName) {
        // Get the names of the queues currently being listened to
        String[] strings = container.getQueueNames();
        List<String> list = Arrays.asList(strings);
        if (!list.contains(queueName)) {
            container.addQueueNames(queueName);
            // Set the message listener class
            container.setMessageListener(message);
        }
    }
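The background section notes that you must decide what happens to a queue once its task is finished. One possible order is to stop listening first and then delete the queue. In Spring AMQP the corresponding calls would be `SimpleMessageListenerContainer.removeQueueNames(...)` and `AmqpAdmin.deleteQueue(...)`. The sketch below only mirrors those two signatures with hypothetical interfaces (`Listening`, `Admin`) so the ordering can run without a broker. It is an assumption about how cleanup could look, not the original author's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class QueueCleanup {
    // Stand-in for SimpleMessageListenerContainer (hypothetical, for illustration).
    interface Listening {
        boolean removeQueueNames(String... queueNames);
    }

    // Stand-in for AmqpAdmin (hypothetical, for illustration).
    interface Admin {
        boolean deleteQueue(String queueName);
    }

    // Stops consuming first, then deletes the queue; returns true if the queue was deleted.
    static boolean release(Listening container, Admin admin, String queueName) {
        container.removeQueueNames(queueName);
        return admin.deleteQueue(queueName);
    }

    public static void main(String[] args) {
        Set<String> listened = new LinkedHashSet<>(List.of("ceshi-1", "ceshi-2"));
        Set<String> declared = new LinkedHashSet<>(List.of("ceshi-1", "ceshi-2"));
        List<String> calls = new ArrayList<>();

        Listening container = names -> {
            calls.add("remove");
            return listened.removeAll(List.of(names));
        };
        Admin admin = name -> {
            calls.add("delete");
            return declared.remove(name);
        };

        System.out.println(release(container, admin, "ceshi-1"));
        System.out.println(calls);    // order in which the two calls were made
        System.out.println(listened); // queues still being listened to
    }
}
```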

Listening and consuming from front-end JavaScript

Include stomp.js (copy and paste the source below, or bring in your own copy)

// Generated by CoffeeScript 1.7.1
(function() {
  var Byte, Client, Frame, Stomp,
    __hasProp = {}.hasOwnProperty,
    __slice = [].slice;

  Byte = {
    LF: '\x0A',
    NULL: '\x00'
  };

  Frame = (function() {
    var unmarshallSingle;

    function Frame(command, headers, body) {
      this.command = command;
      this.headers = headers != null ? headers : {};
      this.body = body != null ? body : '';
    }

    Frame.prototype.toString = function() {
      var lines, name, skipContentLength, value, _ref;
      lines = [this.command];
      skipContentLength = this.headers['content-length'] === false ? true : false;
      if (skipContentLength) {
        delete this.headers['content-length'];
      }
      _ref = this.headers;
      for (name in _ref) {
        if (!__hasProp.call(_ref, name)) continue;
        value = _ref[name];
        lines.push("" + name + ":" + value);
      }
      if (this.body && !skipContentLength) {
        lines.push("content-length:" + (Frame.sizeOfUTF8(this.body)));
      }
      lines.push(Byte.LF + this.body);
      return lines.join(Byte.LF);
    };

    Frame.sizeOfUTF8 = function(s) {
      if (s) {
        return encodeURI(s).match(/%..|./g).length;
      } else {
        return 0;
      }
    };

    unmarshallSingle = function(data) {
      var body, chr, command, divider, headerLines, headers, i, idx, len, line, start, trim, _i, _j, _len, _ref, _ref1;
      divider = data.search(RegExp("" + Byte.LF + Byte.LF));
      headerLines = data.substring(0, divider).split(Byte.LF);
      command = headerLines.shift();
      headers = {};
      trim = function(str) {
        return str.replace(/^\s+|\s+$/g, '');
      };
      _ref = headerLines.reverse();
      for (_i = 0, _len = _ref.length; _i < _len; _i++) {
        line = _ref[_i];
        idx = line.indexOf(':');
        headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1));
      }
      body = '';
      start = divider + 2;
      if (headers['content-length']) {
        len = parseInt(headers['content-length']);
        body = ('' + data).substring(start, start + len);
      } else {
        chr = null;
        for (i = _j = start, _ref1 = data.length; start <= _ref1 ? _j < _ref1 : _j > _ref1; i = start <= _ref1 ? ++_j : --_j) {
          chr = data.charAt(i);
          if (chr === Byte.NULL) {
            break;
          }
          body += chr;
        }
      }
      return new Frame(command, headers, body);
    };

    Frame.unmarshall = function(datas) {
      var data;
      return (function() {
        var _i, _len, _ref, _results;
        _ref = datas.split(RegExp("" + Byte.NULL + Byte.LF + "*"));
        _results = [];
        for (_i = 0, _len = _ref.length; _i < _len; _i++) {
          data = _ref[_i];
          if ((data != null ? data.length : void 0) > 0) {
            _results.push(unmarshallSingle(data));
          }
        }
        return _results;
      })();
    };

    Frame.marshall = function(command, headers, body) {
      var frame;
      frame = new Frame(command, headers, body);
      return frame.toString() + Byte.NULL;
    };

    return Frame;

  })();

  Client = (function() {
    var now;

    function Client(ws) {
      this.ws = ws;
      this.ws.binaryType = "arraybuffer";
      this.counter = 0;
      this.connected = false;
      this.heartbeat = {
        outgoing: 10000,
        incoming: 10000
      };
      this.maxWebSocketFrameSize = 16 * 1024;
      this.subscriptions = {};
    }

    Client.prototype.debug = function(message) {
      var _ref;
      return typeof window !== "undefined" && window !== null ? (_ref = window.console) != null ? _ref.log(message) : void 0 : void 0;
    };

    now = function() {
      if (Date.now) {
        return Date.now();
      } else {
        return new Date().valueOf();
      }
    };

    Client.prototype._transmit = function(command, headers, body) {
      var out;
      out = Frame.marshall(command, headers, body);
      if (typeof this.debug === "function") {
        this.debug(">>> " + out);
      }
      while (true) {
        if (out.length > this.maxWebSocketFrameSize) {
          this.ws.send(out.substring(0, this.maxWebSocketFrameSize));
          out = out.substring(this.maxWebSocketFrameSize);
          if (typeof this.debug === "function") {
            this.debug("remaining = " + out.length);
          }
        } else {
          return this.ws.send(out);
        }
      }
    };

    Client.prototype._setupHeartbeat = function(headers) {
      var serverIncoming, serverOutgoing, ttl, v, _ref, _ref1;
      if ((_ref = headers.version) !== Stomp.VERSIONS.V1_1 && _ref !== Stomp.VERSIONS.V1_2) {
        return;
      }
      _ref1 = (function() {
        var _i, _len, _ref1, _results;
        _ref1 = headers['heart-beat'].split(",");
        _results = [];
        for (_i = 0, _len = _ref1.length; _i < _len; _i++) {
          v = _ref1[_i];
          _results.push(parseInt(v));
        }
        return _results;
      })(), serverOutgoing = _ref1[0], serverIncoming = _ref1[1];
      if (!(this.heartbeat.outgoing === 0 || serverIncoming === 0)) {
        ttl = Math.max(this.heartbeat.outgoing, serverIncoming);
        if (typeof this.debug === "function") {
          this.debug("send PING every " + ttl + "ms");
        }
        this.pinger = Stomp.setInterval(ttl, (function(_this) {
          return function() {
            _this.ws.send(Byte.LF);
            return typeof _this.debug === "function" ? _this.debug(">>> PING") : void 0;
          };
        })(this));
      }
      if (!(this.heartbeat.incoming === 0 || serverOutgoing === 0)) {
        ttl = Math.max(this.heartbeat.incoming, serverOutgoing);
        if (typeof this.debug === "function") {
          this.debug("check PONG every " + ttl + "ms");
        }
        return this.ponger = Stomp.setInterval(ttl, (function(_this) {
          return function() {
            var delta;
            delta = now() - _this.serverActivity;
            if (delta > ttl * 2) {
              if (typeof _this.debug === "function") {
                _this.debug("did not receive server activity for the last " + delta + "ms");
              }
              return _this.ws.close();
            }
          };
        })(this));
      }
    };

    Client.prototype._parseConnect = function() {
      var args, connectCallback, errorCallback, headers;
      args = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
      headers = {};
      switch (args.length) {
        case 2:
          headers = args[0], connectCallback = args[1];
          break;
        case 3:
          if (args[1] instanceof Function) {
            headers = args[0], connectCallback = args[1], errorCallback = args[2];
          } else {
            headers.login = args[0], headers.passcode = args[1], connectCallback = args[2];
          }
          break;
        case 4:
          headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3];
          break;
        default:
          headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3], headers.host = args[4];
      }
      return [headers, connectCallback, errorCallback];
    };

    Client.prototype.connect = function() {
      var args, errorCallback, headers, out;
      args = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
      out = this._parseConnect.apply(this, args);
      headers = out[0], this.connectCallback = out[1], errorCallback = out[2];
      if (typeof this.debug === "function") {
        this.debug("Opening Web Socket...");
      }
      this.ws.onmessage = (function(_this) {
        return function(evt) {
          var arr, c, client, data, frame, messageID, onreceive, subscription, _i, _len, _ref, _results;
          data = typeof ArrayBuffer !== 'undefined' && evt.data instanceof ArrayBuffer ? (arr = new Uint8Array(evt.data), typeof _this.debug === "function" ? _this.debug("--- got data length: " + arr.length) : void 0, ((function() {
            var _i, _len, _results;
            _results = [];
            for (_i = 0, _len = arr.length; _i < _len; _i++) {
              c = arr[_i];
              _results.push(String.fromCharCode(c));
            }
            return _results;
          })()).join('')) : evt.data;
          _this.serverActivity = now();
          if (data === Byte.LF) {
            if (typeof _this.debug === "function") {
              _this.debug("<<< PONG");
            }
            return;
          }
          if (typeof _this.debug === "function") {
            _this.debug("<<< " + data);
          }
          _ref = Frame.unmarshall(data);
          _results = [];
          for (_i = 0, _len = _ref.length; _i < _len; _i++) {
            frame = _ref[_i];
            switch (frame.command) {
              case "CONNECTED":
                if (typeof _this.debug === "function") {
                  _this.debug("connected to server " + frame.headers.server);
                }
                _this.connected = true;
                _this._setupHeartbeat(frame.headers);
                _results.push(typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0);
                break;
              case "MESSAGE":
                subscription = frame.headers.subscription;
                onreceive = _this.subscriptions[subscription] || _this.onreceive;
                if (onreceive) {
                  client = _this;
                  messageID = frame.headers["message-id"];
                  frame.ack = function(headers) {
                    if (headers == null) {
                      headers = {};
                    }
                    return client.ack(messageID, subscription, headers);
                  };
                  frame.nack = function(headers) {
                    if (headers == null) {
                      headers = {};
                    }
                    return client.nack(messageID, subscription, headers);
                  };
                  _results.push(onreceive(frame));
                } else {
                  _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled received MESSAGE: " + frame) : void 0);
                }
                break;
              case "RECEIPT":
                _results.push(typeof _this.onreceipt === "function" ? _this.onreceipt(frame) : void 0);
                break;
              case "ERROR":
                _results.push(typeof errorCallback === "function" ? errorCallback(frame) : void 0);
                break;
              default:
                _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled frame: " + frame) : void 0);
            }
          }
          return _results;
        };
      })(this);
      this.ws.onclose = (function(_this) {
        return function() {
          var msg;
          msg = "Whoops! Lost connection to " + _this.ws.url;
          if (typeof _this.debug === "function") {
            _this.debug(msg);
          }
          _this._cleanUp();
          return typeof errorCallback === "function" ? errorCallback(msg) : void 0;
        };
      })(this);
      return this.ws.onopen = (function(_this) {
        return function() {
          if (typeof _this.debug === "function") {
            _this.debug('Web Socket Opened...');
          }
          headers["accept-version"] = Stomp.VERSIONS.supportedVersions();
          headers["heart-beat"] = [_this.heartbeat.outgoing, _this.heartbeat.incoming].join(',');
          return _this._transmit("CONNECT", headers);
        };
      })(this);
    };

    Client.prototype.disconnect = function(disconnectCallback, headers) {
      if (headers == null) {
        headers = {};
      }
      this._transmit("DISCONNECT", headers);
      this.ws.onclose = null;
      this.ws.close();
      this._cleanUp();
      return typeof disconnectCallback === "function" ? disconnectCallback() : void 0;
    };

    Client.prototype._cleanUp = function() {
      this.connected = false;
      if (this.pinger) {
        Stomp.clearInterval(this.pinger);
      }
      if (this.ponger) {
        return Stomp.clearInterval(this.ponger);
      }
    };

    Client.prototype.send = function(destination, headers, body) {
      if (headers == null) {
        headers = {};
      }
      if (body == null) {
        body = '';
      }
      headers.destination = destination;
      return this._transmit("SEND", headers, body);
    };

    Client.prototype.subscribe = function(destination, callback, headers) {
      var client;
      if (headers == null) {
        headers = {};
      }
      if (!headers.id) {
        headers.id = "sub-" + this.counter++;
      }
      headers.destination = destination;
      this.subscriptions[headers.id] = callback;
      this._transmit("SUBSCRIBE", headers);
      client = this;
      return {
        id: headers.id,
        unsubscribe: function() {
          return client.unsubscribe(headers.id);
        }
      };
    };

    Client.prototype.unsubscribe = function(id) {
      delete this.subscriptions[id];
      return this._transmit("UNSUBSCRIBE", {
        id: id
      });
    };

    Client.prototype.begin = function(transaction) {
      var client, txid;
      txid = transaction || "tx-" + this.counter++;
      this._transmit("BEGIN", {
        transaction: txid
      });
      client = this;
      return {
        id: txid,
        commit: function() {
          return client.commit(txid);
        },
        abort: function() {
          return client.abort(txid);
        }
      };
    };

    Client.prototype.commit = function(transaction) {
      return this._transmit("COMMIT", {
        transaction: transaction
      });
    };

    Client.prototype.abort = function(transaction) {
      return this._transmit("ABORT", {
        transaction: transaction
      });
    };

    Client.prototype.ack = function(messageID, subscription, headers) {
      if (headers == null) {
        headers = {};
      }
      headers["message-id"] = messageID;
      headers.subscription = subscription;
      return this._transmit("ACK", headers);
    };

    Client.prototype.nack = function(messageID, subscription, headers) {
      if (headers == null) {
        headers = {};
      }
      headers["message-id"] = messageID;
      headers.subscription = subscription;
      return this._transmit("NACK", headers);
    };

    return Client;

  })();

  Stomp = {
    VERSIONS: {
      V1_0: '1.0',
      V1_1: '1.1',
      V1_2: '1.2',
      supportedVersions: function() {
        return '1.1,1.0';
      }
    },
    client: function(url, protocols) {
      var klass, ws;
      if (protocols == null) {
        protocols = ['v10.stomp', 'v11.stomp'];
      }
      klass = Stomp.WebSocketClass || WebSocket;
      ws = new klass(url, protocols);
      return new Client(ws);
    },
    over: function(ws) {
      return new Client(ws);
    },
    Frame: Frame
  };

  if (typeof exports !== "undefined" && exports !== null) {
    exports.Stomp = Stomp;
  }

  if (typeof window !== "undefined" && window !== null) {
    Stomp.setInterval = function(interval, f) {
      return window.setInterval(f, interval);
    };
    Stomp.clearInterval = function(id) {
      return window.clearInterval(id);
    };
    window.Stomp = Stomp;
  } else if (!exports) {
    self.Stomp = Stomp;
  }

}).call(this);
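To make the wire format that `Frame.marshall` produces easier to see, here is a rough Java equivalent. It is a sketch rather than a port: the class name `StompFrames` is made up, the option to suppress `content-length` is left out, and header order simply follows the map passed in. A frame is the command, one `name:value` line per header, a blank line, the body, and a terminating NUL. `content-length` counts UTF-8 bytes rather than characters.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class StompFrames {
    private static final char LF = '\n';
    private static final char NULL = '\0';

    // Builds the wire form of a frame: command, header lines, blank line, body, NUL.
    static String marshall(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append(LF);
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append(LF);
        }
        if (body != null && !body.isEmpty()) {
            // content-length counts UTF-8 bytes, not characters
            int length = body.getBytes(StandardCharsets.UTF_8).length;
            sb.append("content-length:").append(length).append(LF);
        }
        sb.append(LF);
        if (body != null) {
            sb.append(body);
        }
        return sb.append(NULL).toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", "sub-0");
        headers.put("destination", "ceshi-1");
        headers.put("ack", "client");
        String frame = marshall("SUBSCRIBE", headers, "");
        // Make the control characters visible when printing.
        System.out.println(frame.replace("\n", "\\n").replace("\0", "\\0"));
    }
}
```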

index.html example

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Consumer example</title>
    <!-- stomp.js must be included -->
    <script src="./stomp.js"></script>
</head>
<body>
Message received: <span id="dataId"> </span><br>
<script>
    var queueName = "ceshi-1"; // the queue name that was obtained
    // Note: RabbitMQ's STOMP adapter normally expects a prefixed destination such as
    // /queue/<name> or /amq/queue/<name>; adjust queueName if the subscription is rejected.

    // Create the WebSocket; RabbitMQ must expose the STOMP-over-WebSocket protocol and listen on the ws port
    var ws = new WebSocket('ws://124.222.229.252:15674/ws');
    // Get the Stomp client object
    var client = Stomp.over(ws);
    // Set heartbeat intervals to keep a long-lived connection stable
    client.heartbeat.outgoing = 10000;
    client.heartbeat.incoming = 10000;

    // Callback invoked when the connection succeeds
    var on_connect = function (x) {
        // Subscribe to the queue and acknowledge each message manually
        client.subscribe(queueName, function (data) {
            var msg = data.body;
            console.log("Received data: " + msg);
            data.ack();
            document.getElementById("dataId").innerText = msg;
        }, {ack: 'client'}); // the client confirms that it has received the message
    };

    // Callback invoked on error
    var on_error = function () {
        console.log('error');
    };

    // Connect to RabbitMQ
    client.connect('guest', 'guest', on_connect, on_error, '/');

    // Disconnect when the program exits
    //client.disconnect();
</script>
</body>
</html>
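The two `client.heartbeat` values in the page are only the client's proposal. The `_setupHeartbeat` function in stomp.js combines them with the server's `heart-beat` header, taking the larger of the two values for each direction and disabling that direction if either side sends 0. A minimal sketch of that rule follows, with a made-up class name `Heartbeat` and a hypothetical server header value:

```java
final class Heartbeat {
    // Returns the negotiated interval in ms, or 0 when either side disables that direction.
    static int negotiate(int mine, int theirs) {
        if (mine == 0 || theirs == 0) {
            return 0;
        }
        return Math.max(mine, theirs);
    }

    // Parses a "heart-beat" header value such as "10000,10000" into {outgoing, incoming}.
    static int[] parse(String header) {
        String[] parts = header.split(",");
        return new int[] {Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim())};
    }

    public static void main(String[] args) {
        int clientOutgoing = 10000;
        int clientIncoming = 10000;
        // Hypothetical CONNECTED header: the server sends every 30 s and expects nothing back.
        int[] server = parse("30000,0");
        // The client's PING interval pairs client outgoing with server incoming.
        System.out.println(negotiate(clientOutgoing, server[1]));
        // The client's PONG check interval pairs client incoming with server outgoing.
        System.out.println(negotiate(clientIncoming, server[0]));
    }
}
```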

Open index.html


Source: https://blog.csdn.net/qq_42223485/article/details/130052546
