1 /* 2 * Hunt - A redis client library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.redis.RedisClusterCommand; 13 14 import hunt.redis.Redis; 15 import hunt.redis.RedisClusterConnectionHandler; 16 import hunt.redis.Exceptions; 17 import hunt.redis.util.RedisClusterCRC16; 18 19 abstract class RedisClusterCommand(T) { 20 21 private RedisClusterConnectionHandler connectionHandler; 22 private int maxAttempts; 23 24 this(RedisClusterConnectionHandler connectionHandler, int maxAttempts) { 25 this.connectionHandler = connectionHandler; 26 this.maxAttempts = maxAttempts; 27 } 28 29 abstract T execute(Redis connection); 30 31 T run(string key) { 32 return runWithRetries(RedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); 33 } 34 35 T run(string[] keys...) { // int keyCount, 36 if (keys is null || keys.length == 0) { 37 throw new RedisClusterOperationException("No way to dispatch this command to Redis Cluster."); 38 } 39 int keyCount = cast(int)keys.length; 40 41 // For multiple keys, only execute if they all share the same connection slot. 42 int slot = RedisClusterCRC16.getSlot(keys[0]); 43 if (keys.length > 1) { 44 for (int i = 1; i < keyCount; i++) { 45 int nextSlot = RedisClusterCRC16.getSlot(keys[i]); 46 if (slot != nextSlot) { 47 throw new RedisClusterOperationException("No way to dispatch this command to Redis " 48 ~ "Cluster because keys have different slots."); 49 } 50 } 51 } 52 53 return runWithRetries(slot, this.maxAttempts, false, null); 54 } 55 56 T runBinary(const(ubyte)[] key) { 57 return runWithRetries(RedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); 58 } 59 60 T runBinary(const(ubyte)[][] keys...) { // int keyCount, 61 if (keys is null || keys.length == 0) { 62 throw new RedisClusterOperationException("No way to dispatch this command to Redis Cluster."); 63 } 64 65 int keyCount = cast(int)keys.length; 66 67 // For multiple keys, only execute if they all share the same connection slot. 68 int slot = RedisClusterCRC16.getSlot(keys[0]); 69 if (keys.length > 1) { 70 for (int i = 1; i < keyCount; i++) { 71 int nextSlot = RedisClusterCRC16.getSlot(keys[i]); 72 if (slot != nextSlot) { 73 throw new RedisClusterOperationException("No way to dispatch this command to Redis " 74 ~ "Cluster because keys have different slots."); 75 } 76 } 77 } 78 79 return runWithRetries(slot, this.maxAttempts, false, null); 80 } 81 82 T runWithAnyNode() { 83 Redis connection = null; 84 try { 85 connection = connectionHandler.getConnection(); 86 return execute(connection); 87 } catch (RedisConnectionException e) { 88 throw e; 89 } finally { 90 releaseConnection(connection); 91 } 92 } 93 94 private T runWithRetries(int slot, int attempts, bool tryRandomNode, RedisRedirectionException redirect) { 95 if (attempts <= 0) { 96 throw new RedisClusterMaxAttemptsException("No more cluster attempts left."); 97 } 98 99 Redis connection = null; 100 try { 101 102 if (redirect !is null) { 103 connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode()); 104 RedisAskDataException ex = cast(RedisAskDataException)redirect; 105 if (ex !is null) { 106 // TODO: Pipeline asking with the original command to make it[] faster.... 107 connection.asking(); 108 } 109 } else { 110 if (tryRandomNode) { 111 connection = connectionHandler.getConnection(); 112 } else { 113 connection = connectionHandler.getConnectionFromSlot(slot); 114 } 115 } 116 117 return execute(connection); 118 119 } catch (RedisNoReachableClusterNodeException jnrcne) { 120 throw jnrcne; 121 } catch (RedisConnectionException jce) { 122 // release current connection before recursion 123 releaseConnection(connection); 124 connection = null; 125 126 if (attempts <= 1) { 127 //We need this because if node is not reachable anymore - we need to finally initiate slots 128 //renewing, or we can stuck with cluster state without one node in opposite case. 129 //But now if maxAttempts = [1 or 2] we will do it too often. 130 //TODO make tracking of successful/unsuccessful operations for node - do renewing only 131 //if there were no successful responses from this node last few seconds 132 this.connectionHandler.renewSlotCache(); 133 } 134 135 return runWithRetries(slot, attempts - 1, tryRandomNode, redirect); 136 } catch (RedisRedirectionException jre) { 137 // if MOVED redirection occurred, 138 RedisMovedDataException ex = cast(RedisMovedDataException)jre; 139 if (jre !is null) { 140 // it rebuilds cluster's slot cache recommended by Redis cluster specification 141 this.connectionHandler.renewSlotCache(connection); 142 } 143 144 // release current connection before recursion 145 releaseConnection(connection); 146 connection = null; 147 148 return runWithRetries(slot, attempts - 1, false, jre); 149 } finally { 150 releaseConnection(connection); 151 } 152 } 153 154 private void releaseConnection(Redis connection) { 155 if (connection !is null) { 156 connection.close(); 157 } 158 } 159 160 }