1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.Protocol;
13 
14 // import hunt.redis.commands.Command;
15 import hunt.redis.Exceptions;
16 import hunt.redis.HostAndPort;
17 import hunt.redis.util.RedisInputStream;
18 import hunt.redis.util.RedisOutputStream;
19 import hunt.redis.util.SafeEncoder;
20 
21 import hunt.Byte;
22 import hunt.Exceptions;
23 import hunt.collection.ArrayList;
24 import hunt.collection.List;
25 import hunt.Long;
26 import hunt.String;
27 import hunt.text.StringUtils;
28 
29 import std.conv;
30 import std.format;
31 import std.string;
32 
33 
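/**
 * Constants and static helpers for talking to a Redis server: command
 * serialization (sendCommand), reply parsing (read) and conversion of
 * primitive values to the byte form sent on the wire (toByteArray).
 */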
36 final class Protocol {
37 
    // Prefixes matched by processError against the text of an error reply
    // to select the exception type that is thrown.
    private enum string ASK_PREFIX = "ASK ";
    private enum string MOVED_PREFIX = "MOVED ";
    private enum string CLUSTERDOWN_PREFIX = "CLUSTERDOWN ";
    private enum string BUSY_PREFIX = "BUSY ";
    private enum string NOSCRIPT_PREFIX = "NOSCRIPT ";

    // Connection defaults. They are not used inside this class.
    enum string DEFAULT_HOST = "localhost";
    enum int DEFAULT_PORT = 6379;
    enum int DEFAULT_SENTINEL_PORT = 26379;
    // NOTE(review): presumably milliseconds -- confirm against the callers.
    enum int DEFAULT_TIMEOUT = 2000;
    enum int DEFAULT_MAX_ATTEMPTS = 5;
    enum int DEFAULT_DATABASE = 0;

    enum string CHARSET = "UTF-8";

    // Marker bytes. sendCommand writes '*' before the element count and '$'
    // before every length; process() dispatches on the first byte of a reply
    // ('+' status, '$' bulk, '*' multi-bulk, ':' integer, '-' error).
    enum byte DOLLAR_BYTE = '$';
    enum byte ASTERISK_BYTE = '*';
    enum byte PLUS_BYTE = '+';
    enum byte MINUS_BYTE = '-';
    enum byte COLON_BYTE = ':';

    // NOTE(review): presumably sub-command names for the SENTINEL command;
    // they are not referenced in this module.
    enum string SENTINEL_MASTERS = "masters";
    enum string SENTINEL_GET_MASTER_ADDR_BY_NAME = "get-master-addr-by-name";
    enum string SENTINEL_RESET = "reset";
    enum string SENTINEL_SLAVES = "slaves";
    enum string SENTINEL_FAILOVER = "failover";
    enum string SENTINEL_MONITOR = "monitor";
    enum string SENTINEL_REMOVE = "remove";
    enum string SENTINEL_SET = "set";

    // NOTE(review): presumably sub-command names for the CLUSTER command;
    // they are not referenced in this module.
    enum string CLUSTER_NODES = "nodes";
    enum string CLUSTER_MEET = "meet";
    enum string CLUSTER_RESET = "reset";
    enum string CLUSTER_ADDSLOTS = "addslots";
    enum string CLUSTER_DELSLOTS = "delslots";
    enum string CLUSTER_INFO = "info";
    enum string CLUSTER_GETKEYSINSLOT = "getkeysinslot";
    enum string CLUSTER_SETSLOT = "setslot";
    enum string CLUSTER_SETSLOT_NODE = "node";
    enum string CLUSTER_SETSLOT_MIGRATING = "migrating";
    enum string CLUSTER_SETSLOT_IMPORTING = "importing";
    enum string CLUSTER_SETSLOT_STABLE = "stable";
    enum string CLUSTER_FORGET = "forget";
    enum string CLUSTER_FLUSHSLOT = "flushslots";
    enum string CLUSTER_KEYSLOT = "keyslot";
    enum string CLUSTER_COUNTKEYINSLOT = "countkeysinslot";
    enum string CLUSTER_SAVECONFIG = "saveconfig";
    enum string CLUSTER_REPLICATE = "replicate";
    enum string CLUSTER_SLAVES = "slaves";
    enum string CLUSTER_FAILOVER = "failover";
    enum string CLUSTER_SLOTS = "slots";
    // NOTE(review): presumably sub-command names for the PUBSUB command.
    enum string PUBSUB_CHANNELS = "channels";
    enum string PUBSUB_NUMSUB = "numsub";
    enum string PUBSUB_NUM_PAT = "numpat";

    // Pre-encoded byte values; BYTES_TRUE / BYTES_FALSE are what
    // toByteArray(bool) returns.
    enum const(ubyte)[] BYTES_TRUE = cast(const(ubyte)[])"1";
    enum const(ubyte)[] BYTES_FALSE = cast(const(ubyte)[])"0";
    enum const(ubyte)[] BYTES_TILDE = cast(const(ubyte)[])"~";

    // Returned by toByteArray(double) for +/- infinity.
    enum const(ubyte)[] POSITIVE_INFINITY_BYTES = cast(const(ubyte)[])"+inf";
    enum const(ubyte)[] NEGATIVE_INFINITY_BYTES = cast(const(ubyte)[])"-inf";
99 
    /**
     * Private constructor: the class only offers static members, so code
     * outside this module cannot create instances of it.
     */
    private this() {
        // intentionally empty; this prevents the class from being instantiated
    }
103 
104     static void sendCommand(RedisOutputStream os, Command command,
105             const(ubyte)[][] args...) {
106         sendCommand(os, command.getRaw(), args);
107     }
108 
109     private static void sendCommand(RedisOutputStream os, const(ubyte)[] command,
110             const(ubyte)[][] args...) {
111         try {
112             os.write(ASTERISK_BYTE);
113             os.writeIntCrLf(cast(int)args.length + 1);
114             os.write(DOLLAR_BYTE);
115             os.writeIntCrLf(cast(int)command.length);
116             os.write(cast(byte[])command);
117             os.writeCrLf();
118 
119             foreach (const(ubyte)[] arg ; args) {
120                 os.write(DOLLAR_BYTE);
121                 os.writeIntCrLf(cast(int)arg.length);
122                 os.write(cast(byte[])arg);
123                 os.writeCrLf();
124             }
125         } catch (IOException e) {
126             throw new RedisConnectionException(e);
127         }
128     }
129 
130     private static void processError(RedisInputStream inputStream) {
131         string message = inputStream.readLine();
132         // TODO: I'm not sure if this is the best way to do this.
133         // Maybe Read only first 5 bytes instead?
134         if (message.startsWith(MOVED_PREFIX)) {
135             string[] movedInfo = parseTargetHostAndSlot(message);
136             throw new RedisMovedDataException(message, new HostAndPort(movedInfo[1],
137                     to!int(movedInfo[2])), to!int(movedInfo[0]));
138         } else if (message.startsWith(ASK_PREFIX)) {
139             string[] askInfo = parseTargetHostAndSlot(message);
140             throw new RedisAskDataException(message, new HostAndPort(askInfo[1],
141                     to!int(askInfo[2])), to!int(askInfo[0]));
142         } else if (message.startsWith(CLUSTERDOWN_PREFIX)) {
143             throw new RedisClusterException(message);
144         } else if (message.startsWith(BUSY_PREFIX)) {
145             throw new RedisBusyException(message);
146         } else if (message.startsWith(NOSCRIPT_PREFIX) ) {
147             throw new RedisNoScriptException(message);
148         }
149         throw new RedisDataException(message);
150     }
151 
152     static string readErrorLineIfPossible(RedisInputStream inputStream) {
153         byte b = inputStream.readByte();
154         // if buffer contains other type of response, just ignore.
155         if (b != MINUS_BYTE) {
156             return null;
157         }
158         return inputStream.readLine();
159     }
160 
161     private static string[] parseTargetHostAndSlot(string clusterRedirectResponse) {
162         string[] response = new string[3];
163         string[] messageInfo = clusterRedirectResponse.split(" ");
164         string[] targetHostAndPort = HostAndPort.extractParts(messageInfo[2]);
165         response[0] = messageInfo[1];
166         response[1] = targetHostAndPort[0];
167         response[2] = targetHostAndPort[1];
168         return response;
169     }
170 
171     private static Object process(RedisInputStream inputStream) {
172         byte b = inputStream.readByte();
173         switch(b) {
174         case PLUS_BYTE:
175             return new Bytes(processStatusCodeReply(inputStream));
176         case DOLLAR_BYTE:
177             return new Bytes(processBulkReply(inputStream));
178         case ASTERISK_BYTE:
179             return cast(Object)processMultiBulkReply(inputStream);
180         case COLON_BYTE:
181             return new Long(processInteger(inputStream));
182         case MINUS_BYTE:
183             processError(inputStream);
184             return null;
185         default:
186             throw new RedisConnectionException(format("Unknown reply: %s", cast(char) b));
187         }
188     }
189 
190     private static byte[] processStatusCodeReply(RedisInputStream inputStream) {
191         return inputStream.readLineBytes();
192     }
193 
194     private static byte[] processBulkReply(RedisInputStream inputStream) {
195         int len = inputStream.readIntCrLf();
196         if (len == -1) {
197             return null;
198         }
199 
200         byte[] read = new byte[len];
201         int offset = 0;
202         while (offset < len) {
203             int size = inputStream.read(read, offset, (len - offset));
204             if (size == -1) throw new RedisConnectionException(
205                     "It seems like server has closed the connection.");
206             offset += size;
207         }
208 
209         // read 2 more bytes for the command delimiter
210         inputStream.readByte();
211         inputStream.readByte();
212 
213         return read;
214     }
215 
216     private static long processInteger(RedisInputStream inputStream) {
217         return inputStream.readLongCrLf();
218     }
219 
220     private static List!(Object) processMultiBulkReply(RedisInputStream inputStream) {
221         int num = inputStream.readIntCrLf();
222         if (num == -1) {
223             return null;
224         }
225         List!(Object) ret = new ArrayList!(Object)(num);
226         for (int i = 0; i < num; i++) {
227             try {
228                 ret.add(process(inputStream));
229             } catch (RedisDataException e) {
230                 ret.add(e);
231             }
232         }
233         return ret;
234     }
235 
236     static Object read(RedisInputStream inputStream) {
237         return process(inputStream);
238     }
239 
240     static const(ubyte)[] toByteArray(bool value) {
241         return value ? BYTES_TRUE : BYTES_FALSE;
242     }
243 
244     static const(ubyte)[] toByteArray(int value) {
245         return cast(const(ubyte)[])to!string(value);
246     }
247 
248     static const(ubyte)[] toByteArray(long value) {
249         return cast(const(ubyte)[])to!string(value);
250     }
251 
252     static const(ubyte)[] toByteArray(double value) {
253         if (value == double.infinity) {
254             return POSITIVE_INFINITY_BYTES;
255         } else if (value == -double.infinity) {
256             return NEGATIVE_INFINITY_BYTES;
257         } else {
258             string v = format("%f", value);
259 
260             return cast(const(ubyte)[])trimZero(v);
261         }
262     } 
263     
264     private static string trimZero(string value) {
265         int pointPos = -1;
266         bool canTrim = true;
267 
268         for(size_t i=0; i< value.length; i++) {
269             if(value[i] == '.') {
270                 pointPos = cast(int)i;
271             } else if(pointPos >=0 && value[i] != '0') {
272                 canTrim = false;
273                 break;
274             }
275         }
276 
277         if(canTrim && pointPos>0) {
278             return value[0..pointPos];
279         } else {
280             return value;
281         }
282     }    
283 // dfmt off
    /**
     * Commands that can be passed to sendCommand.
     *
     * The bytes sent on the wire are produced by getRaw(ProtocolCommand),
     * which converts the member with to!string, so each member must be
     * spelled exactly like the command name to be sent.
     */
    static enum Command {
        PING, SET, GET, QUIT, EXISTS, DEL, UNLINK, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX,
        RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX,
        MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET,
        HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX,
        LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER,
        SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY,
        ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP,
        BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT,
        ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE,
        ZLEXCOUNT, ZRANGEBYLEX, ZREVRANGEBYLEX, ZREMRANGEBYLEX, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE,
        SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT,
        DEBUG, BRPOPLPUSH, SETBIT, GETBIT, BITPOS, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG,
        OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT,
        PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING,
        PFADD, PFCOUNT, PFMERGE, READONLY, GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO,
        GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, MODULE, BITFIELD, HSTRLEN, TOUCH, SWAPDB, MEMORY,
        XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM
    }
303 
    /**
     * Keywords used inside commands and replies.
     *
     * getRaw(ProtocolKeyword) converts a member to bytes with to!string, so
     * the byte form of each keyword is exactly its upper-case member name.
     */
    static enum Keyword {
        AGGREGATE, ALPHA, ASC, BY, DESC, GET, LIMIT, MESSAGE, NO, NOSORT, PMESSAGE, PSUBSCRIBE,
        PUNSUBSCRIBE, OK, ONE, QUEUED, SET, STORE, SUBSCRIBE, UNSUBSCRIBE, WEIGHTS, WITHSCORES,
        RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, KILL, LEN, REFCOUNT, ENCODING, IDLETIME,
        GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR, 
        BLOCK, NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, 
        IDLE, TIME, RETRYCOUNT, FORCE
    }
312 
313 // dfmt on    
314 }
315 
316 
// Module-level shorthands for the enums nested in Protocol.
alias ProtocolCommand = Protocol.Command;
alias ProtocolKeyword = Protocol.Keyword;
319 
320 
/**
 * Returns the wire form of a keyword: the bytes of the enum member's name.
 */
const(ubyte)[] getRaw(ProtocolKeyword k) {
    string memberName = to!string(k);
    return cast(const(ubyte)[]) memberName;
}
324 
/**
 * Returns the wire form of a command: the bytes of the enum member's name.
 */
const(ubyte)[] getRaw(ProtocolCommand c) {
    string memberName = to!string(c);
    return cast(const(ubyte)[]) memberName;
}
328