/*
 * Hunt - A redis client library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.redis.AbstractClient;

// import hunt.redis.commands.Command;
import hunt.redis.BuilderFactory;
import hunt.redis.Exceptions;
import hunt.redis.Protocol;
import hunt.redis.util.IOUtils;
import hunt.redis.util.RedisInputStream;
import hunt.redis.util.RedisOutputStream;
import hunt.redis.util.SafeEncoder;

import hunt.Byte;
import hunt.collection.ArrayList;
import hunt.collection.List;
import hunt.Exceptions;
import hunt.logging.ConsoleLogger;
import hunt.Long;
import hunt.String;
import hunt.util.Common;

// import javax.net.ssl.HostnameVerifier;
// import javax.net.ssl.SSLParameters;
// import javax.net.ssl.SSLSocket;
// import javax.net.ssl.SSLSocketFactory;

import hunt.net;

import hunt.io.TcpStream;
import hunt.stream.Common;
import hunt.stream.TcpInputStream;
import hunt.stream.TcpOutputStream;

import core.sync.condition;
import core.sync.mutex;
import core.time;

import std.array;
import std.format;
import std.socket;


alias Protocol = hunt.redis.Protocol.Protocol;
alias Command = Protocol.Command;
alias ConstUBytes = const(ubyte)[];


/**
 * Low-level Redis connection.
 *
 * Owns a `NetClient`, wraps its stream in a `RedisOutputStream` /
 * `RedisInputStream` pair, writes commands through `Protocol.sendCommand`
 * and reads replies through `Protocol.read`.
 *
 * Once a connection-level failure has been observed the instance is flagged
 * as broken (see `isBroken`) and further reads are refused.
 *
 * NOTE(review): the mutex/condition pair only synchronises the connection
 * hand-shake in `connect()`; nothing here makes the command/reply methods
 * safe for concurrent use.
 */
class AbstractClient : Closeable {
    private NetClient _client;
    private Mutex _doneLocker;          // guards the stream fields during connect()
    private Condition _doneCondition;   // signalled once the connection attempt has finished

    private enum const(ubyte)[][] EMPTY_ARGS = null;

    private string host = Protocol.DEFAULT_HOST;
    private int port = Protocol.DEFAULT_PORT;
    private RedisOutputStream outputStream;
    private RedisInputStream inputStream;
    private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;   // milliseconds
    private int soTimeout = Protocol.DEFAULT_TIMEOUT;           // milliseconds
    private bool broken = false;
    private bool ssl;   // stored only; no SSL handling is implemented in this class
    // private SSLSocketFactory sslSocketFactory;
    // private SSLParameters sslParameters;
    // private HostnameVerifier hostnameVerifier;

    /// Creates a client for the default host and port.
    this() {
        initialize();
    }

    /// Creates a client for `host` on the default port.
    this(string host) {
        this.host = host;
        initialize();
    }

    /// Creates a client for `host`:`port`.
    this(string host, int port) {
        this.host = host;
        this.port = port;
        initialize();
    }

    /// Creates a client for `host`:`port` and records the `ssl` flag.
    this(string host, int port, bool ssl) {
        this.host = host;
        this.port = port;
        this.ssl = ssl;
        initialize();
    }

    // this(string host, int port, bool ssl,
    //         SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
    //         HostnameVerifier hostnameVerifier) {
    //     this.host = host;
    //     this.port = port;
    //     this.ssl = ssl;
    //     this.sslSocketFactory = sslSocketFactory;
    //     this.sslParameters = sslParameters;
    //     this.hostnameVerifier = hostnameVerifier;
    // }

    /// Creates the synchronisation primitives shared by every constructor.
    private void initialize() {
        _doneLocker = new Mutex();
        _doneCondition = new Condition(_doneLocker);
    }

    /// Returns: the connection timeout in milliseconds.
    int getConnectionTimeout() {
        return connectionTimeout;
    }

    /// Sets the connection timeout in milliseconds; used by the next `connect()`.
    void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /// Returns: the idle/read timeout in milliseconds.
    int getSoTimeout() {
        return soTimeout;
    }

    /// Sets the idle/read timeout in milliseconds; used by the next `connect()`.
    void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    /// Returns: the configured host.
    string getHost() {
        return host;
    }

    /// Sets the host used by the next `connect()`.
    void setHost(string host) {
        this.host = host;
    }

    /// Returns: the configured port.
    int getPort() {
        return port;
    }

    /// Sets the port used by the next `connect()`.
    void setPort(int port) {
        this.port = port;
    }

    /**
     * Opens the connection if it is not already open and blocks until the
     * streams are ready, the attempt has failed, or the connection timeout
     * has elapsed.
     *
     * Non-positive timeouts are replaced by `Protocol.DEFAULT_TIMEOUT`
     * (the fields themselves are updated).
     *
     * Throws: RedisConnectionException when no connection is available after
     *         waiting; the failure reported by the network layer, if any, is
     *         attached as the chained exception.
     */
    void connect() {
        if (isConnected())
            return;

        // Normalise both timeouts *before* they are handed to the options;
        // previously a non-positive connection timeout reached the client.
        if (soTimeout <= 0) {
            soTimeout = Protocol.DEFAULT_TIMEOUT;
        }
        if (connectionTimeout <= 0) {
            connectionTimeout = Protocol.DEFAULT_TIMEOUT;
        }

        Duration idleTimeout = soTimeout.msecs;
        Duration openTimeout = connectionTimeout.msecs;

        NetClientOptions options = new NetClientOptions();
        options.setConnectTimeout(openTimeout);
        options.setIdleTimeout(idleTimeout);

        // Forget the streams of any earlier connection. They double as the
        // "connection is ready" predicate below, so stale values would make
        // a reconnect skip the wait and fail immediately.
        {
            _doneLocker.lock();
            scope (exit)
                _doneLocker.unlock();
            outputStream = null;
            inputStream = null;
        }

        // Written by the handler under _doneLocker, read by the wait loop.
        bool openFailed = false;
        Throwable openError = null;

        _client = NetUtil.createNetClient(options);

        _client.setHandler(new class NetConnectionHandler {

            override void connectionOpened(Connection connection) {
                version (HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress());
                _doneLocker.lock();
                scope (exit)
                    _doneLocker.unlock();

                outputStream = new RedisOutputStream(new TcpOutputStream(connection.getStream()));
                inputStream = new RedisInputStream(new TcpInputStream(connection.getStream(), idleTimeout));

                _doneCondition.notifyAll();
            }

            override void connectionClosed(Connection connection) {
                version (HUNT_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress());
            }

            override DataHandleStatus messageReceived(Connection connection, Object message) {
                version(HUNT_REDIS_DEBUG) {
                    tracef("message type: %s", typeid(message).name);
                    string str = format("data received: %s", message.toString());
                    tracef(str);
                }

                return DataHandleStatus.Done;
            }

            override void exceptionCaught(Connection connection, Throwable t) {
                version (HUNT_DEBUG) warning(t);
            }

            override void failedOpeningConnection(int sessionId, Throwable t) {
                version (HUNT_DEBUG) warning(t);
                // Wake the waiter right away instead of letting it sit out
                // the whole timeout, and keep the cause for the caller.
                {
                    _doneLocker.lock();
                    scope (exit)
                        _doneLocker.unlock();
                    openFailed = true;
                    openError = t;
                    _doneCondition.notifyAll();
                }
                _client.close();
            }

            override void failedAcceptingConnection(int sessionId, Throwable t) {
                version (HUNT_DEBUG) warning(t);
            }
        }).connect(host, port);

        _doneLocker.lock();
        scope (exit)
            _doneLocker.unlock();

        if (outputStream is null && !openFailed) {
            version (HUNT_DEBUG) {
                infof("Waiting for a connection in %s...", msecs(connectionTimeout));
            }

            // Condition waits may wake spuriously, so re-check the predicate
            // and only wait for whatever is left of the timeout.
            MonoTime deadline = MonoTime.currTime + openTimeout;
            while (outputStream is null && !openFailed) {
                Duration remaining = deadline - MonoTime.currTime;
                if (remaining <= Duration.zero)
                    break;
                _doneCondition.wait(remaining);
            }
        }

        if (!isConnected()) {
            string msg = format("Unable to connect to the server in %s.",
                connectionTimeout.msecs);
            debug warning(msg);
            if (openError !is null) {
                throw new RedisConnectionException(msg, openError);
            }
            throw new RedisConnectionException(msg);
        }
    }

    /// Closes the underlying client when it is currently connected.
    override
    void close() {
        if (isConnected()) _client.close();
    }

    // void disconnect() {
    //     close();
    // }

    /// Returns: `true` when a client exists and reports itself as connected.
    bool isConnected() {
        return _client !is null && _client.isConnected();
    }

    /**
     * Makes sure the connection is open.
     *
     * NOTE(review): despite the name, no timeout is changed here; only the
     * connect step is implemented.
     *
     * Throws: RedisConnectionException wrapping a `SocketException` raised
     *         while connecting (the client is flagged as broken first).
     */
    void setTimeoutInfinite() {
        if (!isConnected()) {
            try {
                connect();
            } catch (SocketException ex) {
                broken = true;
                throw new RedisConnectionException(ex);
            }
        }
    }

    /**
     * Placeholder: restoring the socket timeout is not implemented, so this
     * currently does nothing.
     */
    void rollbackTimeout() {
        try {
            // socket.setSoTimeout(soTimeout);
            // implementationMissing(false);
        } catch (SocketException ex) {
            broken = true;
            throw new RedisConnectionException(ex);
        }
    }

    /**
     * Encodes every string argument with `SafeEncoder.encode` and sends the
     * command through the binary overload.
     *
     * Params:
     *   cmd  = command to send
     *   args = textual arguments
     */
    void sendCommand(Command cmd, string[] args...) {
        const(ubyte)[][] bargs = new const(ubyte)[][args.length];
        foreach (i, arg; args) {
            bargs[i] = SafeEncoder.encode(arg);
        }
        sendCommand(cmd, bargs);
    }

    /// Sends `cmd` without arguments.
    void sendCommand(Command cmd) {
        sendCommand(cmd, EMPTY_ARGS);
    }

    /**
     * Connects if necessary and writes `cmd` with `args` to the output
     * stream.
     *
     * Params:
     *   cmd  = command to send
     *   args = already encoded arguments
     *
     * Throws: RedisConnectionException on a connection failure. The client is
     *         flagged as broken, and when the server left an error line on
     *         the input stream that line becomes the exception message.
     */
    void sendCommand(Command cmd, const(ubyte)[][] args...) {
        try {
            connect();
            if(isConnected()) {
                Protocol.sendCommand(outputStream, cmd, args);
            }
        } catch (RedisConnectionException ex) {
            if(inputStream is null) {
                warning("inputStream is null");
            } else {
                /*
                 * When client send request which formed by invalid protocol, Redis send back error message
                 * before close connection. We try to read it to provide reason of failure.
                 */
                try {
                    string errorMessage = Protocol.readErrorLineIfPossible(inputStream);
                    if (errorMessage !is null && errorMessage.length > 0) {
                        ex = new RedisConnectionException(errorMessage, ex.next);
                    }
                } catch (Exception e) {
                    /*
                     * Catch any IOException or RedisConnectionException occurred from InputStream#read and just
                     * ignore. This approach is safe because reading error message is optional and connection
                     * will eventually be closed.
                     */
                    debug warning(e.msg);
                    version(HUNT_REDIS_DEBUG) warning(e);
                }
            }
            // Any other exceptions related to connection?
            broken = true;
            throw ex;
        }
    }

    /// Reads a status reply; currently identical to `getBulkReply()`.
    string getStatusCodeReply() {
        return getBulkReply();
    }

    /**
     * Flushes pending output and reads one reply as a string.
     *
     * Returns: the reply bytes reinterpreted as a string, or `null` when the
     *          protocol layer produced no object (nil reply).
     * Throws: NullPointerException when the reply object is not a `Bytes`.
     */
    string getBulkReply() {
        flush();

        Object obj = readProtocolWithCheckingBroken();
        if (obj is null) {
            // A missing value is not a type error; hand it back as null.
            return null;
        }

        Bytes bytesObj = cast(Bytes)obj;
        if(bytesObj is null) {
            warning("The obj is not a Bytes.");
            throw new NullPointerException();
        }

        byte[] resp = bytesObj.value();
        string r = cast(string)resp;
        version(HUNT_REDIS_DEBUG_MORE) {
            tracef("reply: %s", r);
        }

        return r;
    }

    /**
     * Flushes pending output and reads one reply as raw bytes.
     *
     * Returns: the reply bytes, or `null` when the protocol layer produced
     *          no object (nil reply).
     * Throws: NullPointerException when the reply object is not a `Bytes`.
     */
    const(ubyte)[] getBinaryBulkReply() {
        flush();

        Object obj = readProtocolWithCheckingBroken();
        if (obj is null) {
            // A missing value is not a type error; hand it back as null.
            return null;
        }

        Bytes bytesObj = cast(Bytes)obj;
        if(bytesObj is null) {
            warning("The obj is not a Bytes.");
            throw new NullPointerException();
        }

        return cast(const(ubyte)[])bytesObj.value();
    }

    /**
     * Flushes pending output and reads one reply as an integer.
     *
     * Returns: the reply as a `Long` (converted when it is another `Number`),
     *          or `null` when there is no reply object or it is not numeric.
     */
    Long getIntegerReply() {
        flush();
        Object obj = readProtocolWithCheckingBroken();
        if(obj is null) {
            warning("No value");
            return null;
        }

        import hunt.Number;

        Long v = cast(Long)obj;
        if(v !is null) {
            return v;
        }

        Number number = cast(Number)obj;
        if(number !is null) {
            return new Long(number.longValue());
        }

        Bytes bytes = cast(Bytes)obj;
        if(bytes !is null) {
            // Dump the unexpected payload to help diagnose the mismatch.
            warningf("%(%02X %)", bytes.value());
        }

        warningf("Not a number: %s", typeid(obj));
        return null;
    }

    /// Flushes pending output and builds a string list from the next reply.
    List!(string) getMultiBulkReply() {
        flush();

        return BuilderFactory.STRING_LIST.build(readProtocolWithCheckingBroken());
    }

    /// Flushes pending output and builds a byte-array list from the next reply.
    List!(const(ubyte)[]) getBinaryMultiBulkReply() {
        flush();

        return BuilderFactory.BYTE_ARRAY_LIST.build(readProtocolWithCheckingBroken());
    }

    /// Reads the next reply as an object list without flushing first.
    List!(Object) getUnflushedObjectMultiBulkReply() {
        return cast(List!(Object)) readProtocolWithCheckingBroken();
    }

    /// Flushes pending output and reads the next reply as an object list.
    List!(Object) getObjectMultiBulkReply() {
        flush();
        return getUnflushedObjectMultiBulkReply();
    }

    /**
     * Flushes pending output and reads the next reply as a list of integers.
     *
     * The reply is accepted both as a `List!(Object)` whose elements are
     * `Long` and as a `List!(Long)`; a dynamic cast between those two
     * instantiations yields `null`, so each shape is tried separately.
     *
     * Returns: the unboxed values, or `null` when there is no reply object.
     * Throws: NullPointerException when the reply is not a list or contains
     *         an element that is not a `Long`.
     */
    List!(long) getIntegerMultiBulkReply() {
        flush();
        Object reply = readProtocolWithCheckingBroken();
        if (reply is null) {
            return null;
        }

        List!(long) r = new ArrayList!long();

        // presumably the protocol layer hands back List!(Object) — verify against Protocol.read
        List!(Object) objects = cast(List!(Object)) reply;
        if (objects !is null) {
            foreach(Object item; objects) {
                r.add(toLongValue(item));
            }
            return r;
        }

        List!(Long) items = cast(List!(Long)) reply;
        if (items is null) {
            warningf("Not a list of integers: %s", typeid(reply));
            throw new NullPointerException();
        }

        foreach(Long v; items) {
            r.add(toLongValue(v));
        }

        return r;
    }

    /// Unboxes one list element, rejecting null and non-`Long` entries.
    private long toLongValue(Object item) {
        Long v = cast(Long) item;
        if (v is null) {
            warningf("Not an integer element: %s", item is null ? "null" : typeid(item).name);
            throw new NullPointerException();
        }
        return v.value();
    }

    /// Flushes pending output and returns the next reply object unchanged.
    Object getOne() {
        flush();
        return readProtocolWithCheckingBroken();
    }

    /// Returns: `true` once a connection-level failure has been recorded.
    bool isBroken() {
        return broken;
    }

    /**
     * Flushes buffered commands to the server.
     *
     * Throws: RedisConnectionException when no output stream exists yet, or
     *         when flushing raises an `IOException` (the client is flagged
     *         as broken in that case).
     */
    void flush() {
        if (outputStream is null) {
            // Nothing has been connected yet; fail with a diagnosable error
            // instead of dereferencing null.
            throw new RedisConnectionException("Attempting to flush before a connection has been established");
        }

        try {
            outputStream.flush();
        } catch (IOException ex) {
            broken = true;
            throw new RedisConnectionException(ex);
        }
    }

    /**
     * Reads one reply through `Protocol.read`.
     *
     * Throws: RedisConnectionException when the client is already broken, or
     *         when the read fails at connection level (which flags it broken).
     */
    protected Object readProtocolWithCheckingBroken() {
        if (broken) {
            throw new RedisConnectionException("Attempting to read from a broken connection");
        }

        try {
            Object obj = Protocol.read(inputStream);
            return obj;
        } catch (RedisConnectionException exc) {
            broken = true;
            throw exc;
        }
    }

    /**
     * Flushes pending output and reads `count` replies.
     *
     * Params:
     *   count = number of replies to read
     *
     * Returns: the replies in order; a reply that raised a
     *          `RedisDataException` is represented by that exception object.
     */
    List!(Object) getMany(int count) {
        flush();
        List!(Object) responses = new ArrayList!(Object)(count);
        for (int i = 0; i < count; i++) {
            try {
                responses.add(readProtocolWithCheckingBroken());
            } catch (RedisDataException e) {
                responses.add(e);
            }
        }
        return responses;
    }
}