1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.RedisClusterCommand;
13 
14 import hunt.redis.Redis;
15 import hunt.redis.RedisClusterConnectionHandler;
16 import hunt.redis.Exceptions;
17 import hunt.redis.util.RedisClusterCRC16;
18 
19 abstract class RedisClusterCommand(T) {
20 
21     private RedisClusterConnectionHandler connectionHandler;
22     private int maxAttempts;
23 
24     this(RedisClusterConnectionHandler connectionHandler, int maxAttempts) {
25         this.connectionHandler = connectionHandler;
26         this.maxAttempts = maxAttempts;
27     }
28 
29     abstract T execute(Redis connection);
30 
31     T run(string key) {
32         return runWithRetries(RedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
33     }
34 
35     T run(string[] keys...) { // int keyCount, 
36         if (keys is null || keys.length == 0) {
37             throw new RedisClusterOperationException("No way to dispatch this command to Redis Cluster.");
38         }
39         int keyCount = cast(int)keys.length;
40 
41         // For multiple keys, only execute if they all share the same connection slot.
42         int slot = RedisClusterCRC16.getSlot(keys[0]);
43         if (keys.length > 1) {
44             for (int i = 1; i < keyCount; i++) {
45                 int nextSlot = RedisClusterCRC16.getSlot(keys[i]);
46                 if (slot != nextSlot) {
47                     throw new RedisClusterOperationException("No way to dispatch this command to Redis "
48                             ~ "Cluster because keys have different slots.");
49                 }
50             }
51         }
52 
53         return runWithRetries(slot, this.maxAttempts, false, null);
54     }
55 
56     T runBinary(const(ubyte)[] key) {
57         return runWithRetries(RedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
58     }
59 
60     T runBinary(const(ubyte)[][] keys...) { // int keyCount, 
61         if (keys is null || keys.length == 0) {
62             throw new RedisClusterOperationException("No way to dispatch this command to Redis Cluster.");
63         }
64 
65         int keyCount = cast(int)keys.length;
66 
67         // For multiple keys, only execute if they all share the same connection slot.
68         int slot = RedisClusterCRC16.getSlot(keys[0]);
69         if (keys.length > 1) {
70             for (int i = 1; i < keyCount; i++) {
71                 int nextSlot = RedisClusterCRC16.getSlot(keys[i]);
72                 if (slot != nextSlot) {
73                     throw new RedisClusterOperationException("No way to dispatch this command to Redis "
74                             ~ "Cluster because keys have different slots.");
75                 }
76             }
77         }
78 
79         return runWithRetries(slot, this.maxAttempts, false, null);
80     }
81 
82     T runWithAnyNode() {
83         Redis connection = null;
84         try {
85             connection = connectionHandler.getConnection();
86             return execute(connection);
87         } catch (RedisConnectionException e) {
88             throw e;
89         } finally {
90             releaseConnection(connection);
91         }
92     }
93 
94     private T runWithRetries(int slot, int attempts, bool tryRandomNode, RedisRedirectionException redirect) {
95         if (attempts <= 0) {
96             throw new RedisClusterMaxAttemptsException("No more cluster attempts left.");
97         }
98 
99         Redis connection = null;
100         try {
101 
102             if (redirect !is null) {
103                 connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
104                 RedisAskDataException ex = cast(RedisAskDataException)redirect;
105                 if (ex !is null) {
106                     // TODO: Pipeline asking with the original command to make it[] faster....
107                     connection.asking();
108                 }
109             } else {
110                 if (tryRandomNode) {
111                     connection = connectionHandler.getConnection();
112                 } else {
113                     connection = connectionHandler.getConnectionFromSlot(slot);
114                 }
115             }
116 
117             return execute(connection);
118 
119         } catch (RedisNoReachableClusterNodeException jnrcne) {
120             throw jnrcne;
121         } catch (RedisConnectionException jce) {
122             // release current connection before recursion
123             releaseConnection(connection);
124             connection = null;
125 
126             if (attempts <= 1) {
127                 //We need this because if node is not reachable anymore - we need to finally initiate slots
128                 //renewing, or we can stuck with cluster state without one node in opposite case.
129                 //But now if maxAttempts = [1 or 2] we will do it too often.
130                 //TODO make tracking of successful/unsuccessful operations for node - do renewing only
131                 //if there were no successful responses from this node last few seconds
132                 this.connectionHandler.renewSlotCache();
133             }
134 
135             return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
136         } catch (RedisRedirectionException jre) {
137             // if MOVED redirection occurred,
138             RedisMovedDataException ex = cast(RedisMovedDataException)jre;
139             if (jre !is null) {
140                 // it rebuilds cluster's slot cache recommended by Redis cluster specification
141                 this.connectionHandler.renewSlotCache(connection);
142             }
143 
144             // release current connection before recursion
145             releaseConnection(connection);
146             connection = null;
147 
148             return runWithRetries(slot, attempts - 1, false, jre);
149         } finally {
150             releaseConnection(connection);
151         }
152     }
153 
154     private void releaseConnection(Redis connection) {
155         if (connection !is null) {
156             connection.close();
157         }
158     }
159 
160 }