1 /* 2 * Hunt - A redis client library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.redis.RedisSentinelPool; 13 14 import hunt.redis.Exceptions; 15 import hunt.redis.HostAndPort; 16 import hunt.redis.Protocol; 17 import hunt.redis.Redis; 18 import hunt.redis.RedisFactory; 19 import hunt.redis.RedisPoolAbstract; 20 import hunt.redis.RedisPubSub; 21 22 import hunt.Exceptions; 23 import hunt.collection.HashSet; 24 import hunt.collection.List; 25 import hunt.collection.Set; 26 import hunt.concurrency.thread; 27 import hunt.logging.ConsoleLogger; 28 import hunt.pool.impl.GenericObjectPoolConfig; 29 import hunt.util.ArrayHelper; 30 31 import core.atomic; 32 import core.thread; 33 import core.time; 34 35 import std.algorithm; 36 import std.conv; 37 import std.format; 38 import std.string; 39 40 /** 41 */ 42 // class RedisSentinelPool : RedisPoolAbstract { 43 44 // protected GenericObjectPoolConfig poolConfig; 45 46 // protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT; 47 // protected int soTimeout = Protocol.DEFAULT_TIMEOUT; 48 49 // protected string password; 50 51 // protected int database = Protocol.DEFAULT_DATABASE; 52 53 // protected string clientName; 54 55 // protected Set!(MasterListener) masterListeners; 56 57 // private RedisFactory factory; 58 // private HostAndPort currentHostMaster; 59 60 // private Object initPoolLock; 61 62 // this(string masterName, Set!(string) sentinels, 63 // GenericObjectPoolConfig poolConfig) { 64 // this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, 65 // Protocol.DEFAULT_DATABASE); 66 // } 67 68 // this(string masterName, Set!(string) sentinels) { 69 // this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null, 70 // Protocol.DEFAULT_DATABASE); 71 // } 72 73 // this(string masterName, Set!(string) sentinels, string password) { 74 // this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password); 75 // } 76 77 // this(string masterName, Set!(string) sentinels, 78 // GenericObjectPoolConfig poolConfig, int timeout, string password) { 79 // this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); 80 // } 81 82 // this(string masterName, Set!(string) sentinels, 83 // GenericObjectPoolConfig poolConfig, int timeout) { 84 // this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); 85 // } 86 87 // this(string masterName, Set!(string) sentinels, 88 // GenericObjectPoolConfig poolConfig, string password) { 89 // this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); 90 // } 91 92 // this(string masterName, Set!(string) sentinels, 93 // GenericObjectPoolConfig poolConfig, int timeout, string password, 94 // int database) { 95 // this(masterName, sentinels, poolConfig, timeout, timeout, password, database); 96 // } 97 98 // this(string masterName, Set!(string) sentinels, 99 // GenericObjectPoolConfig poolConfig, int timeout, string password, 100 // int database, string clientName) { 101 // this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName); 102 // } 103 104 // this(string masterName, Set!(string) sentinels, 105 // GenericObjectPoolConfig poolConfig, int timeout, int soTimeout, 106 // string password, int database) { 107 // this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null); 108 // } 109 110 // this(string masterName, Set!(string) sentinels, 111 // GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, 112 // string password, int database, string clientName) { 113 114 // masterListeners = new HashSet!(MasterListener)(); 115 // initPoolLock = new Object(); 116 // this.poolConfig = poolConfig; 117 // this.connectionTimeout = connectionTimeout; 118 // this.soTimeout = soTimeout; 119 // this.password = password; 120 // this.database = database; 121 // this.clientName = clientName; 122 123 // HostAndPort master = initSentinels(sentinels, masterName); 124 // initPool(master); 125 // } 126 127 // override void destroy() { 128 // foreach(MasterListener m ; masterListeners) { 129 // m.shutdown(); 130 // } 131 132 // super.destroy(); 133 // } 134 135 // HostAndPort getCurrentHostMaster() { 136 // return currentHostMaster; 137 // } 138 139 // private void initPool(HostAndPort master) { 140 // synchronized(initPoolLock){ 141 // if (master != currentHostMaster) { 142 // currentHostMaster = master; 143 // if (factory is null) { 144 // factory = new RedisFactory(master.getHost(), master.getPort(), connectionTimeout, 145 // soTimeout, password, database, clientName); 146 // initPool(poolConfig, factory); 147 // } else { 148 // factory.setHostAndPort(currentHostMaster); 149 // // although we clear the pool, we still have to check the 150 // // returned object 151 // // in getResource, this call only clears idle instances, not 152 // // borrowed instances 153 // internalPool.clear(); 154 // } 155 156 // info("Created RedisPool to master at " ~ master.toString()); 157 // } 158 // } 159 // } 160 161 // private HostAndPort initSentinels(Set!(string) sentinels, string masterName) { 162 163 // HostAndPort master = null; 164 // bool sentinelAvailable = false; 165 166 // info("Trying to find master from available[] Sentinels..."); 167 168 // foreach(string sentinel ; sentinels) { 169 // HostAndPort hap = HostAndPort.parseString(sentinel); 170 171 // tracef("Connecting to Sentinel %s", hap); 172 173 // Redis redis = null; 174 // try { 175 // redis = new Redis(hap); 176 177 // List!(string) masterAddr = redis.sentinelGetMasterAddrByName(masterName); 178 179 // // connected to[] sentinel... 180 // sentinelAvailable = true; 181 182 // if (masterAddr is null || masterAddr.size() != 2) { 183 // warningf("Can not get master addr, master name: %s. Sentinel: %s", masterName, hap); 184 // continue; 185 // } 186 187 // master = toHostAndPort(masterAddr); 188 // tracef("Found Redis master at %s", master); 189 // break; 190 // } catch (RedisException e) { 191 // // resolves #1036, it should handle RedisException there's another chance 192 // // of raising RedisDataException 193 // warningf( 194 // "Cannot get master address from sentinel running @ %s. Reason: %s. Trying next one.", hap, 195 // e.toString()); 196 // } finally { 197 // if (redis !is null) { 198 // redis.close(); 199 // } 200 // } 201 // } 202 203 // if (master is null) { 204 // if (sentinelAvailable) { 205 // // can connect to sentinel, but master name seems to not 206 // // monitored 207 // throw new RedisException("Can connect to sentinel, but " ~ masterName 208 // ~ " seems to be not[] monitored..."); 209 // } else { 210 // throw new RedisConnectionException("All sentinels down, cannot determine where is " 211 // ~ masterName ~ " master is[] running..."); 212 // } 213 // } 214 215 // info("Redis master running at " ~ master.toString() ~ ", starting Sentinel[] listeners..."); 216 217 // foreach(string sentinel ; sentinels) { 218 // HostAndPort hap = HostAndPort.parseString(sentinel); 219 // MasterListener masterListener = new this(masterName, hap.getHost(), hap.getPort()); 220 // // whether MasterListener threads are alive or not, process can be stopped 221 // masterListener.setDaemon(true); 222 // masterListeners.add(masterListener); 223 // masterListener.start(); 224 // } 225 226 // return master; 227 // } 228 229 // private HostAndPort toHostAndPort(string[] getMasterAddrByNameResult) { 230 // string host = getMasterAddrByNameResult[0]; 231 // int port = to!int(getMasterAddrByNameResult[1]); 232 233 // return new HostAndPort(host, port); 234 // } 235 236 // private HostAndPort toHostAndPort(List!(string) getMasterAddrByNameResult) { 237 // string host = getMasterAddrByNameResult.get(0); 238 // int port = to!int(getMasterAddrByNameResult.get(1)); 239 240 // return new HostAndPort(host, port); 241 // } 242 243 // override 244 // Redis getResource() { 245 // while (true) { 246 // Redis redis = super.getResource(); 247 // redis.setDataSource(this); 248 249 // // get a reference because it can change concurrently 250 // HostAndPort master = currentHostMaster; 251 // HostAndPort connection = new HostAndPort(redis.getClient().getHost(), redis.getClient() 252 // .getPort()); 253 254 // if (master == connection) { 255 // // connected to the correct master 256 // return redis; 257 // } else { 258 // returnBrokenResource(redis); 259 // } 260 // } 261 // } 262 263 // override 264 // protected void returnBrokenResource(Redis resource) { 265 // if (resource !is null) { 266 // returnBrokenResourceObject(resource); 267 // } 268 // } 269 270 // override 271 // protected void returnResource(Redis resource) { 272 // if (resource !is null) { 273 // resource.resetState(); 274 // returnResourceObject(resource); 275 // } 276 // } 277 278 // protected class MasterListener : ThreadEx { 279 280 // protected string masterName; 281 // protected string host; 282 // protected int port; 283 // protected long subscribeRetryWaitTimeMillis = 5000; 284 // protected Redis j; 285 // protected shared bool running = false; 286 287 // protected this() { 288 // } 289 290 // this(string masterName, string host, int port) { 291 // super(format("MasterListener-%s-[%s:%d]", masterName, host, port)); 292 // this.masterName = masterName; 293 // this.host = host; 294 // this.port = port; 295 // } 296 297 // this(string masterName, string host, int port, 298 // long subscribeRetryWaitTimeMillis) { 299 // this(masterName, host, port); 300 // this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; 301 // } 302 303 // override 304 // void run() { 305 306 // running = true; 307 308 // while (running) { 309 310 // j = new Redis(host, port); 311 312 // try { 313 // // double check that it is not being shutdown 314 // if (!running) { 315 // break; 316 // } 317 318 // /* 319 // * Added code for active refresh 320 // */ 321 // List!(string) masterAddr = j.sentinelGetMasterAddrByName(masterName); 322 // if (masterAddr is null || masterAddr.size() != 2) { 323 // warningf("Can not get master addr, master name: %s. Sentinel: %s:%s.",masterName,host,port); 324 // }else{ 325 // initPool(toHostAndPort(masterAddr)); 326 // } 327 328 // j.subscribe(new class RedisPubSub { 329 // override 330 // void onMessage(string channel, string message) { 331 // tracef("Sentinel %s:%s published: %s.", host, port, message); 332 333 // string[] switchMasterMsg = message.split(" "); 334 335 // if (switchMasterMsg.length > 3) { 336 337 // if (masterName == switchMasterMsg[0]) { 338 // initPool(toHostAndPort([switchMasterMsg[3], switchMasterMsg[4]])); 339 // } else { 340 // tracef( 341 // "Ignoring message on +switch-master for master name %s, our master name is %s", 342 // switchMasterMsg[0], masterName); 343 // } 344 345 // } else { 346 // errorf( 347 // "Invalid message received on Sentinel %s:%s on channel +switch-master: %s", host, 348 // port, message); 349 // } 350 // } 351 // }, "+switch-master"); 352 353 // } catch (RedisException e) { 354 355 // if (running) { 356 // errorf("Lost connection to Sentinel at %s:%s. Sleeping 5000ms and retrying.", host, 357 // port, e); 358 // try { 359 // Thread.sleep(subscribeRetryWaitTimeMillis.msecs); 360 // } catch (InterruptedException e1) { 361 // errorf("Sleep interrupted: ", e1); 362 // } 363 // } else { 364 // tracef("Unsubscribing from Sentinel at %s:%s", host, port); 365 // } 366 // } finally { 367 // j.close(); 368 // } 369 // } 370 // } 371 372 // void shutdown() { 373 // try { 374 // tracef("Shutting down listener on %s:%s", host, port); 375 // running = false; 376 // // This isn't good, the Redis object is not thread safe 377 // if (j !is null) { 378 // j.disconnect(); 379 // } 380 // } catch (Exception e) { 381 // errorf("Caught exception while shutting down: ", e); 382 // } 383 // } 384 // } 385 // }