1 /*
2 * Hunt - A redis client library for D programming language.
3 *
4 * Copyright (C) 2018-2019 HuntLabs
5 *
6 * Website: https://www.huntlabs.net/
7 *
8 * Licensed under the Apache-2.0 License.
9 *
10 */
11
12 module hunt.redis.RedisClusterInfoCache;
13
14 import hunt.redis.Client;
15 import hunt.redis.Exceptions;
16 import hunt.redis.HostAndPort;
17 import hunt.redis.RedisClusterHostAndPortMap;
18 import hunt.redis.Redis;
19 import hunt.redis.RedisPool;
20 import hunt.redis.RedisPoolOptions;
21 import hunt.redis.util.SafeEncoder;
22
23 import hunt.collection.ArrayList;
24 import hunt.collection.Collections;
25 import hunt.collection.HashMap;
26 import hunt.collection.List;
27 import hunt.collection.Map;
28 import hunt.logging.ConsoleLogger;
29
30 import hunt.util.pool;
31
32 import hunt.Byte;
33 import hunt.Long;
34 import hunt.Integer;
35 import hunt.String;
36
37 import core.sync.mutex;
38 import core.sync.condition;
39 import std.conv;
40
41
/**
 * Caches what is known about a Redis cluster: one connection pool per node
 * (keyed by "host:port", see getNodeKey) and, for every assigned hash slot,
 * the pool of the node stored at MASTER_NODE_INDEX in that slot range's
 * entry of the `clusterSlots()` reply.
 *
 * All methods that touch the two maps take `mutex` first. Several of them
 * call each other while already holding it (e.g. assignSlotsToNode calls
 * setupNodeIfNotExist), which relies on core.sync.mutex.Mutex being
 * recursive.
 */
class RedisClusterInfoCache {
    // Connection pool of every known node, keyed by "host:port".
    private Map!(string, RedisPool) nodes;
    // Hash slot number -> pool of the node that slot was assigned to.
    private Map!(int, RedisPool) slots;

    private Mutex mutex;
    // True while renewClusterSlots is refreshing the slot map.
    private bool rediscovering;
    // Template options; copied and re-targeted for every node pool.
    private RedisPoolOptions poolConfig;

    // private SSLSocketFactory sslSocketFactory;
    // private SSLParameters sslParameters;
    // private HostnameVerifier hostnameVerifier;

    // Optional host/port translation consulted when poolConfig.ssl is set.
    // The constructor leaves it null and nothing in this class assigns it.
    private RedisClusterHostAndPortMap hostAndPortMap;

    // Entries 0 and 1 of a slot-range entry are the first and last slot;
    // node descriptions start at this index, the master coming first.
    private enum int MASTER_NODE_INDEX = 2;

    /**
     * Creates an empty cache.
     *
     * Params:
     *   poolConfig = options used as the template for every node pool
     */
    this(RedisPoolOptions poolConfig) {
        nodes = new HashMap!(string, RedisPool)();
        slots = new HashMap!(int, RedisPool)();
        mutex = new Mutex();

        this.poolConfig = poolConfig;
        this.hostAndPortMap = null;
    }

    /**
     * Rebuilds the whole cache from `redis.clusterSlots()`: the existing
     * pools are destroyed, a pool is created for every node listed in the
     * reply, and each slot range is assigned to its first listed node.
     *
     * Anything thrown along the way is caught and, at most, logged (debug /
     * HUNT_DEBUG builds); the cache then keeps whatever was filled in before
     * the failure.
     *
     * Params:
     *   redis = connection used to query the cluster layout
     */
    void discoverClusterNodesAndSlots(Redis redis) {
        mutex.lock();
        scope(exit) mutex.unlock();

        try {
            reset();
            List!(Object) slotInfos = redis.clusterSlots();

            foreach (Object slotInfoObj; slotInfos) {
                List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

                // Skip entries that are not lists or carry no node description.
                if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                    continue;
                }

                List!(int) slotNums = getAssignedSlotArray(slotInfo);

                // Every entry from MASTER_NODE_INDEX on describes one node.
                int size = slotInfo.size();
                for (int i = MASTER_NODE_INDEX; i < size; i++) {
                    List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(i);
                    if (hostInfos is null || hostInfos.isEmpty()) {
                        continue;
                    }

                    HostAndPort targetNode = generateHostAndPort(hostInfos);
                    setupNodeIfNotExist(targetNode);

                    // Only the first listed node gets the slots.
                    if (i == MASTER_NODE_INDEX) {
                        assignSlotsToNode(slotNums, targetNode);
                    }
                }
            }
        } catch(Throwable ex) {
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }
    }

    /**
     * Refreshes the slot -> pool mapping.
     *
     * The given connection is tried first (when not null). If that fails
     * with a RedisException, a connection is borrowed from each known pool
     * in turn until one of them answers; a RedisConnectionException moves on
     * to the next pool. Any other Exception ends the attempt and is, at
     * most, logged.
     *
     * `rediscovering` is read once without the lock as a cheap early-out and
     * checked again after the lock has been taken.
     *
     * Params:
     *   redis = preferred connection for the query; may be null
     */
    void renewClusterSlots(Redis redis) {
        // If rediscovering is already in process there is no need to start
        // the same work again.
        if (rediscovering) {
            return;
        }

        mutex.lock();
        scope(exit) mutex.unlock();

        try {
            if (rediscovering) {
                return;
            }
            rediscovering = true;

            try {
                if (redis !is null) {
                    try {
                        discoverClusterSlots(redis);
                        return;
                    } catch (RedisException ex) {
                        // fall through and try nodes from all pools
                        debug warning(ex.msg);
                        version(HUNT_REDIS_DEBUG) warning(ex);
                    }
                }

                foreach (RedisPool candidate; getShuffledNodesPool()) {
                    Redis connection = null;
                    try {
                        connection = candidate.borrow();
                        discoverClusterSlots(connection);
                        return;
                    } catch (RedisConnectionException ex) {
                        // try next node
                        debug warning(ex.msg);
                        version(HUNT_REDIS_DEBUG) warning(ex);
                    } finally {
                        if (connection !is null) {
                            connection.close();
                        }
                    }
                }
            } finally {
                rediscovering = false;
            }
        } catch(Exception ex) {
            debug warning(ex.msg);
            version(HUNT_REDIS_DEBUG) warning(ex);
        }
    }

    /**
     * Replaces the slot map with the layout reported by
     * `redis.clusterSlots()`. The old map is cleared only after the reply
     * has been received. Callers in this class hold `mutex`.
     *
     * Throws: whatever `redis.clusterSlots()` throws, and RedisException
     *         when a node or slot entry has an unexpected type.
     */
    private void discoverClusterSlots(Redis redis) {
        List!(Object) slotInfos = redis.clusterSlots();
        this.slots.clear();

        foreach (Object slotInfoObj; slotInfos) {
            List!(Object) slotInfo = cast(List!(Object)) slotInfoObj;

            // Skip entries that are not lists or carry no node description.
            if (slotInfo is null || slotInfo.size() <= MASTER_NODE_INDEX) {
                continue;
            }

            List!(int) slotNums = getAssignedSlotArray(slotInfo);

            List!(Object) hostInfos = cast(List!(Object)) slotInfo.get(MASTER_NODE_INDEX);
            if (hostInfos is null || hostInfos.isEmpty()) {
                continue;
            }

            // at this time, we just use master, discard slave information
            HostAndPort targetNode = generateHostAndPort(hostInfos);
            assignSlotsToNode(slotNums, targetNode);
        }
    }

    /**
     * Builds a HostAndPort from one node description, whose element 0 is
     * expected to be the host as Bytes and element 1 the port as Long.
     *
     * When poolConfig.ssl is set and a hostAndPortMap is present, the mapped
     * address is returned instead, provided the map yields one.
     *
     * Throws: RedisException when the host or port element does not have the
     *         expected type. Previously only a warning was logged and the
     *         null reference was dereferenced right after.
     */
    private HostAndPort generateHostAndPort(List!(Object) hostInfos) {
        Object info = hostInfos.get(0);

        Bytes infoBytes = cast(Bytes) info;
        if (infoBytes is null) {
            // typeid on a null reference is not safe, so only log the type
            // when there is an object to inspect.
            if (info !is null) {
                warningf("wrong cast: from %s to %s", typeid(info), typeid(Bytes));
            }
            throw new RedisException("Unexpected host entry in cluster slots reply");
        }

        Long portValue = cast(Long) hostInfos.get(1);
        if (portValue is null) {
            throw new RedisException("Unexpected port entry in cluster slots reply");
        }

        string host = cast(string) infoBytes.value();
        int port = portValue.intValue();

        if (poolConfig.ssl && hostAndPortMap !is null) {
            HostAndPort hostAndPort = hostAndPortMap.getSSLHostAndPort(host, port);
            if (hostAndPort !is null) {
                return hostAndPort;
            }
        }

        version(HUNT_REDIS_DEBUG) tracef("%s:%d", host, port);
        return new HostAndPort(host, port);
    }

    /**
     * Returns the pool registered for `node`, creating and registering one
     * first when the node is not known yet. A new pool is built from a copy
     * of poolConfig with host and port replaced by those of `node`.
     */
    RedisPool setupNodeIfNotExist(HostAndPort node) {
        mutex.lock();
        scope(exit) mutex.unlock();

        string nodeKey = getNodeKey(node);
        RedisPool existingPool = nodes.get(nodeKey);
        if (existingPool !is null) {
            return existingPool;
        }

        RedisPoolOptions config = new RedisPoolOptions(poolConfig);
        config.host = node.getHost();
        config.port = node.getPort();

        RedisPool nodePool = new RedisFactory(config).pool();
        nodes.put(nodeKey, nodePool);
        return nodePool;
    }

    /**
     * Maps a single slot to the pool of `targetNode`, registering the node
     * first if necessary.
     */
    void assignSlotToNode(int slot, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        slots.put(slot, targetPool);
    }

    /**
     * Maps every slot in `targetSlots` to the pool of `targetNode`,
     * registering the node first if necessary.
     */
    void assignSlotsToNode(List!(int) targetSlots, HostAndPort targetNode) {
        mutex.lock();
        scope(exit) mutex.unlock();

        RedisPool targetPool = setupNodeIfNotExist(targetNode);
        foreach (int slot; targetSlots) {
            slots.put(slot, targetPool);
        }
    }

    /// Returns the result of looking up `nodeKey` ("host:port") in the node map.
    RedisPool getNode(string nodeKey) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return nodes.get(nodeKey);
    }

    /// Returns the result of looking up `slot` in the slot map.
    RedisPool getSlotPool(int slot) {
        mutex.lock();
        scope(exit) mutex.unlock();

        return slots.get(slot);
    }

    /// Returns a new HashMap built from the current node map.
    Map!(string, RedisPool) getNodes() {
        mutex.lock();
        scope(exit) mutex.unlock();

        return new HashMap!(string, RedisPool)(nodes);
    }

    /**
     * Returns a new list holding all known node pools.
     *
     * NOTE: despite the name the list is not shuffled yet; the pools come
     * back in the order produced by `nodes.values()`.
     */
    List!(RedisPool) getShuffledNodesPool() {
        mutex.lock();
        scope(exit) mutex.unlock();

        List!(RedisPool) pools = new ArrayList!(RedisPool)(nodes.values());
        // TODO: Tasks pending completion -@zxp at 7/17/2019, 10:01:18 AM
        //
        // Collections.shuffle(pools);
        return pools;
    }

    /**
     * Clear discovered nodes collections and gently release allocated resources
     */
    void reset() {
        mutex.lock();
        scope(exit) mutex.unlock();

        doReset();
    }

    /**
     * Destroys every registered pool and empties both maps. An Exception
     * raised while destroying one pool is swallowed (logged in debug /
     * HUNT_REDIS_DEBUG builds) so the remaining pools are still processed.
     * Callers in this class hold `mutex`.
     */
    private void doReset() {
        foreach (RedisPool pool; nodes.values()) {
            try {
                if (pool !is null) {
                    pool.destroy();
                }
            } catch (Exception ex) {
                // pass
                debug warning(ex.msg);
                version(HUNT_REDIS_DEBUG) warning(ex);
            }
        }
        nodes.clear();
        slots.clear();
    }

    /// Builds the "host:port" key under which a node is stored.
    static string getNodeKey(HostAndPort hnp) {
        return hnp.getHost() ~ ":" ~ hnp.getPort().to!string();
    }

    /// Builds the "host:port" key from the host and port of a client.
    static string getNodeKey(Client client) {
        return client.getHost() ~ ":" ~ client.getPort().to!string();
    }

    /// Builds the "host:port" key from the client of a Redis connection.
    static string getNodeKey(Redis redis) {
        return getNodeKey(redis.getClient());
    }

    /**
     * Expands the slot range of one slot-range entry into a list of slot
     * numbers. Elements 0 and 1 of `slotInfo` are the first and last slot,
     * both inclusive.
     *
     * Throws: RedisException when either bound is not a Long.
     */
    private List!(int) getAssignedSlotArray(List!(Object) slotInfo) {
        Long firstSlot = cast(Long) slotInfo.get(0);
        Long lastSlot = cast(Long) slotInfo.get(1);
        if (firstSlot is null || lastSlot is null) {
            throw new RedisException("Unexpected slot range in cluster slots reply");
        }

        // Read the bounds once instead of re-casting on every iteration.
        int start = firstSlot.intValue();
        int end = lastSlot.intValue();

        List!(int) slotNums = new ArrayList!(int)();
        for (int slot = start; slot <= end; slot++) {
            slotNums.add(slot);
        }
        return slotNums;
    }
}