1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.RedisSentinelPool;
13 
14 import hunt.redis.Exceptions;
15 import hunt.redis.HostAndPort;
16 import hunt.redis.Protocol;
17 import hunt.redis.Redis;
18 import hunt.redis.RedisPubSub;
19 
20 import hunt.Exceptions;
21 import hunt.collection.HashSet;
22 import hunt.collection.List;
23 import hunt.collection.Set;
24 import hunt.concurrency.thread;
25 import hunt.logging.ConsoleLogger;
26 import hunt.util.pool;
27 import hunt.util.ArrayHelper;
28 
29 import core.atomic;
30 import core.thread;
31 import core.time;
32 
33 import std.algorithm;
34 import std.conv;
35 import std.format;
36 import std.string;
37 
38 /**
39 */
40 // class RedisSentinelPool : RedisPoolAbstract {
41 
42 //     protected GenericObjectPoolConfig poolConfig;
43 
44 //     protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
45 //     protected int soTimeout = Protocol.DEFAULT_TIMEOUT;
46 
47 //     protected string password;
48 
49 //     protected int database = Protocol.DEFAULT_DATABASE;
50 
51 //     protected string clientName;
52 
53 //     protected Set!(MasterListener) masterListeners;
54 
55 //     private RedisFactory factory;
56 //     private HostAndPort currentHostMaster;
57     
58 //     private Object initPoolLock;
59 
60 //     this(string masterName, Set!(string) sentinels,
61 //             GenericObjectPoolConfig poolConfig) {
62 //         this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
63 //                 Protocol.DEFAULT_DATABASE);
64 //     }
65 
66 //     this(string masterName, Set!(string) sentinels) {
67 //         this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null,
68 //                 Protocol.DEFAULT_DATABASE);
69 //     }
70 
71 //     this(string masterName, Set!(string) sentinels, string password) {
72 //         this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password);
73 //     }
74 
75 //     this(string masterName, Set!(string) sentinels,
76 //             GenericObjectPoolConfig poolConfig, int timeout, string password) {
77 //         this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE);
78 //     }
79 
80 //     this(string masterName, Set!(string) sentinels,
81 //             GenericObjectPoolConfig poolConfig, int timeout) {
82 //         this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE);
83 //     }
84 
85 //     this(string masterName, Set!(string) sentinels,
86 //             GenericObjectPoolConfig poolConfig, string password) {
87 //         this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password);
88 //     }
89 
90 //     this(string masterName, Set!(string) sentinels,
91 //             GenericObjectPoolConfig poolConfig, int timeout, string password,
92 //             int database) {
93 //         this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
94 //     }
95 
96 //     this(string masterName, Set!(string) sentinels,
97 //             GenericObjectPoolConfig poolConfig, int timeout, string password,
98 //             int database, string clientName) {
99 //         this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName);
100 //     }
101 
102 //     this(string masterName, Set!(string) sentinels,
103 //             GenericObjectPoolConfig poolConfig, int timeout, int soTimeout,
104 //             string password, int database) {
105 //         this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null);
106 //     }
107 
108 //     this(string masterName, Set!(string) sentinels,
109 //             GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout,
110 //             string password, int database, string clientName) {
111         
112 //         masterListeners = new HashSet!(MasterListener)();
113 //         initPoolLock = new Object();
114 //         this.poolConfig = poolConfig;
115 //         this.connectionTimeout = connectionTimeout;
116 //         this.soTimeout = soTimeout;
117 //         this.password = password;
118 //         this.database = database;
119 //         this.clientName = clientName;
120 
121 //         HostAndPort master = initSentinels(sentinels, masterName);
122 //         initPool(master);
123 //     }
124 
125 //     override void destroy() {
126 //         foreach(MasterListener m ; masterListeners) {
127 //             m.shutdown();
128 //         }
129 
130 //         super.destroy();
131 //     }
132 
133 //     HostAndPort getCurrentHostMaster() {
134 //         return currentHostMaster;
135 //     }
136 
137 //     private void initPool(HostAndPort master) {
138 //         synchronized(initPoolLock){
139 //             if (master != currentHostMaster) {
140 //                 currentHostMaster = master;
141 //                 if (factory is null) {
142 //                     factory = new RedisFactory(master.getHost(), master.getPort(), connectionTimeout,
143 //                             soTimeout, password, database, clientName);
144 //                     initPool(poolConfig, factory);
145 //                 } else {
146 //                     factory.setHostAndPort(currentHostMaster);
147 //                     // although we clear the pool, we still have to check the
148 //                     // returned object
149 //                     // in getResource, this call only clears idle instances, not
150 //                     // borrowed instances
151 //                     internalPool.clear();
152 //                 }
153 
154 //                 info("Created RedisPool to master at " ~ master.toString());
155 //             }
156 //         }
157 //     }
158 
159 //     private HostAndPort initSentinels(Set!(string) sentinels, string masterName) {
160 
161 //         HostAndPort master = null;
162 //         bool sentinelAvailable = false;
163 
164 //         info("Trying to find master from available[] Sentinels...");
165 
166 //         foreach(string sentinel ; sentinels) {
167 //             HostAndPort hap = HostAndPort.parseString(sentinel);
168 
169 //             tracef("Connecting to Sentinel %s", hap);
170 
171 //             Redis redis = null;
172 //             try {
173 //                 redis = new Redis(hap);
174 
175 //                 List!(string) masterAddr = redis.sentinelGetMasterAddrByName(masterName);
176 
177 //                 // connected to[] sentinel...
178 //                 sentinelAvailable = true;
179 
180 //                 if (masterAddr is null || masterAddr.size() != 2) {
181 //                     warningf("Can not get master addr, master name: %s. Sentinel: %s", masterName, hap);
182 //                     continue;
183 //                 }
184 
185 //                 master = toHostAndPort(masterAddr);
186 //                 tracef("Found Redis master at %s", master);
187 //                 break;
188 //             } catch (RedisException e) {
189 //                 // resolves #1036, it should handle RedisException there's another chance
190 //                 // of raising RedisDataException
191 //                 warningf(
192 //                     "Cannot get master address from sentinel running @ %s. Reason: %s. Trying next one.", hap,
193 //                     e.toString());
194 //             } finally {
195 //                 if (redis !is null) {
196 //                     redis.close();
197 //                 }
198 //             }
199 //         }
200 
201 //         if (master is null) {
202 //             if (sentinelAvailable) {
203 //                 // can connect to sentinel, but master name seems to not
204 //                 // monitored
205 //                 throw new RedisException("Can connect to sentinel, but " ~ masterName
206 //                         ~ " seems to be not[] monitored...");
207 //             } else {
208 //                 throw new RedisConnectionException("All sentinels down, cannot determine where is "
209 //                         ~ masterName ~ " master is[] running...");
210 //             }
211 //         }
212 
213 //         info("Redis master running at " ~ master.toString() ~ ", starting Sentinel[] listeners...");
214 
215 //         foreach(string sentinel ; sentinels) {
216 //             HostAndPort hap = HostAndPort.parseString(sentinel);
217 //             MasterListener masterListener = new this(masterName, hap.getHost(), hap.getPort());
218 //             // whether MasterListener threads are alive or not, process can be stopped
219 //             masterListener.setDaemon(true);
220 //             masterListeners.add(masterListener);
221 //             masterListener.start();
222 //         }
223 
224 //         return master;
225 //     }
226 
227 //     private HostAndPort toHostAndPort(string[] getMasterAddrByNameResult) {
228 //         string host = getMasterAddrByNameResult[0];
229 //         int port = to!int(getMasterAddrByNameResult[1]);
230 
231 //         return new HostAndPort(host, port);
232 //     }
233 
234 //     private HostAndPort toHostAndPort(List!(string) getMasterAddrByNameResult) {
235 //         string host = getMasterAddrByNameResult.get(0);
236 //         int port = to!int(getMasterAddrByNameResult.get(1));
237 
238 //         return new HostAndPort(host, port);
239 //     }
240 
241 //     override
242 //     Redis getResource() {
243 //         while (true) {
244 //             Redis redis = super.getResource();
245 //             redis.setDataSource(this);
246 
247 //             // get a reference because it can change concurrently
248 //             HostAndPort master = currentHostMaster;
249 //             HostAndPort connection = new HostAndPort(redis.getClient().getHost(), redis.getClient()
250 //                     .getPort());
251 
252 //             if (master == connection) {
253 //                 // connected to the correct master
254 //                 return redis;
255 //             } else {
256 //                 returnBrokenResource(redis);
257 //             }
258 //         }
259 //     }
260 
261 //     override
262 //     protected void returnBrokenResource(Redis resource) {
263 //         if (resource !is null) {
264 //             returnBrokenResourceObject(resource);
265 //         }
266 //     }
267 
268 //     override
269 //     protected void returnResource(Redis resource) {
270 //         if (resource !is null) {
271 //             resource.resetState();
272 //             returnResourceObject(resource);
273 //         }
274 //     }
275 
276 //     protected class MasterListener : ThreadEx {
277 
278 //         protected string masterName;
279 //         protected string host;
280 //         protected int port;
281 //         protected long subscribeRetryWaitTimeMillis = 5000;
282 //         protected Redis j;
283 //         protected shared bool running = false;
284 
285 //         protected this() {
286 //         }
287 
288 //         this(string masterName, string host, int port) {
289 //             super(format("MasterListener-%s-[%s:%d]", masterName, host, port));
290 //             this.masterName = masterName;
291 //             this.host = host;
292 //             this.port = port;
293 //         }
294 
295 //         this(string masterName, string host, int port,
296 //                 long subscribeRetryWaitTimeMillis) {
297 //             this(masterName, host, port);
298 //             this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
299 //         }
300 
301 //         override
302 //         void run() {
303 
304 //             running = true;
305 
306 //             while (running) {
307 
308 //                 j = new Redis(host, port);
309 
310 //                 try {
311 //                     // double check that it is not being shutdown
312 //                     if (!running) {
313 //                         break;
314 //                     }
315                     
316 //                     /*
317 //                      * Added code for active refresh
318 //                      */
319 //                     List!(string) masterAddr = j.sentinelGetMasterAddrByName(masterName);  
320 //                     if (masterAddr is null || masterAddr.size() != 2) {
321 //                         warningf("Can not get master addr, master name: %s. Sentinel: %s:%s.",masterName,host,port);
322 //                     }else{
323 //                             initPool(toHostAndPort(masterAddr)); 
324 //                     }
325 
326 //                     j.subscribe(new class RedisPubSub {
327 //                         override
328 //                         void onMessage(string channel, string message) {
329 //                             tracef("Sentinel %s:%s published: %s.", host, port, message);
330 
331 //                             string[] switchMasterMsg = message.split(" ");
332 
333 //                             if (switchMasterMsg.length > 3) {
334 
335 //                                 if (masterName == switchMasterMsg[0]) {
336 //                                     initPool(toHostAndPort([switchMasterMsg[3], switchMasterMsg[4]]));
337 //                                 } else {
338 //                                     tracef(
339 //                                         "Ignoring message on +switch-master for master name %s, our master name is %s",
340 //                                         switchMasterMsg[0], masterName);
341 //                                 }
342 
343 //                             } else {
344 //                                 errorf(
345 //                                     "Invalid message received on Sentinel %s:%s on channel +switch-master: %s", host,
346 //                                     port, message);
347 //                             }
348 //                         }
349 //                     }, "+switch-master");
350 
351 //                 } catch (RedisException e) {
352 
353 //                     if (running) {
354 //                         errorf("Lost connection to Sentinel at %s:%s. Sleeping 5000ms and retrying.", host,
355 //                             port, e);
356 //                         try {
357 //                             Thread.sleep(subscribeRetryWaitTimeMillis.msecs);
358 //                         } catch (InterruptedException e1) {
359 //                             errorf("Sleep interrupted: ", e1);
360 //                         }
361 //                     } else {
362 //                         tracef("Unsubscribing from Sentinel at %s:%s", host, port);
363 //                     }
364 //                 } finally {
365 //                     j.close();
366 //                 }
367 //             }
368 //         }
369 
370 //         void shutdown() {
371 //             try {
372 //                 tracef("Shutting down listener on %s:%s", host, port);
373 //                 running = false;
374 //                 // This isn't good, the Redis object is not thread safe
375 //                 if (j !is null) {
376 //                     j.disconnect();
377 //                 }
378 //             } catch (Exception e) {
379 //                 errorf("Caught exception while shutting down: ", e);
380 //             }
381 //         }
382 //     }
383 // }