|
| 1 | +// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved. |
| 2 | +// |
| 3 | +// This software, the RabbitMQ Java client library, is triple-licensed under the |
| 4 | +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 |
| 5 | +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see |
| 6 | +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, |
| 7 | +// please see LICENSE-APACHE2. |
| 8 | +// |
| 9 | +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, |
| 10 | +// either express or implied. See the LICENSE file for specific language governing |
| 11 | +// rights and limitations of this software. |
| 12 | +// |
| 13 | +// If you have any questions regarding licensing, please contact us at |
| 14 | + |
| 15 | + |
| 16 | +package com.rabbitmq.client; |
| 17 | + |
| 18 | +import com.rabbitmq.client.impl.AMQConnection; |
| 19 | +import com.rabbitmq.client.impl.nio.NioParams; |
| 20 | + |
| 21 | +import java.io.BufferedReader; |
| 22 | +import java.io.FileReader; |
| 23 | +import java.io.IOException; |
| 24 | +import java.io.InputStream; |
| 25 | +import java.io.Reader; |
| 26 | +import java.net.URISyntaxException; |
| 27 | +import java.security.KeyManagementException; |
| 28 | +import java.security.NoSuchAlgorithmException; |
| 29 | +import java.util.HashMap; |
| 30 | +import java.util.Map; |
| 31 | +import java.util.Properties; |
| 32 | + |
| 33 | +/** |
| 34 | + * Helper class to load {@link ConnectionFactory} settings from a property file. |
| 35 | + * |
| 36 | + * The authorised keys are the constants values in this class (e.g. USERNAME). |
| 37 | + * The property file/properties instance/map instance keys can have |
| 38 | + * a prefix, the default being <code>rabbitmq.</code>. |
| 39 | + * |
| 40 | + * Property files can be loaded from the file system (the default), |
| 41 | + * but also from the classpath, by using the <code>classpath:</code> prefix |
| 42 | + * in the location. |
| 43 | + * |
| 44 | + * If default client properties should be set, set the <code>use.default.client.properties</code> |
| 45 | + * key to <code>true</code>. Custom client properties can be set by using |
| 46 | + * the <code>client.properties.</code>, e.g. <code>client.properties.app.name</code>. |
| 47 | + * |
| 48 | + * @since 4.4.0 |
| 49 | + * @see ConnectionFactory#load(String, String) |
| 50 | + */ |
| 51 | +public class ConnectionFactoryConfigurator { |
| 52 | + |
| 53 | + public static final String DEFAULT_PREFIX = "rabbitmq."; |
| 54 | + |
| 55 | + public static final String USERNAME = "username"; |
| 56 | + public static final String PASSWORD = "password"; |
| 57 | + public static final String VIRTUAL_HOST = "virtual.host"; |
| 58 | + public static final String HOST = "host"; |
| 59 | + public static final String PORT = "port"; |
| 60 | + public static final String CONNECTION_CHANNEL_MAX = "connection.channel.max"; |
| 61 | + public static final String CONNECTION_FRAME_MAX = "connection.frame.max"; |
| 62 | + public static final String CONNECTION_HEARTBEAT = "connection.heartbeat"; |
| 63 | + public static final String CONNECTION_TIMEOUT = "connection.timeout"; |
| 64 | + public static final String HANDSHAKE_TIMEOUT = "handshake.timeout"; |
| 65 | + public static final String SHUTDOWN_TIMEOUT = "shutdown.timeout"; |
| 66 | + public static final String USE_DEFAULT_CLIENT_PROPERTIES = "use.default.client.properties"; |
| 67 | + public static final String CLIENT_PROPERTIES_PREFIX = "client.properties."; |
| 68 | + public static final String CONNECTION_RECOVERY_ENABLED = "connection.recovery.enabled"; |
| 69 | + public static final String TOPOLOGY_RECOVERY_ENABLED = "topology.recovery.enabled"; |
| 70 | + public static final String CONNECTION_RECOVERY_INTERVAL = "connection.recovery.interval"; |
| 71 | + public static final String CHANNEL_RPC_TIMEOUT = "channel.rpc.timeout"; |
| 72 | + public static final String CHANNEL_SHOULD_CHECK_RPC_RESPONSE_TYPE = "channel.should.check.rpc.response.type"; |
| 73 | + public static final String USE_NIO = "use.nio"; |
| 74 | + public static final String NIO_READ_BYTE_BUFFER_SIZE = "nio.read.byte.buffer.size"; |
| 75 | + public static final String NIO_WRITE_BYTE_BUFFER_SIZE = "nio.write.byte.buffer.size"; |
| 76 | + public static final String NIO_NB_IO_THREADS = "nio.nb.io.threads"; |
| 77 | + public static final String NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS = "nio.write.enqueuing.timeout.in.ms"; |
| 78 | + public static final String NIO_WRITE_QUEUE_CAPACITY = "nio.write.queue.capacity"; |
| 79 | + |
| 80 | + public static void load(ConnectionFactory cf, String propertyFileLocation, String prefix) throws IOException { |
| 81 | + if (propertyFileLocation == null || propertyFileLocation.isEmpty()) { |
| 82 | + throw new IllegalArgumentException("Property file argument cannot be null or empty"); |
| 83 | + } |
| 84 | + Properties properties = new Properties(); |
| 85 | + if (propertyFileLocation.startsWith("classpath:")) { |
| 86 | + InputStream in = null; |
| 87 | + try { |
| 88 | + in = ConnectionFactoryConfigurator.class.getResourceAsStream( |
| 89 | + propertyFileLocation.substring("classpath:".length()) |
| 90 | + ); |
| 91 | + properties.load(in); |
| 92 | + } finally { |
| 93 | + if (in != null) { |
| 94 | + in.close(); |
| 95 | + } |
| 96 | + } |
| 97 | + } else { |
| 98 | + Reader reader = null; |
| 99 | + try { |
| 100 | + reader = new BufferedReader(new FileReader(propertyFileLocation)); |
| 101 | + properties.load(reader); |
| 102 | + } finally { |
| 103 | + if (reader != null) { |
| 104 | + reader.close(); |
| 105 | + } |
| 106 | + } |
| 107 | + } |
| 108 | + load(cf, (Map) properties, prefix); |
| 109 | + } |
| 110 | + |
| 111 | + public static void load(ConnectionFactory cf, Map<String, String> properties, String prefix) { |
| 112 | + prefix = prefix == null ? "" : prefix; |
| 113 | + String uri = properties.get(prefix + "uri"); |
| 114 | + if (uri != null) { |
| 115 | + try { |
| 116 | + cf.setUri(uri); |
| 117 | + } catch (URISyntaxException e) { |
| 118 | + throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e); |
| 119 | + } catch (NoSuchAlgorithmException e) { |
| 120 | + throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e); |
| 121 | + } catch (KeyManagementException e) { |
| 122 | + throw new IllegalArgumentException("Error while setting AMQP URI: "+uri, e); |
| 123 | + } |
| 124 | + } |
| 125 | + String username = properties.get(prefix + USERNAME); |
| 126 | + if (username != null) { |
| 127 | + cf.setUsername(username); |
| 128 | + } |
| 129 | + String password = properties.get(prefix + PASSWORD); |
| 130 | + if (password != null) { |
| 131 | + cf.setPassword(password); |
| 132 | + } |
| 133 | + String vhost = properties.get(prefix + VIRTUAL_HOST); |
| 134 | + if (vhost != null) { |
| 135 | + cf.setVirtualHost(vhost); |
| 136 | + } |
| 137 | + String host = properties.get(prefix + HOST); |
| 138 | + if (host != null) { |
| 139 | + cf.setHost(host); |
| 140 | + } |
| 141 | + String port = properties.get(prefix + PORT); |
| 142 | + if (port != null) { |
| 143 | + cf.setPort(Integer.valueOf(port)); |
| 144 | + } |
| 145 | + String requestedChannelMax = properties.get(prefix + CONNECTION_CHANNEL_MAX); |
| 146 | + if (requestedChannelMax != null) { |
| 147 | + cf.setRequestedChannelMax(Integer.valueOf(requestedChannelMax)); |
| 148 | + } |
| 149 | + String requestedFrameMax = properties.get(prefix + CONNECTION_FRAME_MAX); |
| 150 | + if (requestedFrameMax != null) { |
| 151 | + cf.setRequestedFrameMax(Integer.valueOf(requestedFrameMax)); |
| 152 | + } |
| 153 | + String requestedHeartbeat = properties.get(prefix + CONNECTION_HEARTBEAT); |
| 154 | + if (requestedHeartbeat != null) { |
| 155 | + cf.setRequestedHeartbeat(Integer.valueOf(requestedHeartbeat)); |
| 156 | + } |
| 157 | + String connectionTimeout = properties.get(prefix + CONNECTION_TIMEOUT); |
| 158 | + if (connectionTimeout != null) { |
| 159 | + cf.setConnectionTimeout(Integer.valueOf(connectionTimeout)); |
| 160 | + } |
| 161 | + String handshakeTimeout = properties.get(prefix + HANDSHAKE_TIMEOUT); |
| 162 | + if (handshakeTimeout != null) { |
| 163 | + cf.setHandshakeTimeout(Integer.valueOf(handshakeTimeout)); |
| 164 | + } |
| 165 | + String shutdownTimeout = properties.get(prefix + SHUTDOWN_TIMEOUT); |
| 166 | + if (shutdownTimeout != null) { |
| 167 | + cf.setShutdownTimeout(Integer.valueOf(shutdownTimeout)); |
| 168 | + } |
| 169 | + |
| 170 | + Map<String, Object> clientProperties = new HashMap<String, Object>(); |
| 171 | + String useDefaultClientProperties = properties.get(prefix + USE_DEFAULT_CLIENT_PROPERTIES); |
| 172 | + if (useDefaultClientProperties != null && Boolean.valueOf(useDefaultClientProperties)) { |
| 173 | + clientProperties.putAll(AMQConnection.defaultClientProperties()); |
| 174 | + } |
| 175 | + |
| 176 | + for (Map.Entry<String, String> entry : properties.entrySet()) { |
| 177 | + if (entry.getKey().startsWith(prefix + CLIENT_PROPERTIES_PREFIX)) { |
| 178 | + clientProperties.put( |
| 179 | + entry.getKey().substring((prefix + CLIENT_PROPERTIES_PREFIX).length()), |
| 180 | + entry.getValue() |
| 181 | + ); |
| 182 | + } |
| 183 | + } |
| 184 | + cf.setClientProperties(clientProperties); |
| 185 | + |
| 186 | + String automaticRecovery = properties.get(prefix + CONNECTION_RECOVERY_ENABLED); |
| 187 | + if (automaticRecovery != null) { |
| 188 | + cf.setAutomaticRecoveryEnabled(Boolean.valueOf(automaticRecovery)); |
| 189 | + } |
| 190 | + String topologyRecovery = properties.get(prefix + TOPOLOGY_RECOVERY_ENABLED); |
| 191 | + if (topologyRecovery != null) { |
| 192 | + cf.setTopologyRecoveryEnabled(Boolean.getBoolean(topologyRecovery)); |
| 193 | + } |
| 194 | + String networkRecoveryInterval = properties.get(prefix + CONNECTION_RECOVERY_INTERVAL); |
| 195 | + if (networkRecoveryInterval != null) { |
| 196 | + cf.setNetworkRecoveryInterval(Long.valueOf(networkRecoveryInterval)); |
| 197 | + } |
| 198 | + String channelRpcTimeout = properties.get(prefix + CHANNEL_RPC_TIMEOUT); |
| 199 | + if (channelRpcTimeout != null) { |
| 200 | + cf.setChannelRpcTimeout(Integer.valueOf(channelRpcTimeout)); |
| 201 | + } |
| 202 | + String channelShouldCheckRpcResponseType = properties.get(prefix + CHANNEL_SHOULD_CHECK_RPC_RESPONSE_TYPE); |
| 203 | + if (channelShouldCheckRpcResponseType != null) { |
| 204 | + cf.setChannelShouldCheckRpcResponseType(Boolean.valueOf(channelShouldCheckRpcResponseType)); |
| 205 | + } |
| 206 | + |
| 207 | + String useNio = properties.get(prefix + USE_NIO); |
| 208 | + if (useNio != null && Boolean.valueOf(useNio)) { |
| 209 | + cf.useNio(); |
| 210 | + |
| 211 | + NioParams nioParams = new NioParams(); |
| 212 | + |
| 213 | + String readByteBufferSize = properties.get(prefix + NIO_READ_BYTE_BUFFER_SIZE); |
| 214 | + if (readByteBufferSize != null) { |
| 215 | + nioParams.setReadByteBufferSize(Integer.valueOf(readByteBufferSize)); |
| 216 | + } |
| 217 | + String writeByteBufferSize = properties.get(prefix + NIO_WRITE_BYTE_BUFFER_SIZE); |
| 218 | + if (writeByteBufferSize != null) { |
| 219 | + nioParams.setWriteByteBufferSize(Integer.valueOf(writeByteBufferSize)); |
| 220 | + } |
| 221 | + String nbIoThreads = properties.get(prefix + NIO_NB_IO_THREADS); |
| 222 | + if (nbIoThreads != null) { |
| 223 | + nioParams.setNbIoThreads(Integer.valueOf(nbIoThreads)); |
| 224 | + } |
| 225 | + String writeEnqueuingTime = properties.get(prefix + NIO_WRITE_ENQUEUING_TIMEOUT_IN_MS); |
| 226 | + if (writeEnqueuingTime != null) { |
| 227 | + nioParams.setWriteEnqueuingTimeoutInMs(Integer.valueOf(writeEnqueuingTime)); |
| 228 | + } |
| 229 | + String writeQueueCapacity = properties.get(prefix + NIO_WRITE_QUEUE_CAPACITY); |
| 230 | + if (writeQueueCapacity != null) { |
| 231 | + nioParams.setWriteQueueCapacity(Integer.valueOf(writeQueueCapacity)); |
| 232 | + } |
| 233 | + cf.setNioParams(nioParams); |
| 234 | + } |
| 235 | + } |
| 236 | + |
| 237 | + public static void load(ConnectionFactory connectionFactory, String propertyFileLocation) throws IOException { |
| 238 | + load(connectionFactory, propertyFileLocation, DEFAULT_PREFIX); |
| 239 | + } |
| 240 | + |
| 241 | + public static void load(ConnectionFactory connectionFactory, Properties properties) { |
| 242 | + load(connectionFactory, (Map) properties, DEFAULT_PREFIX); |
| 243 | + } |
| 244 | + |
| 245 | + public static void load(ConnectionFactory connectionFactory, Properties properties, String prefix) { |
| 246 | + load(connectionFactory, (Map) properties, prefix); |
| 247 | + } |
| 248 | + |
| 249 | + public static void load(ConnectionFactory connectionFactory, Map<String, String> properties) { |
| 250 | + load(connectionFactory, properties, DEFAULT_PREFIX); |
| 251 | + } |
| 252 | +} |
0 commit comments