1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.RedisClusterInfoCache;
13 
14 import hunt.redis.Client;
15 import hunt.redis.Exceptions;
16 import hunt.redis.HostAndPort;
17 import hunt.redis.RedisClusterHostAndPortMap;
18 import hunt.redis.Redis;
19 import hunt.redis.RedisPool;
20 import hunt.redis.util.SafeEncoder;
21 
22 import hunt.collection.ArrayList;
23 import hunt.collection.Collections;
24 import hunt.collection.HashMap;
25 import hunt.collection.List;
26 import hunt.collection.Map;
27 import hunt.logging.ConsoleLogger;
28 import hunt.pool.impl.GenericObjectPoolConfig;
29 
30 import hunt.Byte;
31 import hunt.Long;
32 import hunt.Integer;
33 import hunt.String;
34 
35 import core.sync.mutex;
36 import core.sync.condition;
37 import std.conv;
38 
/**
 * Caches the topology of a Redis cluster: one connection pool per known node
 * (keyed by "host:port") and a slot table that maps every hash slot to the
 * pool of the master serving it.
 *
 * Every access to the two tables is serialized through a single mutex. Several
 * methods lock it while it is already held by the same thread (for example
 * assignSlotsToNode -> setupNodeIfNotExist); this relies on
 * core.sync.mutex.Mutex being a recursive mutex.
 */
class RedisClusterInfoCache {
    private Map!(string, RedisPool) nodes;
    private Map!(int, RedisPool) slots;

    private Mutex mutex;
    // private Object.Monitor r;
    // private Object.Monitor w;
    private bool rediscovering;
    private GenericObjectPoolConfig poolConfig;

    private int connectionTimeout;
    private int soTimeout;
    private string password;
    private string clientName;

    private bool ssl;
//   private SSLSocketFactory sslSocketFactory;
//   private SSLParameters sslParameters;
//   private HostnameVerifier hostnameVerifier;
    private RedisClusterHostAndPortMap hostAndPortMap;

    // Index of the master node inside one CLUSTER SLOTS entry; indexes 0 and 1
    // hold the first and last slot of the range, later indexes hold replicas.
    private enum int MASTER_NODE_INDEX = 2;

    /**
     * Creates a cache that uses the same timeout for connecting and for
     * socket operations, without password or client name.
     *
     * Params:
     *   poolConfig = configuration handed to every node pool that is created
     *   timeout    = used as both connection timeout and socket timeout
     */
    this(GenericObjectPoolConfig poolConfig, int timeout) {
        this(poolConfig, timeout, timeout, null, null);
    }

    /**
     * Creates an empty cache. SSL is disabled and no host/port map is set.
     *
     * Params:
     *   poolConfig        = configuration handed to every node pool that is created
     *   connectionTimeout = connection timeout passed to each RedisPool
     *   soTimeout         = socket timeout passed to each RedisPool
     *   password          = password passed to each RedisPool (may be null)
     *   clientName        = client name passed to each RedisPool (may be null)
     */
    this(GenericObjectPoolConfig poolConfig,
            int connectionTimeout, int soTimeout, string password, string clientName) {
        // this(poolConfig, connectionTimeout, soTimeout, password, clientName, false, null, null, null, null);

        nodes = new HashMap!(string, RedisPool)();
        slots = new HashMap!(int, RedisPool)();
        mutex = new Mutex();

        this.poolConfig = poolConfig;
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.clientName = clientName;
        this.ssl = false;
        // this.sslSocketFactory = sslSocketFactory;
        // this.sslParameters = sslParameters;
        // this.hostnameVerifier = hostnameVerifier;
        this.hostAndPortMap = null;
    }

//   this(GenericObjectPoolConfig poolConfig,
//       int connectionTimeout, int soTimeout, string password, string clientName,
//       bool ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
//       HostnameVerifier hostnameVerifier, RedisClusterHostAndPortMap hostAndPortMap) {
//     this.poolConfig = poolConfig;
//     this.connectionTimeout = connectionTimeout;
//     this.soTimeout = soTimeout;
//     this.password = password;
//     this.clientName = clientName;
//     this.ssl = ssl;
//     this.sslSocketFactory = sslSocketFactory;
//     this.sslParameters = sslParameters;
//     this.hostnameVerifier = hostnameVerifier;
//     this.hostAndPortMap = hostAndPortMap;
//   }

    /**
     * Rebuilds the whole cache from the CLUSTER SLOTS reply of `redis`.
     *
     * The cache is reset first (all existing pools are destroyed). Every node
     * listed in the reply, master or replica, gets a pool; only the master of
     * each range is entered into the slot table.
     *
     * Failures are logged and not propagated, so after an error the cache may
     * be empty or partially filled.
     *
     * Params:
     *   redis = connection used to query the cluster layout
     */
    void discoverClusterNodesAndSlots(Redis redis) {
        mutex.lock();
        scope(exit) mutex.unlock();

        try {
            reset();
            List!(Object) slotEntries = redis.clusterSlots();

            foreach (Object slotInfoObj; slotEntries) {
                List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

                // Skip entries that are not lists or that carry no node at all.
                if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                    continue;
                }

                List!(int) slotNums = getAssignedSlotArray(slotInfo);

                // hostInfos
                int size = slotInfo.size();
                for (int i = MASTER_NODE_INDEX; i < size; i++) {
                    List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(i);
                    if (hostInfos is null || hostInfos.size() <= 0) {
                        continue;
                    }

                    HostAndPort targetNode = generateHostAndPort(hostInfos);
                    setupNodeIfNotExist(targetNode);
                    if (i == MASTER_NODE_INDEX) {
                        assignSlotsToNode(slotNums, targetNode);
                    }
                }
            }
        } catch (Exception ex) {
            // Always report: a failure here leaves the cache empty or partial,
            // which must not go unnoticed in release builds.
            warning(ex.msg);
            version(HUNT_REDIS_DEBUG) warning(ex);
        }
    }

    /**
     * Refreshes the slot table, first through `redis` (when not null) and, if
     * that fails with a RedisException, through each known node pool in turn
     * until one succeeds.
     *
     * Returns immediately when a renewal is already flagged as running.
     * Exceptions are logged in debug builds and never propagated.
     *
     * Params:
     *   redis = preferred connection for the query; may be null
     */
    void renewClusterSlots(Redis redis) {
        // Unsynchronized fast path: if a renewal is already flagged as running
        // there is no need to start the same work again.
        if (rediscovering) {
            return;
        }

        mutex.lock();
        scope(exit) mutex.unlock();

        // Re-check while holding the lock before claiming the flag.
        if (rediscovering) {
            return;
        }
        rediscovering = true;
        // Declared after the unlock guard, so the flag is cleared before the
        // mutex is released.
        scope(exit) rediscovering = false;

        try {
            if (redis !is null) {
                try {
                    discoverClusterSlots(redis);
                    return;
                } catch (RedisException ex) {
                    //try nodes from all pools
                    debug warning(ex.msg);
                    version(HUNT_REDIS_DEBUG) warning(ex);
                }
            }

            foreach (RedisPool pool; getShuffledNodesPool()) {
                Redis candidate = null;
                try {
                    candidate = pool.getResource();
                    discoverClusterSlots(candidate);
                    return;
                } catch (RedisConnectionException ex) {
                    // try next nodes
                    debug warning(ex.msg);
                    version(HUNT_REDIS_DEBUG) warning(ex);
                } finally {
                    if (candidate !is null) {
                        candidate.close();
                    }
                }
            }
        } catch (Exception ex) {
            debug warning(ex.msg);
            version(HUNT_REDIS_DEBUG) warning(ex);
        }
    }

    /**
     * Replaces the slot table with the masters reported by CLUSTER SLOTS on
     * `redis`. The table is cleared only after the reply has been fetched.
     * Replica information is ignored.
     *
     * Throws: RedisException when a node or slot-range entry is malformed;
     *         whatever `redis.clusterSlots()` throws.
     */
    private void discoverClusterSlots(Redis redis) {
        List!(Object) slotEntries = redis.clusterSlots();
        this.slots.clear();

        foreach (Object slotInfoObj; slotEntries) {
            List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

            if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                continue;
            }

            List!(int) slotNums = getAssignedSlotArray(slotInfo);

            // hostInfos
            List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(MASTER_NODE_INDEX);
            if (hostInfos is null || hostInfos.isEmpty()) {
                continue;
            }

            // at this time, we just use master, discard slave information
            HostAndPort targetNode = generateHostAndPort(hostInfos);
            assignSlotsToNode(slotNums, targetNode);
        }
    }

    /**
     * Builds a HostAndPort from one node entry of a CLUSTER SLOTS reply.
     *
     * Element 0 must be the host as `Bytes`, element 1 the port as `Long`.
     * When SSL is enabled and a host/port map is set, the mapped address is
     * returned if the map provides one.
     *
     * Throws: RedisException when the entry has fewer than two elements or an
     *         element has an unexpected type.
     */
    private HostAndPort generateHostAndPort(List!(Object) hostInfos) {
        if (hostInfos is null || hostInfos.size() < 2) {
            throw new RedisException("Malformed cluster node entry: host and port expected");
        }

        Object info = hostInfos.get(0);
        Bytes infoBytes = cast(Bytes) info;
        if (infoBytes is null) {
            // Previously this only logged and then dereferenced the null
            // reference; fail with an exception the callers already handle.
            string actualType = (info is null) ? "null" : typeid(info).toString();
            warningf("wrong cast: from %s to %s", actualType, typeid(Bytes));
            throw new RedisException("Malformed cluster node entry: unexpected host type " ~ actualType);
        }
        string host = cast(string) infoBytes.value();

        Long portValue = cast(Long) hostInfos.get(1);
        if (portValue is null) {
            throw new RedisException("Malformed cluster node entry: unexpected port type for host " ~ host);
        }
        int port = portValue.intValue();

        if (ssl && hostAndPortMap !is null) {
            HostAndPort hostAndPort = hostAndPortMap.getSSLHostAndPort(host, port);
            if (hostAndPort !is null) {
                return hostAndPort;
            }
        }
        version(HUNT_REDIS_DEBUG) tracef("%s:%d", host, port);
        return new HostAndPort(host, port);
    }

    /**
     * Returns the pool registered for `node`, creating and registering a new
     * one when the node is not known yet.
     */
    RedisPool setupNodeIfNotExist(HostAndPort node) {
        mutex.lock();
        scope(exit) mutex.unlock();

        string nodeKey = getNodeKey(node);
        RedisPool existingPool = nodes.get(nodeKey);
        if (existingPool !is null) {
            return existingPool;
        }

        // RedisPool nodePool = new RedisPool(poolConfig, node.getHost(), node.getPort(),
        //         connectionTimeout, soTimeout, password, 0, clientName,
        //         ssl, sslSocketFactory, sslParameters, hostnameVerifier);
        RedisPool nodePool = new RedisPool(poolConfig, node.getHost(), node.getPort(),
                connectionTimeout, soTimeout, password, 0, clientName,
                ssl);
        nodes.put(nodeKey, nodePool);
        return nodePool;
    }

    /**
     * Maps a single slot to the pool of `targetNode`, creating the pool when
     * the node is not known yet.
     */
    void assignSlotToNode(int slot, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        slots.put(slot, targetPool);
    }

    /**
     * Maps every slot in `targetSlots` to the pool of `targetNode`, creating
     * the pool when the node is not known yet.
     */
    void assignSlotsToNode(List!(int) targetSlots, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        foreach (int slot; targetSlots) {
            slots.put(slot, targetPool);
        }
    }

    /**
     * Looks up the pool registered under `nodeKey` (see getNodeKey).
     * Returns whatever the node table yields for that key.
     */
    RedisPool getNode(string nodeKey) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return nodes.get(nodeKey);
    }

    /**
     * Looks up the pool currently mapped to `slot`.
     * Returns whatever the slot table yields for that slot.
     */
    RedisPool getSlotPool(int slot) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return slots.get(slot);
    }

    /**
     * Returns a new map built from the node table, so callers do not work on
     * the internal table itself.
     */
    Map!(string, RedisPool) getNodes() {
        mutex.lock();
        scope(exit) mutex.unlock();

        return new HashMap!(string, RedisPool)(nodes);
    }

    /**
     * Returns all known node pools in random order, so that repeated renewals
     * do not always probe the same node first.
     */
    List!(RedisPool) getShuffledNodesPool() {
        import std.random : randomShuffle;

        mutex.lock();
        scope(exit) mutex.unlock();

        // Snapshot into a plain array so the standard library can shuffle it.
        RedisPool[] snapshot;
        foreach (RedisPool pool; nodes.values()) {
            snapshot ~= pool;
        }
        randomShuffle(snapshot);

        List!(RedisPool) pools = new ArrayList!(RedisPool)();
        foreach (RedisPool pool; snapshot) {
            pools.add(pool);
        }
        return pools;
    }

    /**
     * Clear discovered nodes collections and gently release allocated resources
     */
    void reset() {
        mutex.lock();
        scope(exit) mutex.unlock();

        doReset();
    }

    /**
     * Destroys every registered pool and empties both tables. A failure while
     * destroying one pool is logged (debug builds) and does not stop the rest.
     * Does not lock by itself; reset() calls it with the mutex held.
     */
    private void doReset() {
        foreach (RedisPool pool; nodes.values()) {
            if (pool is null) {
                continue;
            }
            try {
                pool.destroy();
            } catch (Exception ex) {
                // pass
                debug warning(ex.msg);
                version(HUNT_REDIS_DEBUG) warning(ex);
            }
        }
        nodes.clear();
        slots.clear();
    }

    /// Returns the node-table key "host:port" for `hnp`.
    static string getNodeKey(HostAndPort hnp) {
        return hnp.getHost() ~ ":" ~ hnp.getPort().to!string();
    }

    /// Returns the node-table key "host:port" for the endpoint of `client`.
    static string getNodeKey(Client client) {
        return client.getHost() ~ ":" ~ client.getPort().to!string();
    }

    /// Returns the node-table key "host:port" for the client behind `redis`.
    static string getNodeKey(Redis redis) {
        return getNodeKey(redis.getClient());
    }

    /**
     * Expands the slot range of one CLUSTER SLOTS entry into a list of slot
     * numbers. Elements 0 and 1 of `slotInfo` are read as the first and last
     * slot (both inclusive).
     *
     * Throws: RedisException when either bound is not a `Long`.
     */
    private List!(int) getAssignedSlotArray(List!(Object) slotInfo) {
        Long firstValue = cast(Long) slotInfo.get(0);
        Long lastValue = cast(Long) slotInfo.get(1);
        if (firstValue is null || lastValue is null) {
            throw new RedisException("Malformed cluster slot entry: slot range bounds must be integers");
        }

        // Read the upper bound once instead of re-fetching it on every iteration.
        int lastSlot = lastValue.intValue();
        List!(int) slotNums = new ArrayList!(int)();
        for (int slot = firstValue.intValue(); slot <= lastSlot; slot++) {
            slotNums.add(slot);
        }
        return slotNums;
    }
}