/*
 * Hunt - A redis client library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.redis.util.RedisInputStream;

import hunt.redis.Exceptions;

import hunt.Exceptions;
import hunt.logging.ConsoleLogger;
import hunt.stream.Common;
import hunt.stream.ByteArrayOutputStream;
import hunt.stream.FilterInputStream;
import hunt.stream.FilterOutputStream;
import hunt.util.StringBuilder;

import std.algorithm;

/**
 * Buffered input stream used to read a RESP (REdis Serialization Protocol) stream.
 *
 * The class assumes (to some degree) the RESP conventions regarding CRLF line
 * termination. It also assumes that whenever the protocol layer asks for a byte,
 * the absence of that byte is a stream error: every read helper throws a
 * RedisConnectionException instead of signalling end-of-stream with a sentinel.
 */
class RedisInputStream : FilterInputStream {

    /// Internal read buffer, refilled from the wrapped stream by ensureFill().
    protected byte[] buf;

    /// `count` is the index of the next unread byte in `buf`;
    /// `limit` is the value returned by the last fill (number of bytes in `buf`).
    protected int count, limit;

    /**
     * Wraps `inputStream` with an internal buffer of `size` bytes.
     *
     * Throws: IllegalArgumentException if `size` is not positive.
     */
    this(InputStream inputStream, int size) {
        super(inputStream);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    /// Wraps `inputStream` with the default buffer size of 8192 bytes.
    this(InputStream inputStream) {
        this(inputStream, 8192);
    }

    /**
     * Reads a single byte, refilling the buffer first if it is exhausted.
     *
     * Throws: RedisConnectionException if the stream has ended.
     */
    byte readByte() {
        ensureFill();
        return buf[count++];
    }

    /**
     * Reads one CRLF-terminated line and returns it without the terminator.
     *
     * A '\r' that is not followed by '\n' is kept as part of the line, together
     * with the byte that follows it.
     *
     * Throws: RedisConnectionException if the line is empty or the stream ends
     *         before the terminator is seen.
     */
    string readLine() {
        StringBuilder sb = new StringBuilder();
        while (true) {
            ensureFill();

            byte b = buf[count++];
            if (b != '\r') {
                sb.append(cast(char) b);
                continue;
            }

            ensureFill(); // Must be one more byte

            byte c = buf[count++];
            if (c == '\n') {
                break;
            }

            // Lone CR: both bytes belong to the payload.
            sb.append(cast(char) b);
            sb.append(cast(char) c);
        }

        string reply = sb.toString();
        if (reply.length == 0) {
            throw new RedisConnectionException("It seems like server has closed the connection.");
        }

        return reply;
    }

    /**
     * Reads one CRLF-terminated line and returns its bytes without the terminator.
     *
     * This operation should normally require only one fill. In that typical case
     * the line is copied straight out of the internal buffer. When the terminator
     * is not found inside the currently buffered bytes, the slower path
     * readLineBytesSlowly() is taken, which accumulates the line across fills.
     *
     * Throws: RedisConnectionException if the stream ends before the terminator.
     */
    byte[] readLineBytes() {
        ensureFill();

        // Scan ahead without consuming; `count` only moves once CRLF is found.
        int pos = count;
        while (true) {
            if (pos == limit) {
                return readLineBytesSlowly();
            }

            if (buf[pos++] == '\r') {
                if (pos == limit) {
                    return readLineBytesSlowly();
                }

                if (buf[pos++] == '\n') {
                    break;
                }
            }
        }

        // `pos` is one past the '\n'; drop the two terminator bytes.
        int N = (pos - count) - 2;
        byte[] line = buf[count .. count + N].dup;

        count = pos;
        return line;
    }

    /**
     * Slow path used when a line of bytes cannot be read from a single fill.
     * This is still faster than creating a StringBuilder, a string, then encoding
     * it as byte[] in the protocol layer and decoding it back into a string.
     *
     * Returns: the line without its CRLF terminator; an empty array for an empty line.
     */
    private byte[] readLineBytesSlowly() {
        // Allocated lazily so that an empty line costs no output stream.
        ByteArrayOutputStream bout = null;
        while (true) {
            ensureFill();

            byte b = buf[count++];
            if (b == '\r') {
                ensureFill(); // Must be one more byte

                byte c = buf[count++];
                if (c == '\n') {
                    break;
                }

                if (bout is null) {
                    bout = new ByteArrayOutputStream(16);
                }

                bout.write(b);
                bout.write(c);
            } else {
                if (bout is null) {
                    bout = new ByteArrayOutputStream(16);
                }

                bout.write(b);
            }
        }

        return bout is null ? new byte[0] : bout.toByteArray();
    }

    /**
     * Reads a CRLF-terminated decimal integer and narrows it to `int`.
     *
     * Values outside the `int` range are truncated by the cast.
     */
    int readIntCrLf() {
        return cast(int) readLongCrLf();
    }

    /**
     * Reads a CRLF-terminated decimal integer with an optional leading '-'.
     *
     * Throws: RedisConnectionException if a byte other than an ASCII digit appears
     *         before the terminator, if '\r' is not followed by '\n', or if the
     *         stream ends prematurely.
     */
    long readLongCrLf() {
        ensureFill();

        bool isNeg = buf[count] == '-';
        if (isNeg) {
            ++count;
        }

        long value = 0;
        while (true) {
            ensureFill();

            int b = buf[count++];
            if (b == '\r') {
                ensureFill();

                if (buf[count++] != '\n') {
                    throw new RedisConnectionException("Unexpected character!");
                }

                break;
            }

            // Refuse to fold non-digit bytes into the result: a corrupted or
            // desynchronised stream must fail loudly instead of yielding a
            // garbage length/integer.
            if (b < '0' || b > '9') {
                throw new RedisConnectionException("Unexpected character!");
            }

            value = value * 10 + b - '0';
        }

        return (isNeg ? -value : value);
    }

    /**
     * Copies up to `len` buffered bytes into `b` starting at index `off`.
     *
     * At most the bytes currently available in the internal buffer are copied,
     * so fewer than `len` bytes may be returned.
     *
     * Returns: the number of bytes copied; 0 when `len` is 0.
     * Throws: RedisConnectionException if bytes were requested and the stream has ended.
     */
    override
    int read(byte[] b, int off, int len) {
        // Nothing requested: do not touch (and possibly block on) the underlying stream.
        if (len == 0) {
            return 0;
        }

        ensureFill();

        int length = min(limit - count, len);
        b[off .. off + length] = buf[count .. count + length];
        count += length;
        return length;
    }

    /**
     * This method assumes there are required bytes to be read. If the buffer is
     * exhausted it is refilled from the wrapped stream; if no more bytes can be
     * read an exception is thrown to quickly ascertain that the stream was
     * smaller than expected.
     *
     * Throws: RedisConnectionException on end of stream, or wrapping any
     *         IOException raised by the wrapped stream.
     */
    private void ensureFill() {
        if (count >= limit) {
            try {
                limit = inputStream.read(buf);
                count = 0;
                if (limit == -1) {
                    throw new RedisConnectionException("Unexpected end of stream.");
                }
                version(HUNT_REDIS_DEBUG_MORE) {
                    if (limit <= 32)
                        tracef("incoming: %s", cast(string) buf[0 .. limit]);
                    else
                        tracef("incoming(partly): %s", cast(string) buf[0 .. 32]);
                }
            } catch (IOException e) {
                throw new RedisConnectionException(e);
            }
        }
    }
}