1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.RedisClusterInfoCache;
13 
14 import hunt.redis.Client;
15 import hunt.redis.Exceptions;
16 import hunt.redis.HostAndPort;
17 import hunt.redis.RedisClusterHostAndPortMap;
18 import hunt.redis.Redis;
19 import hunt.redis.RedisPool;
20 import hunt.redis.RedisPoolOptions;
21 import hunt.redis.util.SafeEncoder;
22 
23 import hunt.collection.ArrayList;
24 import hunt.collection.Collections;
25 import hunt.collection.HashMap;
26 import hunt.collection.List;
27 import hunt.collection.Map;
28 import hunt.logging.ConsoleLogger;
29 
30 import hunt.util.pool;
31 
32 import hunt.Byte;
33 import hunt.Long;
34 import hunt.Integer;
35 import hunt.String;
36 
37 import core.sync.mutex;
38 import core.sync.condition;
39 import std.conv;
40 
41 
/**
 * Caches the topology of a Redis cluster.
 *
 * Two maps are maintained: `nodes` maps a "host:port" key to the connection
 * pool of that node, and `slots` maps a hash-slot number to the pool of the
 * node that serves it. All access to both maps is serialized through `mutex`.
 * Methods of this class take the mutex again while already holding it
 * (e.g. assignSlotsToNode -> setupNodeIfNotExist), so the mutex must be
 * re-entrant; core.sync.mutex.Mutex is.
 */
class RedisClusterInfoCache {
    private Map!(string, RedisPool) nodes;
    private Map!(int, RedisPool) slots;

    private Mutex mutex;
    // Set while renewClusterSlots is refreshing the slot table.
    private bool rediscovering;
    private RedisPoolOptions poolConfig;

//   private SSLSocketFactory sslSocketFactory;
//   private SSLParameters sslParameters;
//   private HostnameVerifier hostnameVerifier;
    private RedisClusterHostAndPortMap hostAndPortMap;

    // Index of the first node entry inside one element of the
    // `clusterSlots()` reply; indices 0 and 1 hold the slot range bounds.
    private enum int MASTER_NODE_INDEX = 2;

    /**
     * Creates an empty cache.
     *
     * Params:
     *   poolConfig = template options copied for every per-node pool that
     *                this cache creates.
     */
    this(RedisPoolOptions poolConfig) {

        nodes = new HashMap!(string, RedisPool)();
        slots = new HashMap!(int, RedisPool)();
        mutex = new Mutex();

        this.poolConfig = poolConfig;
        this.hostAndPortMap = null;
    }

    /**
     * Rebuilds both the node map and the slot map from the `clusterSlots()`
     * reply of the given connection.
     *
     * The reply is fetched before the existing state is discarded, so a
     * failing query leaves the previous cache intact. Any failure is logged
     * and swallowed; nothing is rethrown to the caller.
     *
     * Params:
     *   redis = connection used to query the cluster layout.
     */
    void discoverClusterNodesAndSlots(Redis redis) {
        mutex.lock();
        scope(exit) mutex.unlock();

        try {
            // Query first: if this throws, the pools we already have survive.
            List!(Object) slotEntries = redis.clusterSlots();
            reset();

            foreach(Object slotInfoObj ; slotEntries) {
                List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

                // Skip entries of an unexpected type or without any node.
                if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                    continue;
                }

                List!(int) slotNums = getAssignedSlotArray(slotInfo);

                // Every entry from MASTER_NODE_INDEX onwards describes a node.
                int size = slotInfo.size();
                for (int i = MASTER_NODE_INDEX; i < size; i++) {
                    List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(i);
                    if (hostInfos is null || hostInfos.size() <= 0) {
                        continue;
                    }

                    HostAndPort targetNode = generateHostAndPort(hostInfos);
                    setupNodeIfNotExist(targetNode);
                    // Only the first node of an entry gets the slots assigned.
                    if (i == MASTER_NODE_INDEX) {
                        assignSlotsToNode(slotNums, targetNode);
                    }
                }
            }
        } catch(Throwable ex) {
            // The failure is not propagated, so always leave a trace of it.
            warning(ex.msg);
            version(HUNT_REDIS_DEBUG) warning(ex);
        }
    }

    /**
     * Refreshes the slot map only (existing node pools are kept).
     *
     * The given connection is tried first when it is not null. If that
     * attempt raises a RedisException, a connection is borrowed from each
     * known node pool in turn until one refresh succeeds. Returns
     * immediately when `rediscovering` is already set. Failures are logged
     * and swallowed.
     *
     * Params:
     *   redis = preferred connection for the query; may be null.
     */
    void renewClusterSlots(Redis redis) {
        //If rediscovering is already in process - no need to start one more same rediscovering, just return
        if (!rediscovering) {
            mutex.lock();
            scope(exit) mutex.unlock();

            try {
                // Checked again now that the mutex is held.
                if (!rediscovering) {
                    rediscovering = true;

                    try {
                        if (redis !is null) {
                            try {
                                discoverClusterSlots(redis);
                                return;
                            } catch (RedisException ex) {
                                //try nodes from all pools
                                debug warning(ex.msg);
                                version(HUNT_REDIS_DEBUG) warning(ex);
                            }
                        }

                        foreach(RedisPool jp ; getShuffledNodesPool()) {
                            Redis j = null;
                            try {
                                j = jp.borrow();
                                discoverClusterSlots(j);
                                return;
                            } catch (RedisConnectionException ex) {
                                // try next nodes
                                debug warning(ex.msg);
                                version(HUNT_REDIS_DEBUG) warning(ex);
                            } finally {
                                if (j !is null) {
                                    j.close();
                                }
                            }
                        }
                    } finally {
                        rediscovering = false;
                    }
                }
            } catch(Exception ex) {
                // The failure is not propagated, so always leave a trace of it.
                warning(ex.msg);
                version(HUNT_REDIS_DEBUG) warning(ex);
            }
        }
    }

    /**
     * Replaces the slot map with the layout reported by `redis`.
     *
     * Only the first node of every slot range is used; the remaining node
     * entries are ignored. The only caller in this class,
     * renewClusterSlots, holds the mutex while calling this.
     */
    private void discoverClusterSlots(Redis redis) {
        List!(Object) slotEntries = redis.clusterSlots();
        this.slots.clear();

        foreach(Object slotInfoObj ; slotEntries) {
            List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

            // Skip entries of an unexpected type or without any node.
            if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                continue;
            }

            List!(int) slotNums = getAssignedSlotArray(slotInfo);

            // hostInfos
            List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(MASTER_NODE_INDEX);
            if (hostInfos is null || hostInfos.isEmpty()) {
                continue;
            }

            // at this time, we just use master, discard slave information
            HostAndPort targetNode = generateHostAndPort(hostInfos);
            assignSlotsToNode(slotNums, targetNode);
        }
    }

    /**
     * Builds a HostAndPort from one node entry of the `clusterSlots()` reply.
     *
     * Element 0 must be a `Bytes` holding the host and element 1 a `Long`
     * holding the port. When SSL is enabled in the pool options and a
     * host/port map is configured, the mapped address is returned if the
     * map yields one.
     *
     * Throws: RedisException when the entry does not have that shape
     *         (previously this dereferenced a null reference).
     */
    private HostAndPort generateHostAndPort(List!(Object) hostInfos) {
        if (hostInfos is null || hostInfos.size() < 2) {
            throw new RedisException("Malformed node entry in cluster slots reply: host and port expected");
        }

        Object info = hostInfos.get(0);

        Bytes infoBytes = cast(Bytes)info;
        if(infoBytes is null) {
            // typeid() on a null reference would itself fault, so guard it.
            string actualType = (info is null) ? "null" : typeid(info).toString();
            warningf("wrong cast: from %s to %s", actualType, typeid(Bytes));
            throw new RedisException("Malformed node entry in cluster slots reply: unexpected host type "
                    ~ actualType);
        }
        string host = cast(string)infoBytes.value();

        Long portValue = cast(Long) hostInfos.get(1);
        if (portValue is null) {
            throw new RedisException("Malformed node entry in cluster slots reply: port is not an integer");
        }
        int port = portValue.intValue();

        if (poolConfig.ssl && hostAndPortMap !is null) {
            HostAndPort hostAndPort = hostAndPortMap.getSSLHostAndPort(host, port);
            if (hostAndPort !is null) {
                return hostAndPort;
            }
        }
        version(HUNT_REDIS_DEBUG) tracef("%s:%d", host, port);
        return new HostAndPort(host, port);
    }

    /**
     * Returns the pool registered for `node`, creating and registering a
     * new one from a copy of the pool options when none exists yet.
     */
    RedisPool setupNodeIfNotExist(HostAndPort node) {
        mutex.lock();
        scope(exit) mutex.unlock();

        string nodeKey = getNodeKey(node);
        RedisPool existingPool = nodes.get(nodeKey);
        if (existingPool !is null) {
            return existingPool;
        }

        // Copy the template options so the per-node host/port do not leak
        // into the shared configuration.
        RedisPoolOptions config = new RedisPoolOptions(poolConfig);
        config.host = node.getHost();
        config.port = node.getPort();

        RedisPool nodePool = new RedisFactory(config).pool();

        nodes.put(nodeKey, nodePool);
        return nodePool;
    }

    /**
     * Maps a single slot to the pool of `targetNode`, creating the pool if
     * the node is not known yet.
     */
    void assignSlotToNode(int slot, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        slots.put(slot, targetPool);
    }

    /**
     * Maps every slot in `targetSlots` to the pool of `targetNode`, creating
     * the pool if the node is not known yet.
     */
    void assignSlotsToNode(List!(int) targetSlots, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        foreach(int slot ; targetSlots) {
            slots.put(slot, targetPool);
        }
    }

    /// Returns whatever the node map holds for `nodeKey` ("host:port").
    RedisPool getNode(string nodeKey) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return nodes.get(nodeKey);
    }

    /// Returns whatever the slot map holds for `slot`.
    RedisPool getSlotPool(int slot) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return slots.get(slot);
    }

    /// Returns a new HashMap built from the node map, not the internal map itself.
    Map!(string, RedisPool) getNodes() {
        mutex.lock();
        scope(exit) mutex.unlock();

        return new HashMap!(string, RedisPool)(nodes);
    }

    /**
     * Returns a new list holding all known node pools.
     *
     * NOTE: despite the name the list is currently NOT shuffled; the
     * shuffle call below is still disabled.
     */
    List!(RedisPool) getShuffledNodesPool() {
        mutex.lock();
        scope(exit) mutex.unlock();

        List!(RedisPool) pools = new ArrayList!(RedisPool)(nodes.values());
// TODO: Tasks pending completion -@zxp at 7/17/2019, 10:01:18 AM
//
        // Collections.shuffle(pools);
        return pools;
    }

    /**
     * Clear discovered nodes collections and gently release allocated resources
     */
    void reset() {
        mutex.lock();
        scope(exit) {
            mutex.unlock();
        }

        doReset();
    }

    /**
     * Destroys every pool in the node map, then empties both maps.
     * A failure while destroying one pool does not stop the others from
     * being processed. Called by reset() with the mutex held.
     */
    private void doReset() {
        foreach(RedisPool pool ; nodes.values()) {
            try {
                if (pool !is null) {
                    pool.destroy();
                }
            } catch (Exception ex) {
                // pass
                debug warning(ex.msg);
                version(HUNT_REDIS_DEBUG) warning(ex);
            }
        }
        nodes.clear();
        slots.clear();
    }

    /// Builds the "host:port" key used by the node map.
    static string getNodeKey(HostAndPort hnp) {
        return hnp.getHost() ~ ":" ~ hnp.getPort().to!string();
    }

    /// ditto
    static string getNodeKey(Client client) {
        return client.getHost() ~ ":" ~ client.getPort().to!string();
    }

    /// Builds the node key from the client returned by `redis.getClient()`.
    static string getNodeKey(Redis redis) {
        return getNodeKey(redis.getClient());
    }

    /**
     * Expands the inclusive slot range stored in elements 0 and 1 of
     * `slotInfo` into a list of individual slot numbers.
     *
     * Throws: RedisException when either bound is not a `Long`.
     */
    private List!(int) getAssignedSlotArray(List!(Object) slotInfo) {
        Long firstSlot = cast(Long) slotInfo.get(0);
        Long lastSlot = cast(Long) slotInfo.get(1);
        if (firstSlot is null || lastSlot is null) {
            throw new RedisException("Malformed slot range in cluster slots reply");
        }

        // Read the bounds once instead of re-casting on every iteration.
        int first = firstSlot.intValue();
        int last = lastSlot.intValue();

        List!(int) slotNums = new ArrayList!(int)();
        for (int slot = first; slot <= last; slot++) {
            slotNums.add(slot);
        }
        return slotNums;
    }
}