1 /* 2 * Hunt - A redis client library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.redis.RedisSentinelPool; 13 14 import hunt.redis.Exceptions; 15 import hunt.redis.HostAndPort; 16 import hunt.redis.Protocol; 17 import hunt.redis.Redis; 18 import hunt.redis.RedisPubSub; 19 20 import hunt.Exceptions; 21 import hunt.collection.HashSet; 22 import hunt.collection.List; 23 import hunt.collection.Set; 24 import hunt.concurrency.thread; 25 import hunt.logging.ConsoleLogger; 26 import hunt.util.pool; 27 import hunt.util.ArrayHelper; 28 29 import core.atomic; 30 import core.thread; 31 import core.time; 32 33 import std.algorithm; 34 import std.conv; 35 import std.format; 36 import std.string; 37 38 /** 39 */ 40 // class RedisSentinelPool : RedisPoolAbstract { 41 42 // protected GenericObjectPoolConfig poolConfig; 43 44 // protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT; 45 // protected int soTimeout = Protocol.DEFAULT_TIMEOUT; 46 47 // protected string password; 48 49 // protected int database = Protocol.DEFAULT_DATABASE; 50 51 // protected string clientName; 52 53 // protected Set!(MasterListener) masterListeners; 54 55 // private RedisFactory factory; 56 // private HostAndPort currentHostMaster; 57 58 // private Object initPoolLock; 59 60 // this(string masterName, Set!(string) sentinels, 61 // GenericObjectPoolConfig poolConfig) { 62 // this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, 63 // Protocol.DEFAULT_DATABASE); 64 // } 65 66 // this(string masterName, Set!(string) sentinels) { 67 // this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null, 68 // Protocol.DEFAULT_DATABASE); 69 // } 70 71 // this(string masterName, Set!(string) sentinels, string password) { 72 // this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password); 73 
// } 74 75 // this(string masterName, Set!(string) sentinels, 76 // GenericObjectPoolConfig poolConfig, int timeout, string password) { 77 // this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); 78 // } 79 80 // this(string masterName, Set!(string) sentinels, 81 // GenericObjectPoolConfig poolConfig, int timeout) { 82 // this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); 83 // } 84 85 // this(string masterName, Set!(string) sentinels, 86 // GenericObjectPoolConfig poolConfig, string password) { 87 // this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); 88 // } 89 90 // this(string masterName, Set!(string) sentinels, 91 // GenericObjectPoolConfig poolConfig, int timeout, string password, 92 // int database) { 93 // this(masterName, sentinels, poolConfig, timeout, timeout, password, database); 94 // } 95 96 // this(string masterName, Set!(string) sentinels, 97 // GenericObjectPoolConfig poolConfig, int timeout, string password, 98 // int database, string clientName) { 99 // this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName); 100 // } 101 102 // this(string masterName, Set!(string) sentinels, 103 // GenericObjectPoolConfig poolConfig, int timeout, int soTimeout, 104 // string password, int database) { 105 // this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null); 106 // } 107 108 // this(string masterName, Set!(string) sentinels, 109 // GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, 110 // string password, int database, string clientName) { 111 112 // masterListeners = new HashSet!(MasterListener)(); 113 // initPoolLock = new Object(); 114 // this.poolConfig = poolConfig; 115 // this.connectionTimeout = connectionTimeout; 116 // this.soTimeout = soTimeout; 117 // this.password = password; 118 // this.database = database; 119 // this.clientName = clientName; 120 121 // HostAndPort 
master = initSentinels(sentinels, masterName); 122 // initPool(master); 123 // } 124 125 // override void destroy() { 126 // foreach(MasterListener m ; masterListeners) { 127 // m.shutdown(); 128 // } 129 130 // super.destroy(); 131 // } 132 133 // HostAndPort getCurrentHostMaster() { 134 // return currentHostMaster; 135 // } 136 137 // private void initPool(HostAndPort master) { 138 // synchronized(initPoolLock){ 139 // if (master != currentHostMaster) { 140 // currentHostMaster = master; 141 // if (factory is null) { 142 // factory = new RedisFactory(master.getHost(), master.getPort(), connectionTimeout, 143 // soTimeout, password, database, clientName); 144 // initPool(poolConfig, factory); 145 // } else { 146 // factory.setHostAndPort(currentHostMaster); 147 // // although we clear the pool, we still have to check the 148 // // returned object 149 // // in getResource, this call only clears idle instances, not 150 // // borrowed instances 151 // internalPool.clear(); 152 // } 153 154 // info("Created RedisPool to master at " ~ master.toString()); 155 // } 156 // } 157 // } 158 159 // private HostAndPort initSentinels(Set!(string) sentinels, string masterName) { 160 161 // HostAndPort master = null; 162 // bool sentinelAvailable = false; 163 164 // info("Trying to find master from available Sentinels..."); 165 166 // foreach(string sentinel ; sentinels) { 167 // HostAndPort hap = HostAndPort.parseString(sentinel); 168 169 // tracef("Connecting to Sentinel %s", hap); 170 171 // Redis redis = null; 172 // try { 173 // redis = new Redis(hap); 174 175 // List!(string) masterAddr = redis.sentinelGetMasterAddrByName(masterName); 176 177 // // connected to sentinel... 178 // sentinelAvailable = true; 179 180 // if (masterAddr is null || masterAddr.size() != 2) { 181 // warningf("Can not get master addr, master name: %s. 
Sentinel: %s", masterName, hap); 182 // continue; 183 // } 184 185 // master = toHostAndPort(masterAddr); 186 // tracef("Found Redis master at %s", master); 187 // break; 188 // } catch (RedisException e) { 189 // // resolves #1036, it should handle RedisException there's another chance 190 // // of raising RedisDataException 191 // warningf( 192 // "Cannot get master address from sentinel running @ %s. Reason: %s. Trying next one.", hap, 193 // e.toString()); 194 // } finally { 195 // if (redis !is null) { 196 // redis.close(); 197 // } 198 // } 199 // } 200 201 // if (master is null) { 202 // if (sentinelAvailable) { 203 // // can connect to sentinel, but master name seems to not be 204 // // monitored 205 // throw new RedisException("Can connect to sentinel, but " ~ masterName 206 // ~ " seems to be not monitored..."); 207 // } else { 208 // throw new RedisConnectionException("All sentinels down, cannot determine where is " 209 // ~ masterName ~ " master is running..."); 210 // } 211 // } 212 213 // info("Redis master running at " ~ master.toString() ~ ", starting Sentinel listeners..."); 214 215 // foreach(string sentinel ; sentinels) { 216 // HostAndPort hap = HostAndPort.parseString(sentinel); 217 // MasterListener masterListener = new this(masterName, hap.getHost(), hap.getPort()); 218 // // whether MasterListener threads are alive or not, process can be stopped 219 // masterListener.setDaemon(true); 220 // masterListeners.add(masterListener); 221 // masterListener.start(); 222 // } 223 224 // return master; 225 // } 226 227 // private HostAndPort toHostAndPort(string[] getMasterAddrByNameResult) { 228 // string host = getMasterAddrByNameResult[0]; 229 // int port = to!int(getMasterAddrByNameResult[1]); 230 231 // return new HostAndPort(host, port); 232 // } 233 234 // private HostAndPort toHostAndPort(List!(string) getMasterAddrByNameResult) { 235 // string host = getMasterAddrByNameResult.get(0); 236 // int port = 
to!int(getMasterAddrByNameResult.get(1)); 237 238 // return new HostAndPort(host, port); 239 // } 240 241 // override 242 // Redis getResource() { 243 // while (true) { 244 // Redis redis = super.getResource(); 245 // redis.setDataSource(this); 246 247 // // get a reference because it can change concurrently 248 // HostAndPort master = currentHostMaster; 249 // HostAndPort connection = new HostAndPort(redis.getClient().getHost(), redis.getClient() 250 // .getPort()); 251 252 // if (master == connection) { 253 // // connected to the correct master 254 // return redis; 255 // } else { 256 // returnBrokenResource(redis); 257 // } 258 // } 259 // } 260 261 // override 262 // protected void returnBrokenResource(Redis resource) { 263 // if (resource !is null) { 264 // returnBrokenResourceObject(resource); 265 // } 266 // } 267 268 // override 269 // protected void returnResource(Redis resource) { 270 // if (resource !is null) { 271 // resource.resetState(); 272 // returnResourceObject(resource); 273 // } 274 // } 275 276 // protected class MasterListener : ThreadEx { 277 278 // protected string masterName; 279 // protected string host; 280 // protected int port; 281 // protected long subscribeRetryWaitTimeMillis = 5000; 282 // protected Redis j; 283 // protected shared bool running = false; 284 285 // protected this() { 286 // } 287 288 // this(string masterName, string host, int port) { 289 // super(format("MasterListener-%s-[%s:%d]", masterName, host, port)); 290 // this.masterName = masterName; 291 // this.host = host; 292 // this.port = port; 293 // } 294 295 // this(string masterName, string host, int port, 296 // long subscribeRetryWaitTimeMillis) { 297 // this(masterName, host, port); 298 // this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; 299 // } 300 301 // override 302 // void run() { 303 304 // running = true; 305 306 // while (running) { 307 308 // j = new Redis(host, port); 309 310 // try { 311 // // double check that it is not being 
shutdown 312 // if (!running) { 313 // break; 314 // } 315 316 // /* 317 // * Added code for active refresh 318 // */ 319 // List!(string) masterAddr = j.sentinelGetMasterAddrByName(masterName); 320 // if (masterAddr is null || masterAddr.size() != 2) { 321 // warningf("Can not get master addr, master name: %s. Sentinel: %s:%s.",masterName,host,port); 322 // }else{ 323 // initPool(toHostAndPort(masterAddr)); 324 // } 325 326 // j.subscribe(new class RedisPubSub { 327 // override 328 // void onMessage(string channel, string message) { 329 // tracef("Sentinel %s:%s published: %s.", host, port, message); 330 331 // string[] switchMasterMsg = message.split(" "); 332 333 // if (switchMasterMsg.length > 3) { 334 335 // if (masterName == switchMasterMsg[0]) { 336 // initPool(toHostAndPort([switchMasterMsg[3], switchMasterMsg[4]])); 337 // } else { 338 // tracef( 339 // "Ignoring message on +switch-master for master name %s, our master name is %s", 340 // switchMasterMsg[0], masterName); 341 // } 342 343 // } else { 344 // errorf( 345 // "Invalid message received on Sentinel %s:%s on channel +switch-master: %s", host, 346 // port, message); 347 // } 348 // } 349 // }, "+switch-master"); 350 351 // } catch (RedisException e) { 352 353 // if (running) { 354 // errorf("Lost connection to Sentinel at %s:%s. 
Sleeping 5000ms and retrying.", host, 355 // port, e); 356 // try { 357 // Thread.sleep(subscribeRetryWaitTimeMillis.msecs); 358 // } catch (InterruptedException e1) { 359 // errorf("Sleep interrupted: ", e1); 360 // } 361 // } else { 362 // tracef("Unsubscribing from Sentinel at %s:%s", host, port); 363 // } 364 // } finally { 365 // j.close(); 366 // } 367 // } 368 // } 369 370 // void shutdown() { 371 // try { 372 // tracef("Shutting down listener on %s:%s", host, port); 373 // running = false; 374 // // This isn't good, the Redis object is not thread safe 375 // if (j !is null) { 376 // j.disconnect(); 377 // } 378 // } catch (Exception e) { 379 // errorf("Caught exception while shutting down: ", e); 380 // } 381 // } 382 // } 383 // }