1 /* 2 * Hunt - A redis client library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.redis.RedisClusterInfoCache; 13 14 import hunt.redis.Client; 15 import hunt.redis.Exceptions; 16 import hunt.redis.HostAndPort; 17 import hunt.redis.RedisClusterHostAndPortMap; 18 import hunt.redis.Redis; 19 import hunt.redis.RedisPool; 20 import hunt.redis.util.SafeEncoder; 21 22 import hunt.collection.ArrayList; 23 import hunt.collection.Collections; 24 import hunt.collection.HashMap; 25 import hunt.collection.List; 26 import hunt.collection.Map; 27 import hunt.logging.ConsoleLogger; 28 import hunt.pool.impl.GenericObjectPoolConfig; 29 30 import hunt.Byte; 31 import hunt.Long; 32 import hunt.Integer; 33 import hunt.String; 34 35 import core.sync.mutex; 36 import core.sync.condition; 37 import std.conv; 38 39 class RedisClusterInfoCache { 40 private Map!(string, RedisPool) nodes; 41 private Map!(int, RedisPool) slots; 42 43 private Mutex mutex; 44 // private Object.Monitor r; 45 // private Object.Monitor w; 46 private bool rediscovering; 47 private GenericObjectPoolConfig poolConfig; 48 49 private int connectionTimeout; 50 private int soTimeout; 51 private string password; 52 private string clientName; 53 54 private bool ssl; 55 // private SSLSocketFactory sslSocketFactory; 56 // private SSLParameters sslParameters; 57 // private HostnameVerifier hostnameVerifier; 58 private RedisClusterHostAndPortMap hostAndPortMap; 59 60 private enum int MASTER_NODE_INDEX = 2; 61 62 this(GenericObjectPoolConfig poolConfig, int timeout) { 63 this(poolConfig, timeout, timeout, null, null); 64 } 65 66 this(GenericObjectPoolConfig poolConfig, 67 int connectionTimeout, int soTimeout, string password, string clientName) { 68 // this(poolConfig, connectionTimeout, soTimeout, password, clientName, false, null, null, null, null); 69 70 nodes = new HashMap!(string, RedisPool)(); 71 slots = new HashMap!(int, RedisPool)(); 72 mutex = new Mutex(); 73 74 this.poolConfig = poolConfig; 75 this.connectionTimeout = connectionTimeout; 76 this.soTimeout = soTimeout; 77 this.password = password; 78 this.clientName = clientName; 79 this.ssl = false; 80 // this.sslSocketFactory = sslSocketFactory; 81 // this.sslParameters = sslParameters; 82 // this.hostnameVerifier = hostnameVerifier; 83 this.hostAndPortMap = null; 84 } 85 86 // this(GenericObjectPoolConfig poolConfig, 87 // int connectionTimeout, int soTimeout, string password, string clientName, 88 // bool ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, 89 // HostnameVerifier hostnameVerifier, RedisClusterHostAndPortMap hostAndPortMap) { 90 // this.poolConfig = poolConfig; 91 // this.connectionTimeout = connectionTimeout; 92 // this.soTimeout = soTimeout; 93 // this.password = password; 94 // this.clientName = clientName; 95 // this.ssl = ssl; 96 // this.sslSocketFactory = sslSocketFactory; 97 // this.sslParameters = sslParameters; 98 // this.hostnameVerifier = hostnameVerifier; 99 // this.hostAndPortMap = hostAndPortMap; 100 // } 101 102 void discoverClusterNodesAndSlots(Redis redis) { 103 mutex.lock(); 104 scope(exit) mutex.unlock(); 105 106 try { 107 reset(); 108 List!(Object) slots = redis.clusterSlots(); 109 110 foreach(Object slotInfoObj ; slots) { 111 List!(Object) slotInfo = cast(List!(Object)) slotInfoObj; 112 113 if (slotInfo.size() <= MASTER_NODE_INDEX) { 114 continue; 115 } 116 117 List!(int) slotNums = getAssignedSlotArray(slotInfo); 118 119 // hostInfos 120 int size = slotInfo.size(); 121 for (int i = MASTER_NODE_INDEX; i < size; i++) { 122 List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(i); 123 if (hostInfos.size() <= 0) { 124 continue; 125 } 126 127 HostAndPort targetNode = generateHostAndPort(hostInfos); 128 setupNodeIfNotExist(targetNode); 129 if (i == MASTER_NODE_INDEX) { 130 assignSlotsToNode(slotNums, targetNode); 131 } 132 } 133 } 134 } catch(Throwable ex) { 135 debug warning(ex.msg); 136 version(HUNT_DEBUG) warning(ex); 137 } 138 } 139 140 void renewClusterSlots(Redis redis) { 141 //If rediscovering is already in process - no need to start one more same rediscovering, just return 142 if (!rediscovering) { 143 mutex.lock(); 144 scope(exit) mutex.unlock(); 145 146 try { 147 if (!rediscovering) { 148 rediscovering = true; 149 150 try { 151 if (redis !is null) { 152 try { 153 discoverClusterSlots(redis); 154 return; 155 } catch (RedisException ex) { 156 //try nodes from all pools 157 debug warning(ex.msg); 158 version(HUNT_REDIS_DEBUG) warning(ex); 159 } 160 } 161 162 foreach(RedisPool jp ; getShuffledNodesPool()) { 163 Redis j = null; 164 try { 165 j = jp.getResource(); 166 discoverClusterSlots(j); 167 return; 168 } catch (RedisConnectionException ex) { 169 // try next nodes 170 debug warning(ex.msg); 171 version(HUNT_REDIS_DEBUG) warning(ex); 172 } finally { 173 if (j !is null) { 174 j.close(); 175 } 176 } 177 } 178 } finally { 179 rediscovering = false; 180 } 181 } 182 } catch(Exception ex) { 183 debug warning(ex.msg); 184 version(HUNT_REDIS_DEBUG) warning(ex); 185 } 186 } 187 } 188 189 private void discoverClusterSlots(Redis redis) { 190 List!(Object) slots = redis.clusterSlots(); 191 this.slots.clear(); 192 193 foreach(Object slotInfoObj ; slots) { 194 List!(Object) slotInfo = cast(List!(Object)) slotInfoObj; 195 196 if (slotInfo.size() <= MASTER_NODE_INDEX) { 197 continue; 198 } 199 200 List!(int) slotNums = getAssignedSlotArray(slotInfo); 201 202 // hostInfos 203 List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(MASTER_NODE_INDEX); 204 if (hostInfos.isEmpty()) { 205 continue; 206 } 207 208 // at this time, we just use master, discard slave information 209 HostAndPort targetNode = generateHostAndPort(hostInfos); 210 assignSlotsToNode(slotNums, targetNode); 211 } 212 } 213 214 private HostAndPort generateHostAndPort(List!(Object) hostInfos) { 215 Object info = hostInfos.get(0); 216 217 Bytes infoBytes = cast(Bytes)info; 218 if(infoBytes is null) { 219 warningf("wrong cast: from %s to %s", typeid(info), typeid(Bytes)); 220 } 221 string host = cast(string)infoBytes.value(); 222 int port = (cast(Long) hostInfos.get(1)).intValue(); 223 if (ssl && hostAndPortMap !is null) { 224 HostAndPort hostAndPort = hostAndPortMap.getSSLHostAndPort(host, port); 225 if (hostAndPort !is null) { 226 return hostAndPort; 227 } 228 } 229 version(HUNT_REDIS_DEBUG) tracef("%s:%d", host, port); 230 return new HostAndPort(host, port); 231 } 232 233 RedisPool setupNodeIfNotExist(HostAndPort node) { 234 mutex.lock(); 235 try { 236 string nodeKey = getNodeKey(node); 237 RedisPool existingPool = nodes.get(nodeKey); 238 if (existingPool !is null) return existingPool; 239 240 // RedisPool nodePool = new RedisPool(poolConfig, node.getHost(), node.getPort(), 241 // connectionTimeout, soTimeout, password, 0, clientName, 242 // ssl, sslSocketFactory, sslParameters, hostnameVerifier); 243 RedisPool nodePool = new RedisPool(poolConfig, node.getHost(), node.getPort(), 244 connectionTimeout, soTimeout, password, 0, clientName, 245 ssl); 246 nodes.put(nodeKey, nodePool); 247 return nodePool; 248 } finally { 249 mutex.unlock(); 250 } 251 } 252 253 void assignSlotToNode(int slot, HostAndPort targetNode) { 254 mutex.lock(); 255 try { 256 RedisPool targetPool = setupNodeIfNotExist(targetNode); 257 slots.put(slot, targetPool); 258 } finally { 259 mutex.unlock(); 260 } 261 } 262 263 void assignSlotsToNode(List!(int) targetSlots, HostAndPort targetNode) { 264 mutex.lock(); 265 try { 266 RedisPool targetPool = setupNodeIfNotExist(targetNode); 267 foreach(int slot ; targetSlots) { 268 slots.put(slot, targetPool); 269 } 270 } finally { 271 mutex.unlock(); 272 } 273 } 274 275 RedisPool getNode(string nodeKey) { 276 mutex.lock(); 277 try { 278 return nodes.get(nodeKey); 279 } finally { 280 mutex.unlock(); 281 } 282 } 283 284 RedisPool getSlotPool(int slot) { 285 mutex.lock(); 286 try { 287 return slots.get(slot); 288 } finally { 289 mutex.unlock(); 290 } 291 } 292 293 Map!(string, RedisPool) getNodes() { 294 mutex.lock(); 295 try { 296 return new HashMap!(string, RedisPool)(nodes); 297 } finally { 298 mutex.unlock(); 299 } 300 } 301 302 List!(RedisPool) getShuffledNodesPool() { 303 mutex.lock(); 304 try { 305 List!(RedisPool) pools = new ArrayList!(RedisPool)(nodes.values()); 306 // TODO: Tasks pending completion -@zxp at 7/17/2019, 10:01:18 AM 307 // 308 // Collections.shuffle(pools); 309 return pools; 310 } finally { 311 mutex.unlock(); 312 } 313 } 314 315 /** 316 * Clear discovered nodes collections and gently release allocated resources 317 */ 318 void reset() { 319 mutex.lock(); 320 scope(exit) { 321 mutex.unlock(); 322 } 323 324 doReset(); 325 } 326 327 private void doReset() { 328 foreach(RedisPool pool ; nodes.values()) { 329 try { 330 if (pool !is null) { 331 pool.destroy(); 332 } 333 } catch (Exception ex) { 334 // pass 335 debug warning(ex.msg); 336 version(HUNT_REDIS_DEBUG) warning(ex); 337 } 338 } 339 nodes.clear(); 340 slots.clear(); 341 } 342 343 static string getNodeKey(HostAndPort hnp) { 344 return hnp.getHost() ~ ":" ~ hnp.getPort().to!string(); 345 } 346 347 static string getNodeKey(Client client) { 348 return client.getHost() ~ ":" ~ client.getPort().to!string(); 349 } 350 351 static string getNodeKey(Redis redis) { 352 return getNodeKey(redis.getClient()); 353 } 354 355 private List!(int) getAssignedSlotArray(List!(Object) slotInfo) { 356 List!(int) slotNums = new ArrayList!(int)(); 357 for (int slot = (cast(Long) slotInfo.get(0)).intValue(); slot <= (cast(Long) slotInfo.get(1)) 358 .intValue(); slot++) { 359 slotNums.add(slot); 360 } 361 return slotNums; 362 } 363 }