/*
 * Hunt - A redis client library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.redis.RedisClusterInfoCache;

import hunt.redis.Client;
import hunt.redis.Exceptions;
import hunt.redis.HostAndPort;
import hunt.redis.RedisClusterHostAndPortMap;
import hunt.redis.Redis;
import hunt.redis.RedisPool;
import hunt.redis.RedisPoolOptions;
import hunt.redis.util.SafeEncoder;

import hunt.collection.ArrayList;
import hunt.collection.Collections;
import hunt.collection.HashMap;
import hunt.collection.List;
import hunt.collection.Map;
import hunt.logging.ConsoleLogger;

import hunt.util.pool;

import hunt.Byte;
import hunt.Long;
import hunt.Integer;
import hunt.String;

import core.sync.mutex;
import core.sync.condition;
import std.conv;


/**
 * Cache of the cluster topology: maps node keys ("host:port") and slot
 * numbers to the RedisPool used to reach them.
 *
 * The public methods lock `mutex` before touching `nodes` or `slots`.
 * Several of them call each other while holding the lock (for example
 * assignSlotsToNode -> setupNodeIfNotExist), so the implementation relies on
 * core.sync.mutex.Mutex being recursive.
 */
class RedisClusterInfoCache {
    /// Pools keyed by the string produced by getNodeKey ("host:port").
    private Map!(string, RedisPool) nodes;
    /// Pools keyed by slot number.
    private Map!(int, RedisPool) slots;

    private Mutex mutex;
    /// Set to true while renewClusterSlots is refreshing the slot map.
    private bool rediscovering;
    private RedisPoolOptions poolConfig;

    // private SSLSocketFactory sslSocketFactory;
    // private SSLParameters sslParameters;
    // private HostnameVerifier hostnameVerifier;
    private RedisClusterHostAndPortMap hostAndPortMap;

    /// Index of the first node entry in one element of the clusterSlots() reply
    /// (indexes 0 and 1 are read as the slot range by getAssignedSlotArray).
    private enum int MASTER_NODE_INDEX = 2;

    /**
     * Creates an empty cache.
     *
     * Params:
     *   poolConfig = options copied for every per-node pool created later
     */
    this(RedisPoolOptions poolConfig) {
        nodes = new HashMap!(string, RedisPool)();
        slots = new HashMap!(int, RedisPool)();
        mutex = new Mutex();

        this.poolConfig = poolConfig;
        this.hostAndPortMap = null;
    }

    /**
     * Resets the cache, then rebuilds both the node map and the slot map from
     * `redis.clusterSlots()`. A pool is set up for every node listed in an
     * entry; slots are assigned only to the node at MASTER_NODE_INDEX.
     *
     * Any Throwable raised on the way is caught and only logged.
     *
     * Params:
     *   redis = connection used to query the slot layout
     */
    void discoverClusterNodesAndSlots(Redis redis) {
        mutex.lock();
        scope(exit) mutex.unlock();

        try {
            reset();
            List!(Object) slotEntries = redis.clusterSlots();

            foreach(Object slotInfoObj ; slotEntries) {
                List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

                // Skip entries that are not lists or carry no node information.
                if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                    continue;
                }

                List!(int) slotNums = getAssignedSlotArray(slotInfo);

                // hostInfos
                int size = slotInfo.size();
                for (int i = MASTER_NODE_INDEX; i < size; i++) {
                    List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(i);
                    if (hostInfos is null || hostInfos.size() <= 0) {
                        continue;
                    }

                    HostAndPort targetNode = generateHostAndPort(hostInfos);
                    setupNodeIfNotExist(targetNode);
                    if (i == MASTER_NODE_INDEX) {
                        assignSlotsToNode(slotNums, targetNode);
                    }
                }
            }
        } catch(Throwable ex) {
            debug warning(ex.msg);
            // Same version identifier as every other handler in this class.
            version(HUNT_REDIS_DEBUG) warning(ex);
        }
    }

    /**
     * Refreshes the slot map. The given connection is tried first (when it is
     * not null); if that raises a RedisException, a connection borrowed from
     * each known node pool is tried in turn until one succeeds.
     *
     * Exceptions other than the ones handled per attempt are caught and logged.
     *
     * Params:
     *   redis = preferred connection for the query, may be null
     */
    void renewClusterSlots(Redis redis) {
        // If rediscovering is already in process - no need to start one more same rediscovering, just return
        if (rediscovering) {
            return;
        }

        mutex.lock();
        scope(exit) mutex.unlock();

        try {
            if (rediscovering) {
                return;
            }
            rediscovering = true;
            scope(exit) rediscovering = false;

            if (redis !is null) {
                try {
                    discoverClusterSlots(redis);
                    return;
                } catch (RedisException ex) {
                    // try nodes from all pools
                    debug warning(ex.msg);
                    version(HUNT_REDIS_DEBUG) warning(ex);
                }
            }

            foreach(RedisPool jp ; getShuffledNodesPool()) {
                Redis j = null;
                try {
                    j = jp.borrow();
                    discoverClusterSlots(j);
                    return;
                } catch (RedisConnectionException ex) {
                    // try next nodes
                    debug warning(ex.msg);
                    version(HUNT_REDIS_DEBUG) warning(ex);
                } finally {
                    if (j !is null) {
                        j.close();
                    }
                }
            }
        } catch(Exception ex) {
            debug warning(ex.msg);
            version(HUNT_REDIS_DEBUG) warning(ex);
        }
    }

    /**
     * Clears the slot map and refills it from `redis.clusterSlots()`, using
     * only the node at MASTER_NODE_INDEX of every entry.
     *
     * Called from renewClusterSlots, which already holds `mutex`.
     */
    private void discoverClusterSlots(Redis redis) {
        List!(Object) slotEntries = redis.clusterSlots();
        this.slots.clear();

        foreach(Object slotInfoObj ; slotEntries) {
            List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

            // Skip entries that are not lists or carry no node information.
            if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                continue;
            }

            List!(int) slotNums = getAssignedSlotArray(slotInfo);

            // hostInfos
            List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(MASTER_NODE_INDEX);
            if (hostInfos is null || hostInfos.isEmpty()) {
                continue;
            }

            // at this time, we just use master, discard slave information
            HostAndPort targetNode = generateHostAndPort(hostInfos);
            assignSlotsToNode(slotNums, targetNode);
        }
    }

    /**
     * Builds a HostAndPort from one node entry: element 0 is read as the host
     * (Bytes) and element 1 as the port (Long). When `poolConfig.ssl` is set
     * and a host/port map is present, the mapped SSL address is returned if
     * the map yields one.
     *
     * Throws: RedisException when the entry has fewer than two elements or an
     *         element has an unexpected type (previously this dereferenced a
     *         null reference).
     */
    private HostAndPort generateHostAndPort(List!(Object) hostInfos) {
        if (hostInfos is null || hostInfos.size() < 2) {
            throw new RedisException("Malformed cluster node entry: host and port expected");
        }

        Object info = hostInfos.get(0);
        Bytes infoBytes = cast(Bytes) info;
        if (infoBytes is null) {
            // typeid on a null reference would fault, so format the name by hand.
            string actualType = info is null ? "null" : typeid(info).toString();
            warningf("wrong cast: from %s to %s", actualType, typeid(Bytes));
            throw new RedisException("Malformed cluster node entry: unexpected host type " ~ actualType);
        }
        string host = cast(string) infoBytes.value();

        Long portValue = cast(Long) hostInfos.get(1);
        if (portValue is null) {
            throw new RedisException("Malformed cluster node entry: port is not an integer");
        }
        int port = portValue.intValue();

        if (poolConfig.ssl && hostAndPortMap !is null) {
            HostAndPort hostAndPort = hostAndPortMap.getSSLHostAndPort(host, port);
            if (hostAndPort !is null) {
                return hostAndPort;
            }
        }
        version(HUNT_REDIS_DEBUG) tracef("%s:%d", host, port);
        return new HostAndPort(host, port);
    }

    /**
     * Returns the pool registered for `node`, creating and registering one
     * (from a copy of `poolConfig` with the node's host and port) when none
     * exists yet.
     */
    RedisPool setupNodeIfNotExist(HostAndPort node) {
        mutex.lock();
        scope(exit) mutex.unlock();

        string nodeKey = getNodeKey(node);
        RedisPool existingPool = nodes.get(nodeKey);
        if (existingPool !is null) {
            return existingPool;
        }

        RedisPoolOptions config = new RedisPoolOptions(poolConfig);
        config.host = node.getHost();
        config.port = node.getPort();

        RedisPool nodePool = new RedisFactory(config).pool();

        nodes.put(nodeKey, nodePool);
        return nodePool;
    }

    /// Maps a single slot to the pool of `targetNode`, creating the pool if needed.
    void assignSlotToNode(int slot, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        slots.put(slot, targetPool);
    }

    /// Maps every slot in `targetSlots` to the pool of `targetNode`, creating the pool if needed.
    void assignSlotsToNode(List!(int) targetSlots, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        foreach(int slot ; targetSlots) {
            slots.put(slot, targetPool);
        }
    }

    /// Returns whatever `nodes.get(nodeKey)` yields for the given "host:port" key.
    RedisPool getNode(string nodeKey) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return nodes.get(nodeKey);
    }

    /// Returns whatever `slots.get(slot)` yields for the given slot number.
    RedisPool getSlotPool(int slot) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return slots.get(slot);
    }

    /// Returns a new HashMap built from the node map, so callers do not share the internal one.
    Map!(string, RedisPool) getNodes() {
        mutex.lock();
        scope(exit) mutex.unlock();

        return new HashMap!(string, RedisPool)(nodes);
    }

    /**
     * Returns a new list of all known node pools.
     *
     * Despite the name the list is currently NOT shuffled; see the TODO below.
     */
    List!(RedisPool) getShuffledNodesPool() {
        mutex.lock();
        scope(exit) mutex.unlock();

        List!(RedisPool) pools = new ArrayList!(RedisPool)(nodes.values());
        // TODO: Tasks pending completion -@zxp at 7/17/2019, 10:01:18 AM
        //
        // Collections.shuffle(pools);
        return pools;
    }

    /**
     * Clear discovered nodes collections and gently release allocated resources
     */
    void reset() {
        mutex.lock();
        scope(exit) mutex.unlock();

        doReset();
    }

    /// Destroys every registered pool (logging, not propagating, Exceptions) and clears both maps.
    private void doReset() {
        foreach(RedisPool pool ; nodes.values()) {
            try {
                if (pool !is null) {
                    pool.destroy();
                }
            } catch (Exception ex) {
                // pass
                debug warning(ex.msg);
                version(HUNT_REDIS_DEBUG) warning(ex);
            }
        }
        nodes.clear();
        slots.clear();
    }

    /// Builds the "host:port" key for a HostAndPort.
    static string getNodeKey(HostAndPort hnp) {
        return hnp.getHost() ~ ":" ~ hnp.getPort().to!string();
    }

    /// Builds the "host:port" key for a Client.
    static string getNodeKey(Client client) {
        return client.getHost() ~ ":" ~ client.getPort().to!string();
    }

    /// Builds the "host:port" key for the Client behind a Redis connection.
    static string getNodeKey(Redis redis) {
        return getNodeKey(redis.getClient());
    }

    /**
     * Expands the inclusive slot range stored at indexes 0 and 1 of `slotInfo`
     * into a list of individual slot numbers.
     *
     * Throws: RedisException when either bound is not a Long (previously this
     *         dereferenced a null reference).
     */
    private List!(int) getAssignedSlotArray(List!(Object) slotInfo) {
        Long firstValue = cast(Long) slotInfo.get(0);
        Long lastValue = cast(Long) slotInfo.get(1);
        if (firstValue is null || lastValue is null) {
            throw new RedisException("Malformed cluster slot entry: slot range bounds are not integers");
        }

        // Read the bounds once instead of re-casting on every iteration.
        int firstSlot = firstValue.intValue();
        int lastSlot = lastValue.intValue();

        List!(int) slotNums = new ArrayList!(int)();
        for (int slot = firstSlot; slot <= lastSlot; slot++) {
            slotNums.add(slot);
        }
        return slotNums;
    }
}