1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.AbstractClient;
13 
14 // import hunt.redis.commands.Command;
15 import hunt.redis.BuilderFactory;
16 import hunt.redis.Exceptions;
17 import hunt.redis.Protocol;
18 import hunt.redis.util.IOUtils;
19 import hunt.redis.util.RedisInputStream;
20 import hunt.redis.util.RedisOutputStream;
21 import hunt.redis.util.SafeEncoder;
22 
23 import hunt.Byte;
24 import hunt.collection.ArrayList;
25 import hunt.collection.List;
26 import hunt.Exceptions;
27 import hunt.logging.ConsoleLogger;
28 import hunt.Long;
29 import hunt.String;
30 import hunt.util.Common;
31 
32 // import javax.net.ssl.HostnameVerifier;
33 // import javax.net.ssl.SSLParameters;
34 // import javax.net.ssl.SSLSocket;
35 // import javax.net.ssl.SSLSocketFactory;
36 
37 import hunt.net;
38 
39 import hunt.io.TcpStream;
40 import hunt.stream.Common;
41 import hunt.stream.TcpInputStream;
42 import hunt.stream.TcpOutputStream;
43 
44 import core.sync.condition;
45 import core.sync.mutex;
46 import core.time;
47 
48 import std.array;
49 import std.format;
50 import std.socket;
51 
52 
53 alias Protocol = hunt.redis.Protocol.Protocol;
54 alias Command = Protocol.Command;
55 alias ConstUBytes = const(ubyte)[];
56 
57 
58 /**
59  * 
60  */
61 class AbstractClient : Closeable {
62     private NetClient _client;
63     private Mutex _doneLocker;
64     private Condition _doneCondition;
65 
66     private enum const(ubyte)[][] EMPTY_ARGS = null;
67 
68     private string host = Protocol.DEFAULT_HOST;
69     private int port = Protocol.DEFAULT_PORT;
70     private RedisOutputStream outputStream;
71     private RedisInputStream inputStream;
72     private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
73     private int soTimeout = Protocol.DEFAULT_TIMEOUT;
74     private bool broken = false;
75     private bool ssl;
76     // private SSLSocketFactory sslSocketFactory;
77     // private SSLParameters sslParameters;
78     // private HostnameVerifier hostnameVerifier;
79 
80     this() {
81         initialize();
82     }
83 
84     this(string host) {
85         this.host = host;
86         initialize();
87     }
88 
89     this(string host, int port) {
90         this.host = host;
91         this.port = port;
92         initialize();
93     }
94 
95     this(string host, int port, bool ssl) {
96         this.host = host;
97         this.port = port;
98         this.ssl = ssl;
99         initialize();
100     }
101 
102 
103 
104     // this(string host, int port, bool ssl,
105     //     SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
106     //     HostnameVerifier hostnameVerifier) {
107     //   this.host = host;
108     //   this.port = port;
109     //   this.ssl = ssl;
110     //   this.sslSocketFactory = sslSocketFactory;
111     //   this.sslParameters = sslParameters;
112     //   this.hostnameVerifier = hostnameVerifier;
113     // }
114 
115     private void initialize() {
116         _doneLocker = new Mutex();
117         _doneCondition = new Condition(_doneLocker);
118     }
119 
120     int getConnectionTimeout() {
121         return connectionTimeout;
122     }
123     
124     void setConnectionTimeout(int connectionTimeout) {
125         this.connectionTimeout = connectionTimeout;
126     }
127 
128     int getSoTimeout() {
129         return soTimeout;
130     }
131 
132     void setSoTimeout(int soTimeout) {
133         this.soTimeout = soTimeout;
134     }
135 
136     string getHost() {
137         return host;
138     }
139 
140     void setHost(string host) {
141         this.host = host;
142     }
143 
144     int getPort() {
145         return port;
146     }
147 
148     void setPort(int port) {
149         this.port = port;
150     }
151 
152     void connect() {
153         if(isConnected())
154             return;
155         
156 
157         if(soTimeout <= 0) {
158             soTimeout = Protocol.DEFAULT_TIMEOUT;
159         }
160         
161         Duration idleTimeout = soTimeout.msecs;
162         NetClientOptions options = new NetClientOptions();
163         options.setConnectTimeout(connectionTimeout.msecs);
164         options.setIdleTimeout(idleTimeout);
165 
166         _client = NetUtil.createNetClient(options);
167 
168         _client.setHandler(new class NetConnectionHandler {
169 
170             override void connectionOpened(Connection connection) {
171                 version (HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress());
172                 _doneLocker.lock();
173                 scope (exit)
174                     _doneLocker.unlock();
175                 
176                 outputStream = new RedisOutputStream(new TcpOutputStream(connection.getStream()));
177                 inputStream = new RedisInputStream(new TcpInputStream(connection.getStream(), idleTimeout));  
178 
179                 _doneCondition.notifyAll();
180             }
181 
182             override void connectionClosed(Connection connection) {
183                 version (HUNT_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress());
184             }
185 
186             override DataHandleStatus messageReceived(Connection connection, Object message) {
187                 version(HUNT_REDIS_DEBUG) {
188                     tracef("message type: %s", typeid(message).name);
189                     string str = format("data received: %s", message.toString());
190                     tracef(str);
191                 }
192                 // if(count< 10) {
193                 //     connection.encode(new String(str));
194                 // }
195                 // count++;
196 
197                 return DataHandleStatus.Done;
198             }
199 
200             override void exceptionCaught(Connection connection, Throwable t) {
201                 version (HUNT_DEBUG) warning(t);
202             }
203 
204             override void failedOpeningConnection(int sessionId, Throwable t) {
205                 version (HUNT_DEBUG) warning(t);
206                 _client.close(); 
207             }
208 
209             override void failedAcceptingConnection(int sessionId, Throwable t) {
210                 version (HUNT_DEBUG) warning(t);
211             }
212         }).connect(host, port);      
213 
214         if(connectionTimeout <= 0) {
215             connectionTimeout = Protocol.DEFAULT_TIMEOUT;
216         }
217         
218         _doneLocker.lock();
219         scope (exit)
220             _doneLocker.unlock();
221         if(outputStream is null) {
222             version (HUNT_DEBUG) {
223                 infof("Waiting for a connection in %s...", msecs(connectionTimeout));
224             }
225             _doneCondition.wait(connectionTimeout.msecs);
226         }
227 
228         if(!isConnected()) {
229             string msg = format("Unable to connect to the server in %s.", 
230                 connectionTimeout.msecs);
231             debug warning(msg);
232             throw new RedisConnectionException(msg);
233         }
234     }
235 
236     override
237     void close() {
238         if (isConnected()) _client.close();
239     }
240 
241     // void disconnect() {
242     //     close();
243     // }
244 
245     bool isConnected() {
246         return _client !is null && _client.isConnected();
247     }
248 
249     void setTimeoutInfinite() {
250         if (!isConnected()) {
251             try {
252                 connect();
253             } catch (SocketException ex) {
254                 broken = true;
255                 throw new RedisConnectionException(ex);
256             }
257         }
258     }
259 
260     void rollbackTimeout() {
261         try {
262             // socket.setSoTimeout(soTimeout);
263             // implementationMissing(false);
264         } catch (SocketException ex) {
265             broken = true;
266             throw new RedisConnectionException(ex);
267         }
268     }
269 
270     void sendCommand(Command cmd, string[] args...) {
271         const(ubyte)[][] bargs = new const(ubyte)[][args.length];
272         for (int i = 0; i < args.length; i++) {
273             bargs[i] = SafeEncoder.encode(args[i]);
274         }
275         sendCommand(cmd, bargs);
276     }
277 
278     void sendCommand(Command cmd) {
279         sendCommand(cmd, EMPTY_ARGS);
280     }
281 
282     void sendCommand(Command cmd, const(ubyte)[][] args...) {
283         try {
284             connect();
285             if(isConnected()) {
286                 Protocol.sendCommand(outputStream, cmd, args);
287             }
288         } catch (RedisConnectionException ex) {
289             if(inputStream is null) {
290                 warning("inputStream is null");
291             } else {
292                 /*
293                 * When client send request which formed by invalid protocol, Redis send back error message
294                 * before close connection. We try to read it to provide reason of failure.
295                 */
296                 try {
297                     string errorMessage = Protocol.readErrorLineIfPossible(inputStream);
298                     if (errorMessage !is null && errorMessage.length > 0) {
299                         ex = new RedisConnectionException(errorMessage, ex.next);
300                     }
301                 } catch (Exception e) {
302                     /*
303                     * Catch any IOException or RedisConnectionException occurred from InputStream#read and just
304                     * ignore. This approach is safe because reading error message is optional and connection
305                     * will eventually be closed.
306                     */
307                    debug warning(e.msg);
308                    version(HUNT_REDIS_DEBUG) warning(e);
309                 }
310             }
311             // Any other exceptions related to connection?
312             broken = true;
313             throw ex;
314         }
315     }
316 
317     string getStatusCodeReply() {
318         // flush();
319         // Object obj = readProtocolWithCheckingBroken();
320         // Bytes bytesObj = cast(Bytes)obj;
321         // if(bytesObj is null) {
322         //     warning("The obj is not a Bytes.");
323         //     throw new NullPointerException();
324         // }
325 
326         // byte[] resp = bytesObj.value();
327         // if (resp.empty()) {
328         //     return null;
329         // } else {
330         //     return SafeEncoder.encode(cast(const(ubyte)[])resp);
331         // }
332         return getBulkReply();
333     }
334 
335     string getBulkReply() {
336         flush();
337 
338         Object obj = readProtocolWithCheckingBroken();
339         Bytes bytesObj = cast(Bytes)obj;
340         if(bytesObj is null) {
341             warning("The obj is not a Bytes.");
342             throw new NullPointerException();
343         }
344 
345         byte[] resp = bytesObj.value();
346         string r = cast(string)resp;
347         version(HUNT_REDIS_DEBUG_MORE) {
348             tracef("reply: %s", r);
349         }
350         
351         return r;
352         // if (resp.empty()) {
353         //     return null;
354         // } else {
355         //     return SafeEncoder.encode(cast(const(ubyte)[])resp);
356         // }
357     }
358 
359     const(ubyte)[] getBinaryBulkReply() {
360         flush();
361 
362         Object obj = readProtocolWithCheckingBroken();
363         Bytes bytesObj = cast(Bytes)obj;
364         if(bytesObj is null) {
365             warning("The obj is not a String.");
366             throw new NullPointerException();
367         }
368 
369         return cast(const(ubyte)[])bytesObj.value();
370     }
371 
372     Long getIntegerReply() {
373         flush();
374         Object obj = readProtocolWithCheckingBroken();
375         if(obj is null) {
376             warning("No value");
377             return null;
378         } else {
379             import hunt.Number;
380             Long v = cast(Long)obj;
381             if(v is null) {
382                 Number number = cast(Number)obj;
383                 Bytes bytes = cast(Bytes)obj;
384                 if(number !is null) {
385                     v = new Long(number.longValue());
386                     return v;
387                 } else if(bytes !is null) {
388                     warningf("%(%02X %)", bytes.value());
389                     // v = new Long(number.longValue());
390                 }
391 
392                 warningf("Not a number: %s", typeid(obj));
393                 return null;
394             }
395             
396             return v;
397         }
398     }
399 
400     List!(string) getMultiBulkReply() {
401         flush();
402 
403         return BuilderFactory.STRING_LIST.build(readProtocolWithCheckingBroken());
404     }
405     
406 
407     List!(const(ubyte)[]) getBinaryMultiBulkReply() {
408         flush();
409 
410         return BuilderFactory.BYTE_ARRAY_LIST.build(readProtocolWithCheckingBroken());
411 
412         // return cast(List!(const(ubyte)[])) readProtocolWithCheckingBroken();
413         // List!Object lst = cast(List!Object)readProtocolWithCheckingBroken();
414         // if(lst is null) {
415         //     version(HUNT_DEBUG) warning("lst is null");
416         //     return null;
417         // } else {
418 
419         // }
420     }
421 
422     // deprecated("")
423     // List!(Object) getRawObjectMultiBulkReply() {
424     //     return getUnflushedObjectMultiBulkReply();
425     // }
426 
427     
428     List!(Object) getUnflushedObjectMultiBulkReply() {
429         return cast(List!(Object)) readProtocolWithCheckingBroken();
430     }
431 
432     List!(Object) getObjectMultiBulkReply() {
433         flush();
434         return getUnflushedObjectMultiBulkReply();
435     }
436 
437     
438     List!(long) getIntegerMultiBulkReply() {
439         flush();
440         List!(Long) items = cast(List!(Long)) readProtocolWithCheckingBroken();
441         
442         List!(long) r = new ArrayList!long();
443         foreach(Long v; items) {
444             r.add(v.value());
445         }
446 
447         return r;
448     }
449 
450     Object getOne() {
451         flush();
452         return readProtocolWithCheckingBroken();
453     }
454 
455     bool isBroken() {
456         return broken;
457     }
458 
459     void flush() {
460         try {
461             outputStream.flush();
462         } catch (IOException ex) {
463             broken = true;
464             throw new RedisConnectionException(ex);
465         }
466     }
467 
468     protected Object readProtocolWithCheckingBroken() {
469         if (broken) {
470             throw new RedisConnectionException("Attempting to read from a broken connection");
471         }
472 
473         try {
474             Object obj = Protocol.read(inputStream);
475             // version(HUNT_DEBUG) trace(typeid(obj));
476             return obj;
477         } catch (RedisConnectionException exc) {
478             broken = true;
479             throw exc;
480         }
481     }
482 
483     List!(Object) getMany(int count) {
484         flush();
485         List!(Object) responses = new ArrayList!(Object)(count);
486         for (int i = 0; i < count; i++) {
487             try {
488                 responses.add(readProtocolWithCheckingBroken());
489             } catch (RedisDataException e) {
490                 responses.add(e);
491             }
492         }
493         return responses;
494     }
495 }