activeMQ消息队列连接池

  • #activeMQ消息队列连接池创建实战
  • 适合了解MQ基本知识的同学
  • ## 为什么使用连接池 ##
  • 客户端发送请求的时候如果每次都创建Connection会消耗大量的资源,尤其是在高并发的情况下,服务器会被直接打死。试想北京到上海的铁路,如果每去一次就创建一条铁路是多么坑的事。不仅仅创建Connection会消耗资源,session、producer的创建也会消耗大量系统资源,所以针对这些资源都要相应的创建对应的连接池。
  • -

创建连接池

package com.swkj.pool;import com.swkj.commom.MessageType;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.util.LinkedList;/*** @author wangdongdong* @version V1.0* @Description: AMQ连接池管理类* @date 2017/10/25 0025 15:50*/
public class AMQPoolFactory {private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //默认连接地址private int maxConnection = 10;  //connection最大连接数private int minConnection = 1;    //connection最小连接数private int maxSesstionConnection = 100; //每个connection可建的最大session数private int minSesstionConnection = 10; //每个connection可建的最小session数private int maxProducerSession = 10;    //每个session可建的最大producer数private int maxConsumerSession = 10;   //每个session可建的最大consumer数private ConnectionFactory factory;      //jms工厂private LinkedList poolConnection = new LinkedList();private LinkedList poolSession = new LinkedList();/*** 初始化AMQ连接池,生成连接和会话信息** @throws Exception*/private void initFactory() throws Exception {try {factory = new ActiveMQConnectionFactory(BROKEURL);if (minConnection > 0 && minConnection <= maxConnection) {for (int i = 0; i < minConnection; i++) {Connection connection = factory.createConnection(USERNAME, PASSWORD);connection.start();ConnectionPool connPool = new ConnectionPool();connPool.setConnection(connection);//存放Conn连接if (minSesstionConnection > 0 && minSesstionConnection <= maxSesstionConnection) {connPool.setActiveSessions(minSesstionConnection);//设置当前存在session数目for (int j = 0; j < minSesstionConnection; j++) {Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//为自动确认,pro con不用关心,由activemq确认信息是否到达。SessionPool sessionPool = new SessionPool();sessionPool.setConnection(connection);sessionPool.setSession(session);poolSession.addLast(sessionPool);}poolConnection.addLast(connPool);} else {throw new Exception("AMQ配置minSessionConnection和maxSessionConnection错误");}}} else {throw new Exception("AMQ配置minConnections和maxConnections错误");}} catch (JMSException e) {throw new Exception("AMQ初始化异常", e);}}/*** @author 王冬冬* @Description: 从连接池中获取connection连接* @date 2017/10/25 0025 16:21* @version V1.0*/private ConnectionPool getConnection() throws Exception {ConnectionPool connPool = null;if (poolConnection != null && poolConnection.size() > 0) {for (ConnectionPool connectionPool : poolConnection) {int poolSessionSize = connectionPool.getActiveSessions();if (poolSessionSize < maxSesstionConnection) {connPool = connectionPool;//取会话比较少的连接}}if (connPool == null && poolConnection.size() < maxConnection) {//当前连接池耗尽的时候需要新创建连接与会话try {Connection conn = factory.createConnection(USERNAME, PASSWORD);conn.start();connPool = new ConnectionPool();connPool.setConnection(conn);//存放Conn连接if (minSesstionConnection > 0 && minSesstionConnection <= maxSesstionConnection) {connPool.setActiveSessions(minSesstionConnection);//设置当前存在session数目for (int j = 0; j < minSesstionConnection; j++) {Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);SessionPool sessionPool = new SessionPool();sessionPool.setSession(session);sessionPool.setConnection(conn);poolSession.addLast(sessionPool);}}poolConnection.addLast(connPool);} catch (JMSException e) {throw new Exception("getConnection方法创建Connection异常", e);}}}return connPool;}/*** @author 王冬冬* @Description: 池中获取producer的session* @date 2017/10/25 0025 16:32* @version V1.0*/private SessionPool getProducerSession() throws Exception {SessionPool sesPool = null;if (poolSession != 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部