1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.util.RedisInputStream;
13 
14 import hunt.redis.Exceptions;
15 
16 import hunt.Exceptions;
17 import hunt.logging.ConsoleLogger;
18 import hunt.stream.Common;
19 import hunt.stream.ByteArrayOutputStream;
20 import hunt.stream.FilterInputStream;
21 import hunt.stream.FilterOutputStream;
22 import hunt.util.StringBuilder;
23 
24 import std.algorithm;
25 
/**
 * Buffered input stream tailored to reading a RESP (Redis protocol) stream.
 *
 * Lines are expected to be terminated by CRLF. Whenever a caller asks for a byte and the
 * underlying stream cannot supply one, that is reported as a stream error (an exception)
 * instead of an end-of-stream marker.
 */
class RedisInputStream : FilterInputStream {

    /// Buffer size used by the single-argument constructor.
    private enum int DEFAULT_BUFFER_SIZE = 8192;

    /// Internal buffer that receives the bytes read from the wrapped stream.
    protected byte[] buf;

    /// `count` is the index of the next unread byte in `buf`; `limit` is the value returned
    /// by the most recent read of the wrapped stream into `buf`.
    protected int count, limit;

    /**
     * Wraps `inputStream` using an internal buffer of `size` bytes.
     *
     * Throws: IllegalArgumentException when `size` is zero or negative.
     */
    this(InputStream inputStream, int size) {
        super(inputStream);
        if (size < 1) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    /// Wraps `inputStream` using the default buffer size (8192 bytes).
    this(InputStream inputStream) {
        this(inputStream, DEFAULT_BUFFER_SIZE);
    }

    /// Returns the next byte, refilling the buffer first when it has been used up.
    byte readByte() {
        ensureFill();
        return buf[count++];
    }

    /**
     * Reads characters up to the next CRLF pair and returns them without the terminator.
     *
     * A '\r' that is not directly followed by '\n' is kept in the result together with the
     * byte that follows it.
     *
     * Throws: RedisConnectionException when the resulting line is empty.
     */
    string readLine() {
        StringBuilder text = new StringBuilder();
        for (;;) {
            byte head = takeByte();
            if (head != '\r') {
                text.append(cast(char) head);
                continue;
            }

            // A carriage return must be followed by at least one more byte.
            byte tail = takeByte();
            if (tail == '\n') {
                break;
            }
            text.append(cast(char) head);
            text.append(cast(char) tail);
        }

        string reply = text.toString();
        if (reply.length == 0) {
            throw new RedisConnectionException("It seems like server has closed the connection.");
        }

        return reply;
    }

    /**
     * Reads the bytes up to the next CRLF pair and returns a copy of them without the
     * terminator.
     *
     * Fast path: when the whole line, terminator included, is already inside the buffered
     * region, it is copied out in one step. As soon as the scan reaches `limit` before the
     * terminator is found, the work is handed to readLineBytesSlowly(), which starts over
     * from the current `count`.
     */
    byte[] readLineBytes() {
        ensureFill();

        byte[] window = this.buf;
        int cursor = count;
        bool terminated = false;
        while (!terminated) {
            if (cursor == limit) {
                return readLineBytesSlowly();
            }
            if (window[cursor++] != '\r') {
                continue;
            }

            if (cursor == limit) {
                return readLineBytesSlowly();
            }
            terminated = window[cursor++] == '\n';
        }

        // `cursor` now sits just past "\r\n"; the payload ends two bytes earlier.
        byte[] line = window[count .. cursor - 2].dup;

        count = cursor;
        return line;
    }

    /**
     * Slow path used when a line cannot be located within the bytes currently buffered.
     * Bytes are collected one at a time, refilling the buffer as needed. This is still cheaper
     * than building a StringBuilder, converting it to a string and encoding that back into bytes.
     *
     * Returns: the line without its CRLF terminator; an empty array when nothing preceded it.
     */
    private byte[] readLineBytesSlowly() {
        // Created lazily so that an empty line does not allocate an output stream.
        ByteArrayOutputStream collected = null;
        for (;;) {
            byte head = takeByte();
            bool sawCr = head == '\r';
            byte tail;
            if (sawCr) {
                // A carriage return must be followed by at least one more byte.
                tail = takeByte();
                if (tail == '\n') {
                    break;
                }
            }

            if (collected is null) {
                collected = new ByteArrayOutputStream(16);
            }
            collected.write(head);
            if (sawCr) {
                collected.write(tail);
            }
        }

        if (collected is null) {
            return new byte[0];
        }
        return collected.toByteArray();
    }

    /// Reads a CRLF-terminated integer via readLongCrLf() and narrows the result to `int`.
    int readIntCrLf() {
        long wide = readLongCrLf();
        return cast(int) wide;
    }

    /**
     * Parses a CRLF-terminated decimal number with an optional leading '-'.
     *
     * Every byte before the '\r' is folded into the value as a decimal digit; the bytes are
     * not checked for actually being digits.
     *
     * Throws: RedisConnectionException when the '\r' is not followed by '\n'.
     */
    long readLongCrLf() {
        byte[] data = this.buf;

        ensureFill();

        bool negative = false;
        if (data[count] == '-') {
            negative = true;
            ++count;
        }

        long magnitude = 0;
        for (;;) {
            ensureFill();

            int ch = data[count++];
            if (ch != '\r') {
                magnitude = magnitude * 10 + ch - '0';
                continue;
            }

            ensureFill();
            if (data[count++] != '\n') {
                throw new RedisConnectionException("Unexpected character!");
            }
            break;
        }

        if (negative) {
            return -magnitude;
        }
        return magnitude;
    }

    /**
     * Copies buffered bytes into `b`, starting at index `off`.
     *
     * At most `len` bytes are copied, and never more than what is left in the internal
     * buffer after it has been refilled (if it was exhausted).
     *
     * Returns: the number of bytes copied.
     */
    override
    int read(byte[] b, int off, int len) {
        ensureFill();

        int available = limit - count;
        int copied = min(available, len);
        b[off .. off + copied] = buf[count .. count + copied];
        count += copied;
        return copied;
    }

    /// Refills the buffer when needed, then returns the next byte and advances `count`.
    private final byte takeByte() {
        ensureFill();
        return buf[count++];
    }

    /**
     * Makes sure unread bytes are available, reading from the wrapped stream when `count`
     * has reached `limit`.
     *
     * This method assumes the caller requires more bytes; when the wrapped stream reports
     * end of stream (-1) an exception is raised so the caller learns immediately that the
     * stream was shorter than expected.
     *
     * Throws: RedisConnectionException on end of stream, or wrapping an IOException raised
     *         while reading.
     */
    private void ensureFill() {
        if (count < limit) {
            return;
        }

        try {
            limit = inputStream.read(buf);
            count = 0;
            if (limit == -1) {
                throw new RedisConnectionException("Unexpected end of stream.");
            }
            version(HUNT_REDIS_DEBUG_MORE) {
                // Only the first 32 bytes of a large chunk are traced.
                if (limit > 32)
                    tracef("incoming(partly): %s", cast(string)buf[0..32]);
                else
                    tracef("incoming: %s", cast(string)buf[0..limit]);
            }
        } catch (IOException e) {
            throw new RedisConnectionException(e);
        }
    }
}