1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.RedisSentinelPool;
13 
14 import hunt.redis.Exceptions;
15 import hunt.redis.HostAndPort;
16 import hunt.redis.Protocol;
17 import hunt.redis.Redis;
18 import hunt.redis.RedisFactory;
19 import hunt.redis.RedisPoolAbstract;
20 import hunt.redis.RedisPubSub;
21 
22 import hunt.Exceptions;
23 import hunt.collection.HashSet;
24 import hunt.collection.List;
25 import hunt.collection.Set;
26 import hunt.concurrency.thread;
27 import hunt.logging.ConsoleLogger;
28 import hunt.pool.impl.GenericObjectPoolConfig;
29 import hunt.util.ArrayHelper;
30 
31 import core.atomic;
32 import core.thread;
33 import core.time;
34 
35 import std.algorithm;
36 import std.conv;
37 import std.format;
38 import std.string;
39 
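40 /** NOTE: the RedisSentinelPool class below is currently disabled; its whole implementation is commented out.
41 */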
42 // class RedisSentinelPool : RedisPoolAbstract {
43 
44 //     protected GenericObjectPoolConfig poolConfig;
45 
46 //     protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
47 //     protected int soTimeout = Protocol.DEFAULT_TIMEOUT;
48 
49 //     protected string password;
50 
51 //     protected int database = Protocol.DEFAULT_DATABASE;
52 
53 //     protected string clientName;
54 
55 //     protected Set!(MasterListener) masterListeners;
56 
57 //     private RedisFactory factory;
58 //     private HostAndPort currentHostMaster;
59     
60 //     private Object initPoolLock;
61 
62 //     this(string masterName, Set!(string) sentinels,
63 //             GenericObjectPoolConfig poolConfig) {
64 //         this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
65 //                 Protocol.DEFAULT_DATABASE);
66 //     }
67 
68 //     this(string masterName, Set!(string) sentinels) {
69 //         this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null,
70 //                 Protocol.DEFAULT_DATABASE);
71 //     }
72 
73 //     this(string masterName, Set!(string) sentinels, string password) {
74 //         this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password);
75 //     }
76 
77 //     this(string masterName, Set!(string) sentinels,
78 //             GenericObjectPoolConfig poolConfig, int timeout, string password) {
79 //         this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE);
80 //     }
81 
82 //     this(string masterName, Set!(string) sentinels,
83 //             GenericObjectPoolConfig poolConfig, int timeout) {
84 //         this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE);
85 //     }
86 
87 //     this(string masterName, Set!(string) sentinels,
88 //             GenericObjectPoolConfig poolConfig, string password) {
89 //         this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password);
90 //     }
91 
92 //     this(string masterName, Set!(string) sentinels,
93 //             GenericObjectPoolConfig poolConfig, int timeout, string password,
94 //             int database) {
95 //         this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
96 //     }
97 
98 //     this(string masterName, Set!(string) sentinels,
99 //             GenericObjectPoolConfig poolConfig, int timeout, string password,
100 //             int database, string clientName) {
101 //         this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName);
102 //     }
103 
104 //     this(string masterName, Set!(string) sentinels,
105 //             GenericObjectPoolConfig poolConfig, int timeout, int soTimeout,
106 //             string password, int database) {
107 //         this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null);
108 //     }
109 
110 //     this(string masterName, Set!(string) sentinels,
111 //             GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout,
112 //             string password, int database, string clientName) {
113         
114 //         masterListeners = new HashSet!(MasterListener)();
115 //         initPoolLock = new Object();
116 //         this.poolConfig = poolConfig;
117 //         this.connectionTimeout = connectionTimeout;
118 //         this.soTimeout = soTimeout;
119 //         this.password = password;
120 //         this.database = database;
121 //         this.clientName = clientName;
122 
123 //         HostAndPort master = initSentinels(sentinels, masterName);
124 //         initPool(master);
125 //     }
126 
127 //     override void destroy() {
128 //         foreach(MasterListener m ; masterListeners) {
129 //             m.shutdown();
130 //         }
131 
132 //         super.destroy();
133 //     }
134 
135 //     HostAndPort getCurrentHostMaster() {
136 //         return currentHostMaster;
137 //     }
138 
139 //     private void initPool(HostAndPort master) {
140 //         synchronized(initPoolLock){
141 //             if (master != currentHostMaster) {
142 //                 currentHostMaster = master;
143 //                 if (factory is null) {
144 //                     factory = new RedisFactory(master.getHost(), master.getPort(), connectionTimeout,
145 //                             soTimeout, password, database, clientName);
146 //                     initPool(poolConfig, factory);
147 //                 } else {
148 //                     factory.setHostAndPort(currentHostMaster);
149 //                     // Although the pool is cleared here, getResource still
150 //                     // has to check every object it returns, because this
151 //                     // call only clears idle instances, not borrowed
152 //                     // instances.
153 //                     internalPool.clear();
154 //                 }
155 
156 //                 info("Created RedisPool to master at " ~ master.toString());
157 //             }
158 //         }
159 //     }
160 
161 //     private HostAndPort initSentinels(Set!(string) sentinels, string masterName) {
162 
163 //         HostAndPort master = null;
164 //         bool sentinelAvailable = false;
165 
166 //         info("Trying to find master from available[] Sentinels...");
167 
168 //         foreach(string sentinel ; sentinels) {
169 //             HostAndPort hap = HostAndPort.parseString(sentinel);
170 
171 //             tracef("Connecting to Sentinel %s", hap);
172 
173 //             Redis redis = null;
174 //             try {
175 //                 redis = new Redis(hap);
176 
177 //                 List!(string) masterAddr = redis.sentinelGetMasterAddrByName(masterName);
178 
179 //                 // connected to a sentinel...
180 //                 sentinelAvailable = true;
181 
182 //                 if (masterAddr is null || masterAddr.size() != 2) {
183 //                     warningf("Can not get master addr, master name: %s. Sentinel: %s", masterName, hap);
184 //                     continue;
185 //                 }
186 
187 //                 master = toHostAndPort(masterAddr);
188 //                 tracef("Found Redis master at %s", master);
189 //                 break;
190 //             } catch (RedisException e) {
191 //                 // resolves #1036: catch RedisException here, because a
192 //                 // RedisDataException can also be raised
193 //                 warningf(
194 //                     "Cannot get master address from sentinel running @ %s. Reason: %s. Trying next one.", hap,
195 //                     e.toString());
196 //             } finally {
197 //                 if (redis !is null) {
198 //                     redis.close();
199 //                 }
200 //             }
201 //         }
202 
203 //         if (master is null) {
204 //             if (sentinelAvailable) {
205 //                 // can connect to a sentinel, but the master name does not
206 //                 // seem to be monitored
207 //                 throw new RedisException("Can connect to sentinel, but " ~ masterName
208 //                         ~ " seems to be not[] monitored...");
209 //             } else {
210 //                 throw new RedisConnectionException("All sentinels down, cannot determine where is "
211 //                         ~ masterName ~ " master is[] running...");
212 //             }
213 //         }
214 
215 //         info("Redis master running at " ~ master.toString() ~ ", starting Sentinel[] listeners...");
216 
217 //         foreach(string sentinel ; sentinels) {
218 //             HostAndPort hap = HostAndPort.parseString(sentinel);
219 //             MasterListener masterListener = new this(masterName, hap.getHost(), hap.getPort());
220 //             // whether MasterListener threads are alive or not, process can be stopped
221 //             masterListener.setDaemon(true);
222 //             masterListeners.add(masterListener);
223 //             masterListener.start();
224 //         }
225 
226 //         return master;
227 //     }
228 
229 //     private HostAndPort toHostAndPort(string[] getMasterAddrByNameResult) {
230 //         string host = getMasterAddrByNameResult[0];
231 //         int port = to!int(getMasterAddrByNameResult[1]);
232 
233 //         return new HostAndPort(host, port);
234 //     }
235 
236 //     private HostAndPort toHostAndPort(List!(string) getMasterAddrByNameResult) {
237 //         string host = getMasterAddrByNameResult.get(0);
238 //         int port = to!int(getMasterAddrByNameResult.get(1));
239 
240 //         return new HostAndPort(host, port);
241 //     }
242 
243 //     override
244 //     Redis getResource() {
245 //         while (true) {
246 //             Redis redis = super.getResource();
247 //             redis.setDataSource(this);
248 
249 //             // get a reference because it can change concurrently
250 //             HostAndPort master = currentHostMaster;
251 //             HostAndPort connection = new HostAndPort(redis.getClient().getHost(), redis.getClient()
252 //                     .getPort());
253 
254 //             if (master == connection) {
255 //                 // connected to the correct master
256 //                 return redis;
257 //             } else {
258 //                 returnBrokenResource(redis);
259 //             }
260 //         }
261 //     }
262 
263 //     override
264 //     protected void returnBrokenResource(Redis resource) {
265 //         if (resource !is null) {
266 //             returnBrokenResourceObject(resource);
267 //         }
268 //     }
269 
270 //     override
271 //     protected void returnResource(Redis resource) {
272 //         if (resource !is null) {
273 //             resource.resetState();
274 //             returnResourceObject(resource);
275 //         }
276 //     }
277 
278 //     protected class MasterListener : ThreadEx {
279 
280 //         protected string masterName;
281 //         protected string host;
282 //         protected int port;
283 //         protected long subscribeRetryWaitTimeMillis = 5000;
284 //         protected Redis j;
285 //         protected shared bool running = false;
286 
287 //         protected this() {
288 //         }
289 
290 //         this(string masterName, string host, int port) {
291 //             super(format("MasterListener-%s-[%s:%d]", masterName, host, port));
292 //             this.masterName = masterName;
293 //             this.host = host;
294 //             this.port = port;
295 //         }
296 
297 //         this(string masterName, string host, int port,
298 //                 long subscribeRetryWaitTimeMillis) {
299 //             this(masterName, host, port);
300 //             this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
301 //         }
302 
303 //         override
304 //         void run() {
305 
306 //             running = true;
307 
308 //             while (running) {
309 
310 //                 j = new Redis(host, port);
311 
312 //                 try {
313 //                     // double check that it is not being shutdown
314 //                     if (!running) {
315 //                         break;
316 //                     }
317                     
318 //                     /*
319 //                      * Added code for active refresh
320 //                      */
321 //                     List!(string) masterAddr = j.sentinelGetMasterAddrByName(masterName);  
322 //                     if (masterAddr is null || masterAddr.size() != 2) {
323 //                         warningf("Can not get master addr, master name: %s. Sentinel: %s:%s.",masterName,host,port);
324 //                     }else{
325 //                             initPool(toHostAndPort(masterAddr)); 
326 //                     }
327 
328 //                     j.subscribe(new class RedisPubSub {
329 //                         override
330 //                         void onMessage(string channel, string message) {
331 //                             tracef("Sentinel %s:%s published: %s.", host, port, message);
332 
333 //                             string[] switchMasterMsg = message.split(" ");
334 
335 //                             if (switchMasterMsg.length > 3) {
336 
337 //                                 if (masterName == switchMasterMsg[0]) {
338 //                                     initPool(toHostAndPort([switchMasterMsg[3], switchMasterMsg[4]]));
339 //                                 } else {
340 //                                     tracef(
341 //                                         "Ignoring message on +switch-master for master name %s, our master name is %s",
342 //                                         switchMasterMsg[0], masterName);
343 //                                 }
344 
345 //                             } else {
346 //                                 errorf(
347 //                                     "Invalid message received on Sentinel %s:%s on channel +switch-master: %s", host,
348 //                                     port, message);
349 //                             }
350 //                         }
351 //                     }, "+switch-master");
352 
353 //                 } catch (RedisException e) {
354 
355 //                     if (running) {
356 //                         errorf("Lost connection to Sentinel at %s:%s. Sleeping 5000ms and retrying.", host,
357 //                             port, e);
358 //                         try {
359 //                             Thread.sleep(subscribeRetryWaitTimeMillis.msecs);
360 //                         } catch (InterruptedException e1) {
361 //                             errorf("Sleep interrupted: ", e1);
362 //                         }
363 //                     } else {
364 //                         tracef("Unsubscribing from Sentinel at %s:%s", host, port);
365 //                     }
366 //                 } finally {
367 //                     j.close();
368 //                 }
369 //             }
370 //         }
371 
372 //         void shutdown() {
373 //             try {
374 //                 tracef("Shutting down listener on %s:%s", host, port);
375 //                 running = false;
376 //                 // This isn't good, the Redis object is not thread safe
377 //                 if (j !is null) {
378 //                     j.disconnect();
379 //                 }
380 //             } catch (Exception e) {
381 //                 errorf("Caught exception while shutting down: ", e);
382 //             }
383 //         }
384 //     }
385 // }