/*
 * Hunt - A redis client library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.redis.Protocol;

// import hunt.redis.commands.Command;
import hunt.redis.Exceptions;
import hunt.redis.HostAndPort;
import hunt.redis.util.RedisInputStream;
import hunt.redis.util.RedisOutputStream;
import hunt.redis.util.SafeEncoder;

import hunt.Byte;
import hunt.Exceptions;
import hunt.collection.ArrayList;
import hunt.collection.List;
import hunt.Long;
import hunt.String;
import hunt.text.StringUtils;

import std.conv;
import std.format;
import std.string;


/**
 * Static helpers for talking to a Redis server: protocol constants,
 * request serialization (`sendCommand`), reply parsing (`read`) and
 * conversion of primitive values into argument bytes (`toByteArray`).
 *
 * The class only hosts static members; its constructor is private.
 */
final class Protocol {

    // Prefixes of error replies that are mapped to dedicated exception types
    // in processError().
    private enum string ASK_PREFIX = "ASK ";
    private enum string MOVED_PREFIX = "MOVED ";
    private enum string CLUSTERDOWN_PREFIX = "CLUSTERDOWN ";
    private enum string BUSY_PREFIX = "BUSY ";
    private enum string NOSCRIPT_PREFIX = "NOSCRIPT ";

    enum string DEFAULT_HOST = "localhost";
    enum int DEFAULT_PORT = 6379;
    enum int DEFAULT_SENTINEL_PORT = 26379;
    enum int DEFAULT_TIMEOUT = 2000;
    enum int DEFAULT_MAX_ATTEMPTS = 5;
    enum int DEFAULT_DATABASE = 0;

    enum string CHARSET = "UTF-8";

    // Type markers: the first byte of a reply selects the parser in process();
    // '*' and '$' are also written by sendCommand().
    enum byte DOLLAR_BYTE = '$';
    enum byte ASTERISK_BYTE = '*';
    enum byte PLUS_BYTE = '+';
    enum byte MINUS_BYTE = '-';
    enum byte COLON_BYTE = ':';

    enum string SENTINEL_MASTERS = "masters";
    enum string SENTINEL_GET_MASTER_ADDR_BY_NAME = "get-master-addr-by-name";
    enum string SENTINEL_RESET = "reset";
    enum string SENTINEL_SLAVES = "slaves";
    enum string SENTINEL_FAILOVER = "failover";
    enum string SENTINEL_MONITOR = "monitor";
    enum string SENTINEL_REMOVE = "remove";
    enum string SENTINEL_SET = "set";

    enum string CLUSTER_NODES = "nodes";
    enum string CLUSTER_MEET = "meet";
    enum string CLUSTER_RESET = "reset";
    enum string CLUSTER_ADDSLOTS = "addslots";
    enum string CLUSTER_DELSLOTS = "delslots";
    enum string CLUSTER_INFO = "info";
    enum string CLUSTER_GETKEYSINSLOT = "getkeysinslot";
    enum string CLUSTER_SETSLOT = "setslot";
    enum string CLUSTER_SETSLOT_NODE = "node";
    enum string CLUSTER_SETSLOT_MIGRATING = "migrating";
    enum string CLUSTER_SETSLOT_IMPORTING = "importing";
    enum string CLUSTER_SETSLOT_STABLE = "stable";
    enum string CLUSTER_FORGET = "forget";
    enum string CLUSTER_FLUSHSLOT = "flushslots";
    enum string CLUSTER_KEYSLOT = "keyslot";
    enum string CLUSTER_COUNTKEYINSLOT = "countkeysinslot";
    enum string CLUSTER_SAVECONFIG = "saveconfig";
    enum string CLUSTER_REPLICATE = "replicate";
    enum string CLUSTER_SLAVES = "slaves";
    enum string CLUSTER_FAILOVER = "failover";
    enum string CLUSTER_SLOTS = "slots";
    enum string PUBSUB_CHANNELS = "channels";
    enum string PUBSUB_NUMSUB = "numsub";
    enum string PUBSUB_NUM_PAT = "numpat";

    enum const(ubyte)[] BYTES_TRUE = cast(const(ubyte)[])"1";
    enum const(ubyte)[] BYTES_FALSE = cast(const(ubyte)[])"0";
    enum const(ubyte)[] BYTES_TILDE = cast(const(ubyte)[])"~";

    enum const(ubyte)[] POSITIVE_INFINITY_BYTES = cast(const(ubyte)[])"+inf";
    enum const(ubyte)[] NEGATIVE_INFINITY_BYTES = cast(const(ubyte)[])"-inf";

    private this() {
        // this prevents the class from being instantiated
    }

    /**
     * Serializes a command and its arguments to the output stream.
     *
     * Params:
     *   os      = stream the request is written to
     *   command = command whose raw name bytes (`getRaw`) are sent first
     *   args    = already encoded arguments, written in order
     *
     * Throws: RedisConnectionException if writing raises an IOException.
     */
    static void sendCommand(RedisOutputStream os, Command command,
            const(ubyte)[][] args...) {
        sendCommand(os, command.getRaw(), args);
    }

    /**
     * Writes one request: '*' followed by the element count
     * (arguments + 1 for the command), then for the command and every
     * argument a '$' with the byte length, the bytes, and a CRLF.
     *
     * Throws: RedisConnectionException wrapping any IOException.
     */
    private static void sendCommand(RedisOutputStream os, const(ubyte)[] command,
            const(ubyte)[][] args...) {
        try {
            os.write(ASTERISK_BYTE);
            os.writeIntCrLf(cast(int)args.length + 1);
            os.write(DOLLAR_BYTE);
            os.writeIntCrLf(cast(int)command.length);
            os.write(cast(byte[])command);
            os.writeCrLf();

            foreach (const(ubyte)[] arg ; args) {
                os.write(DOLLAR_BYTE);
                os.writeIntCrLf(cast(int)arg.length);
                os.write(cast(byte[])arg);
                os.writeCrLf();
            }
        } catch (IOException e) {
            throw new RedisConnectionException(e);
        }
    }

    /**
     * Reads the remainder of an error reply and throws the matching
     * exception. This function never returns normally.
     *
     * The message prefix selects the exception type; any message without
     * a known prefix is raised as a plain RedisDataException.
     */
    private static void processError(RedisInputStream inputStream) {
        string message = inputStream.readLine();
        // TODO: I'm not sure if this is the best way to do this.
        // Maybe Read only first 5 bytes instead?
        if (message.startsWith(MOVED_PREFIX)) {
            string[] movedInfo = parseTargetHostAndSlot(message);
            throw new RedisMovedDataException(message, new HostAndPort(movedInfo[1],
                    to!int(movedInfo[2])), to!int(movedInfo[0]));
        } else if (message.startsWith(ASK_PREFIX)) {
            string[] askInfo = parseTargetHostAndSlot(message);
            throw new RedisAskDataException(message, new HostAndPort(askInfo[1],
                    to!int(askInfo[2])), to!int(askInfo[0]));
        } else if (message.startsWith(CLUSTERDOWN_PREFIX)) {
            throw new RedisClusterException(message);
        } else if (message.startsWith(BUSY_PREFIX)) {
            throw new RedisBusyException(message);
        } else if (message.startsWith(NOSCRIPT_PREFIX) ) {
            throw new RedisNoScriptException(message);
        }
        throw new RedisDataException(message);
    }

    /**
     * Consumes one byte from the stream and, if it is the error marker '-',
     * returns the rest of that line.
     *
     * Returns: the error line, or null when the byte read is any other
     *          reply type (that byte is still consumed).
     */
    static string readErrorLineIfPossible(RedisInputStream inputStream) {
        byte b = inputStream.readByte();
        // if buffer contains other type of response, just ignore.
        if (b != MINUS_BYTE) {
            return null;
        }
        return inputStream.readLine();
    }

    /**
     * Splits a redirect error message on single spaces and extracts the
     * slot (second token) and the target address (third token).
     *
     * Returns: a three element array: [slot, host, port].
     *
     * Throws: RedisDataException when the message has fewer than three
     *         tokens or the address does not yield both a host and a port,
     *         instead of failing with an out-of-range index.
     */
    private static string[] parseTargetHostAndSlot(string clusterRedirectResponse) {
        string[] messageInfo = clusterRedirectResponse.split(" ");
        if (messageInfo.length < 3) {
            throw new RedisDataException(clusterRedirectResponse);
        }

        // NOTE(review): extractParts is assumed to return [host, port] — confirm.
        string[] targetHostAndPort = HostAndPort.extractParts(messageInfo[2]);
        if (targetHostAndPort.length < 2) {
            throw new RedisDataException(clusterRedirectResponse);
        }

        string[] response = new string[3];
        response[0] = messageInfo[1];
        response[1] = targetHostAndPort[0];
        response[2] = targetHostAndPort[1];
        return response;
    }

    /**
     * Reads one reply, dispatching on its first byte:
     * '+' and '$' produce a Bytes object, '*' a list of nested replies,
     * ':' a Long, and '-' raises through processError().
     *
     * Throws: RedisConnectionException for an unrecognised first byte.
     */
    private static Object process(RedisInputStream inputStream) {
        byte b = inputStream.readByte();
        switch(b) {
            case PLUS_BYTE:
                return new Bytes(processStatusCodeReply(inputStream));
            case DOLLAR_BYTE:
                return new Bytes(processBulkReply(inputStream));
            case ASTERISK_BYTE:
                return cast(Object)processMultiBulkReply(inputStream);
            case COLON_BYTE:
                return new Long(processInteger(inputStream));
            case MINUS_BYTE:
                // processError always throws; the return only satisfies the compiler.
                processError(inputStream);
                return null;
            default:
                throw new RedisConnectionException(format("Unknown reply: %s", cast(char) b));
        }
    }

    /// Reads a status reply: the bytes of the current line.
    private static byte[] processStatusCodeReply(RedisInputStream inputStream) {
        return inputStream.readLineBytes();
    }

    /**
     * Reads a length-prefixed payload.
     *
     * Returns: the payload, or null when the announced length is -1.
     *
     * Throws: RedisConnectionException if the stream reports -1 before the
     *         announced number of bytes has been read.
     */
    private static byte[] processBulkReply(RedisInputStream inputStream) {
        int len = inputStream.readIntCrLf();
        if (len == -1) {
            return null;
        }

        byte[] read = new byte[len];
        int offset = 0;
        // A single read may return fewer bytes than requested, so keep
        // reading until the buffer is full.
        while (offset < len) {
            int size = inputStream.read(read, offset, (len - offset));
            if (size == -1) throw new RedisConnectionException(
                    "It seems like server has closed the connection.");
            offset += size;
        }

        // read 2 more bytes for the command delimiter
        inputStream.readByte();
        inputStream.readByte();

        return read;
    }

    /// Reads an integer reply terminated by CRLF.
    private static long processInteger(RedisInputStream inputStream) {
        return inputStream.readLongCrLf();
    }

    /**
     * Reads a counted sequence of nested replies.
     *
     * A RedisDataException raised while parsing an element is stored in the
     * list at that element's position rather than propagated, so the
     * remaining elements are still consumed.
     *
     * Returns: the list of elements, or null when the announced count is -1.
     */
    private static List!(Object) processMultiBulkReply(RedisInputStream inputStream) {
        int num = inputStream.readIntCrLf();
        if (num == -1) {
            return null;
        }
        List!(Object) ret = new ArrayList!(Object)(num);
        for (int i = 0; i < num; i++) {
            try {
                ret.add(process(inputStream));
            } catch (RedisDataException e) {
                ret.add(e);
            }
        }
        return ret;
    }

    /// Reads and returns the next reply from the stream; see process().
    static Object read(RedisInputStream inputStream) {
        return process(inputStream);
    }

    /// Encodes a boolean as "1" (true) or "0" (false).
    static const(ubyte)[] toByteArray(bool value) {
        return value ? BYTES_TRUE : BYTES_FALSE;
    }

    /// Encodes an int as its decimal text.
    static const(ubyte)[] toByteArray(int value) {
        return cast(const(ubyte)[])to!string(value);
    }

    /// Encodes a long as its decimal text.
    static const(ubyte)[] toByteArray(long value) {
        return cast(const(ubyte)[])to!string(value);
    }

    /**
     * Encodes a double as text.
     *
     * Infinities become "+inf" / "-inf". Other values keep the historical
     * "%f" rendering (with an all-zero fraction trimmed) whenever that text
     * parses back to exactly the same double. "%f" keeps only six fractional
     * digits, so values it would truncate (for example 1e-7, previously sent
     * as "0") are instead rendered with the smallest "%g" precision between
     * 15 and 17 significant digits that round-trips.
     */
    static const(ubyte)[] toByteArray(double value) {
        if (value == double.infinity) {
            return POSITIVE_INFINITY_BYTES;
        } else if (value == -double.infinity) {
            return NEGATIVE_INFINITY_BYTES;
        } else {
            string v = trimZero(format("%f", value));

            // NaN never compares equal to itself; leave its text untouched.
            if (value == value && to!double(v) != value) {
                foreach (int precision; 15 .. 18) {
                    v = format("%.*g", precision, value);
                    if (to!double(v) == value) {
                        break;
                    }
                }
            }

            return cast(const(ubyte)[])v;
        }
    }

    /**
     * Drops the fractional part of a decimal string when every character
     * after the '.' is '0' (e.g. "3.000000" becomes "3"). Any other input,
     * including one that starts with '.', is returned unchanged.
     */
    private static string trimZero(string value) {
        int pointPos = -1;
        bool canTrim = true;

        for(size_t i=0; i< value.length; i++) {
            if(value[i] == '.') {
                pointPos = cast(int)i;
            } else if(pointPos >=0 && value[i] != '0') {
                canTrim = false;
                break;
            }
        }

        if(canTrim && pointPos>0) {
            return value[0..pointPos];
        } else {
            return value;
        }
    }

    // dfmt off
    /// Command names; the member identifier is the text sent on the wire (see getRaw).
    static enum Command {
        PING, SET, GET, QUIT, EXISTS, DEL, UNLINK, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX,
        RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX,
        MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET,
        HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX,
        LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER,
        SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY,
        ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP,
        BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT,
        ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE,
        ZLEXCOUNT, ZRANGEBYLEX, ZREVRANGEBYLEX, ZREMRANGEBYLEX, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE,
        SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT,
        DEBUG, BRPOPLPUSH, SETBIT, GETBIT, BITPOS, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG,
        OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT,
        PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING,
        PFADD, PFCOUNT, PFMERGE, READONLY, GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO,
        GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, MODULE, BITFIELD, HSTRLEN, TOUCH, SWAPDB, MEMORY,
        XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM
    }

    /// Keywords used as command arguments; converted to bytes by getRaw.
    static enum Keyword {
        AGGREGATE, ALPHA, ASC, BY, DESC, GET, LIMIT, MESSAGE, NO, NOSORT, PMESSAGE, PSUBSCRIBE,
        PUNSUBSCRIBE, OK, ONE, QUEUED, SET, STORE, SUBSCRIBE, UNSUBSCRIBE, WEIGHTS, WITHSCORES,
        RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, KILL, LEN, REFCOUNT, ENCODING, IDLETIME,
        GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR,
        BLOCK, NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP,
        IDLE, TIME, RETRYCOUNT, FORCE
    }

    // dfmt on
}


alias ProtocolCommand = Protocol.Command;
alias ProtocolKeyword = Protocol.Keyword;


/// Returns the keyword's member name as raw bytes.
const(ubyte)[] getRaw(ProtocolKeyword k) {
    return cast(const(ubyte)[])k.to!string();
}

/**
 * Converts a command to the bytes of its enum member name.
 *
 * Params:
 *   c = the command to convert
 *
 * Returns: the member identifier of `c` as text, reinterpreted as bytes.
 */
const(ubyte)[] getRaw(ProtocolCommand c) {
    string commandName = to!string(c);
    return cast(const(ubyte)[]) commandName;
}