1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.ShardedRedis;
13 
14 import hunt.redis.BinaryRedis;
15 import hunt.redis.BinaryRedisPubSub;
16 import hunt.redis.BinaryShardedRedis;
17 import hunt.redis.BitOP;
18 import hunt.redis.BitPosParams;
19 import hunt.redis.Client;
20 import hunt.redis.ClusterReset;
21 import hunt.redis.GeoCoordinate;
22 import hunt.redis.GeoRadiusResponse;
23 import hunt.redis.GeoUnit;
24 import hunt.redis.HostAndPort;
25 import hunt.redis.ListPosition;
26 import hunt.redis.Module;
27 import hunt.redis.Pipeline;
28 import hunt.redis.Protocol;
29 import hunt.redis.Redis;
30 import hunt.redis.RedisMonitor;
31 import hunt.redis.RedisPoolAbstract;
32 import hunt.redis.RedisPubSub;
33 import hunt.redis.RedisShardInfo;
34 import hunt.redis.ScanParams;
35 import hunt.redis.ScanResult;
36 import hunt.redis.ShardedRedisPool;
37 import hunt.redis.SortingParams;
38 import hunt.redis.StreamEntry;
39 import hunt.redis.StreamEntryID;
40 import hunt.redis.StreamPendingEntry;
41 import hunt.redis.Transaction;
42 import hunt.redis.Tuple;
43 import hunt.redis.ZParams;
44 
45 import hunt.redis.commands.RedisCommands;
46 import hunt.redis.Protocol;
47 import hunt.redis.params.GeoRadiusParam;
48 import hunt.redis.params.SetParams;
49 import hunt.redis.params.ZAddParams;
50 import hunt.redis.params.ZIncrByParams;
51 import hunt.redis.util.Hashing;
52 
53 import hunt.util.Common;
54 import hunt.collection.List;
55 import hunt.collection.Map;
56 import hunt.collection.Set;
57 
58 import hunt.Boolean;
59 import hunt.Double;
60 import hunt.Long;
61 
62 import std.regex;
63 alias Pattern = Regex!char;
64 
65 class ShardedRedis : BinaryShardedRedis, RedisCommands, Closeable {
66 
67     protected ShardedRedisPool dataSource = null;
68 
69     this(List!(RedisShardInfo) shards) {
70         super(shards);
71     }
72 
73     this(List!(RedisShardInfo) shards, Hashing algo) {
74         super(shards, algo);
75     }
76 
77     this(List!(RedisShardInfo) shards, Pattern keyTagPattern) {
78         super(shards, keyTagPattern);
79     }
80 
81     this(List!(RedisShardInfo) shards, Hashing algo, Pattern keyTagPattern) {
82         super(shards, algo, keyTagPattern);
83     }
84 
85     override
86     string set(string key, string value) {
87         Redis j = getShard(key);
88         return j.set(key, value);
89     }
90 
91     override
92     string set(string key, string value, SetParams params) {
93         Redis j = getShard(key);
94         return j.set(key, value, params);
95     }
96 
97     override
98     string get(string key) {
99         Redis j = getShard(key);
100         return j.get(key);
101     }
102 
103     override
104     string echo(string string) {
105         Redis j = getShard(string);
106         return j.echo(string);
107     }
108 
109     override
110     bool exists(string key) {
111         Redis j = getShard(key);
112         return j.exists(key);
113     }
114 
115     override
116     string type(string key) {
117         Redis j = getShard(key);
118         return j.type(key);
119     }
120 
121     override
122     const(ubyte)[] dump(string key) {
123         Redis j = getShard(key);
124         return j.dump(key);
125     }
126 
127     override
128     string restore(string key, int ttl, const(ubyte)[] serializedValue) {
129         Redis j = getShard(key);
130         return j.restore(key, ttl, serializedValue);
131     }
132 
133     override
134     string restoreReplace(string key, int ttl, const(ubyte)[] serializedValue) {
135         Redis j = getShard(key);
136         return j.restoreReplace(key, ttl, serializedValue);
137     }
138 
139     override
140     Long expire(string key, int seconds) {
141         Redis j = getShard(key);
142         return j.expire(key, seconds);
143     }
144 
145     override
146     Long pexpire(string key, long milliseconds) {
147         Redis j = getShard(key);
148         return j.pexpire(key, milliseconds);
149     }
150 
151     override
152     Long expireAt(string key, long unixTime) {
153         Redis j = getShard(key);
154         return j.expireAt(key, unixTime);
155     }
156 
157     override
158     Long pexpireAt(string key, long millisecondsTimestamp) {
159         Redis j = getShard(key);
160         return j.pexpireAt(key, millisecondsTimestamp);
161     }
162 
163     override
164     Long ttl(string key) {
165         Redis j = getShard(key);
166         return j.ttl(key);
167     }
168 
169     override
170     Long pttl(string key) {
171         Redis j = getShard(key);
172         return j.pttl(key);
173     }
174 
175     override
176     bool setbit(string key, long offset, bool value) {
177         Redis j = getShard(key);
178         return j.setbit(key, offset, value);
179     }
180 
181     override
182     bool setbit(string key, long offset, string value) {
183         Redis j = getShard(key);
184         return j.setbit(key, offset, value);
185     }
186 
187     override
188     bool getbit(string key, long offset) {
189         Redis j = getShard(key);
190         return j.getbit(key, offset);
191     }
192 
193     // override
194     // long setrange(string key, long offset, string value) {
195     //     Redis j = getShard(key);
196     //     return j.setrange(key, offset, value);
197     // }
198 
199     // override
200     // string getrange(string key, long startOffset, long endOffset) {
201     //     Redis j = getShard(key);
202     //     return j.getrange(key, startOffset, endOffset);
203     // }
204 
205     // override
206     // string getSet(string key, string value) {
207     //     Redis j = getShard(key);
208     //     return j.getSet(key, value);
209     // }
210 
211     // override
212     // long setnx(string key, string value) {
213     //     Redis j = getShard(key);
214     //     return j.setnx(key, value);
215     // }
216 
217     // override
218     // string setex(string key, int seconds, string value) {
219     //     Redis j = getShard(key);
220     //     return j.setex(key, seconds, value);
221     // }
222 
223     // override
224     // string psetex(string key, long milliseconds, string value) {
225     //     Redis j = getShard(key);
226     //     return j.psetex(key, milliseconds, value);
227     // }
228 
229     // List!(string) blpop(string arg) {
230     //     Redis j = getShard(arg);
231     //     return j.blpop(arg);
232     // }
233 
234     // override
235     // List!(string) blpop(int timeout, string key) {
236     //     Redis j = getShard(key);
237     //     return j.blpop(timeout, key);
238     // }
239 
240     // List!(string) brpop(string arg) {
241     //     Redis j = getShard(arg);
242     //     return j.brpop(arg);
243     // }
244 
245     // override
246     // List!(string) brpop(int timeout, string key) {
247     //     Redis j = getShard(key);
248     //     return j.brpop(timeout, key);
249     // }
250 
251     // override
252     // long decrBy(string key, long decrement) {
253     //     Redis j = getShard(key);
254     //     return j.decrBy(key, decrement);
255     // }
256 
257     // override
258     // long decr(string key) {
259     //     Redis j = getShard(key);
260     //     return j.decr(key);
261     // }
262 
263     // override
264     // long incrBy(string key, long increment) {
265     //     Redis j = getShard(key);
266     //     return j.incrBy(key, increment);
267     // }
268 
269     // override
270     // Double incrByFloat(string key, double increment) {
271     //     Redis j = getShard(key);
272     //     return j.incrByFloat(key, increment);
273     // }
274 
275     // override
276     // long incr(string key) {
277     //     Redis j = getShard(key);
278     //     return j.incr(key);
279     // }
280 
281     // override
282     // long append(string key, string value) {
283     //     Redis j = getShard(key);
284     //     return j.append(key, value);
285     // }
286 
287     // override
288     // string substr(string key, int start, int end) {
289     //     Redis j = getShard(key);
290     //     return j.substr(key, start, end);
291     // }
292 
293     // override
294     // long hset(string key, string field, string value) {
295     //     Redis j = getShard(key);
296     //     return j.hset(key, field, value);
297     // }
298 
299     // override
300     // long hset(string key, Map!(string, string) hash) {
301     //     Redis j = getShard(key);
302     //     return j.hset(key, hash);
303     // }
304 
305     // override
306     // string hget(string key, string field) {
307     //     Redis j = getShard(key);
308     //     return j.hget(key, field);
309     // }
310 
311     // override
312     // long hsetnx(string key, string field, string value) {
313     //     Redis j = getShard(key);
314     //     return j.hsetnx(key, field, value);
315     // }
316 
317     // override
318     // string hmset(string key, Map!(string, string) hash) {
319     //     Redis j = getShard(key);
320     //     return j.hmset(key, hash);
321     // }
322 
323     // override
324     // List!(string) hmget(string key, string[] fields...) {
325     //     Redis j = getShard(key);
326     //     return j.hmget(key, fields);
327     // }
328 
329     // override
330     // long hincrBy(string key, string field, long value) {
331     //     Redis j = getShard(key);
332     //     return j.hincrBy(key, field, value);
333     // }
334 
335     // override
336     // Double hincrByFloat(string key, string field, double value) {
337     //     Redis j = getShard(key);
338     //     return j.hincrByFloat(key, field, value);
339     // }
340 
341     // override
342     // bool hexists(string key, string field) {
343     //     Redis j = getShard(key);
344     //     return j.hexists(key, field);
345     // }
346 
347     override
348     Long del(string key) {
349         Redis j = getShard(key);
350         return j.del(key);
351     }
352 
353     override
354     Long unlink(string key) {
355         Redis j = getShard(key);
356         return j.unlink(key);
357     }
358 
359     // override
360     // long hdel(string key, string[] fields...) {
361     //     Redis j = getShard(key);
362     //     return j.hdel(key, fields);
363     // }
364 
365     // override
366     // long hlen(string key) {
367     //     Redis j = getShard(key);
368     //     return j.hlen(key);
369     // }
370 
371     // override
372     // Set!(string) hkeys(string key) {
373     //     Redis j = getShard(key);
374     //     return j.hkeys(key);
375     // }
376 
377     // override
378     // List!(string) hvals(string key) {
379     //     Redis j = getShard(key);
380     //     return j.hvals(key);
381     // }
382 
383     // override
384     // Map!(string, string) hgetAll(string key) {
385     //     Redis j = getShard(key);
386     //     return j.hgetAll(key);
387     // }
388 
389     // override
390     // long rpush(string key, string[] strings...) {
391     //     Redis j = getShard(key);
392     //     return j.rpush(key, strings);
393     // }
394 
395     // override
396     // long lpush(string key, string[] strings...) {
397     //     Redis j = getShard(key);
398     //     return j.lpush(key, strings);
399     // }
400 
401     // override
402     // long lpushx(string key, string[] string...) {
403     //     Redis j = getShard(key);
404     //     return j.lpushx(key, string);
405     // }
406 
407     override
408     Long strlen(string key) {
409         Redis j = getShard(key);
410         return j.strlen(key);
411     }
412 
413     override
414     Long move(string key, int dbIndex) {
415         Redis j = getShard(key);
416         return j.move(key, dbIndex);
417     }
418 
419     // override
420     // long rpushx(string key, string[] string...) {
421     //     Redis j = getShard(key);
422     //     return j.rpushx(key, string);
423     // }
424 
425     override
426     Long persist(string key) {
427         Redis j = getShard(key);
428         return j.persist(key);
429     }
430 
431     // override
432     // long llen(string key) {
433     //     Redis j = getShard(key);
434     //     return j.llen(key);
435     // }
436 
437     // override
438     // List!(string) lrange(string key, long start, long stop) {
439     //     Redis j = getShard(key);
440     //     return j.lrange(key, start, stop);
441     // }
442 
443     // override
444     // string ltrim(string key, long start, long stop) {
445     //     Redis j = getShard(key);
446     //     return j.ltrim(key, start, stop);
447     // }
448 
449     // override
450     // string lindex(string key, long index) {
451     //     Redis j = getShard(key);
452     //     return j.lindex(key, index);
453     // }
454 
455     // override
456     // string lset(string key, long index, string value) {
457     //     Redis j = getShard(key);
458     //     return j.lset(key, index, value);
459     // }
460 
461     // override
462     // long lrem(string key, long count, string value) {
463     //     Redis j = getShard(key);
464     //     return j.lrem(key, count, value);
465     // }
466 
467     // override
468     // string lpop(string key) {
469     //     Redis j = getShard(key);
470     //     return j.lpop(key);
471     // }
472 
473     // override
474     // string rpop(string key) {
475     //     Redis j = getShard(key);
476     //     return j.rpop(key);
477     // }
478 
479     // override
480     // long sadd(string key, string[] members...) {
481     //     Redis j = getShard(key);
482     //     return j.sadd(key, members);
483     // }
484 
485     // override
486     // Set!(string) smembers(string key) {
487     //     Redis j = getShard(key);
488     //     return j.smembers(key);
489     // }
490 
491     // override
492     // long srem(string key, string[] members...) {
493     //     Redis j = getShard(key);
494     //     return j.srem(key, members);
495     // }
496 
497     // override
498     // string spop(string key) {
499     //     Redis j = getShard(key);
500     //     return j.spop(key);
501     // }
502 
503     // override
504     // Set!(string) spop(string key, long count) {
505     //     Redis j = getShard(key);
506     //     return j.spop(key, count);
507     // }
508 
509     // override
510     // long scard(string key) {
511     //     Redis j = getShard(key);
512     //     return j.scard(key);
513     // }
514 
515     // override
516     // bool sismember(string key, string member) {
517     //     Redis j = getShard(key);
518     //     return j.sismember(key, member);
519     // }
520 
521     // override
522     // string srandmember(string key) {
523     //     Redis j = getShard(key);
524     //     return j.srandmember(key);
525     // }
526 
527     // override
528     // List!(string) srandmember(string key, int count) {
529     //     Redis j = getShard(key);
530     //     return j.srandmember(key, count);
531     // }
532 
533     // override
534     // long zadd(string key, double score, string member) {
535     //     Redis j = getShard(key);
536     //     return j.zadd(key, score, member);
537     // }
538 
539     // override
540     // long zadd(string key, double score, string member, ZAddParams params) {
541     //     Redis j = getShard(key);
542     //     return j.zadd(key, score, member, params);
543     // }
544 
545     // override
546     // long zadd(string key, Map!(string, Double) scoreMembers) {
547     //     Redis j = getShard(key);
548     //     return j.zadd(key, scoreMembers);
549     // }
550 
551     // override
552     // long zadd(string key, Map!(string, Double) scoreMembers, ZAddParams params) {
553     //     Redis j = getShard(key);
554     //     return j.zadd(key, scoreMembers, params);
555     // }
556 
557     // override
558     // Set!(string) zrange(string key, long start, long stop) {
559     //     Redis j = getShard(key);
560     //     return j.zrange(key, start, stop);
561     // }
562 
563     // override
564     // long zrem(string key, string[] members...) {
565     //     Redis j = getShard(key);
566     //     return j.zrem(key, members);
567     // }
568 
569     // override
570     // Double zincrby(string key, double increment, string member) {
571     //     Redis j = getShard(key);
572     //     return j.zincrby(key, increment, member);
573     // }
574 
575     // override
576     // Double zincrby(string key, double increment, string member, ZIncrByParams params) {
577     //     Redis j = getShard(key);
578     //     return j.zincrby(key, increment, member, params);
579     // }
580 
581     // override
582     // long zrank(string key, string member) {
583     //     Redis j = getShard(key);
584     //     return j.zrank(key, member);
585     // }
586 
587     // override
588     // long zrevrank(string key, string member) {
589     //     Redis j = getShard(key);
590     //     return j.zrevrank(key, member);
591     // }
592 
593     // override
594     // Set!(string) zrevrange(string key, long start, long stop) {
595     //     Redis j = getShard(key);
596     //     return j.zrevrange(key, start, stop);
597     // }
598 
599     // override
600     // Set!(Tuple) zrangeWithScores(string key, long start, long stop) {
601     //     Redis j = getShard(key);
602     //     return j.zrangeWithScores(key, start, stop);
603     // }
604 
605     // override
606     // Set!(Tuple) zrevrangeWithScores(string key, long start, long stop) {
607     //     Redis j = getShard(key);
608     //     return j.zrevrangeWithScores(key, start, stop);
609     // }
610 
611     // override
612     // long zcard(string key) {
613     //     Redis j = getShard(key);
614     //     return j.zcard(key);
615     // }
616 
617     // override
618     // Double zscore(string key, string member) {
619     //     Redis j = getShard(key);
620     //     return j.zscore(key, member);
621     // }
622 
623     // override
624     // List!(string) sort(string key) {
625     //     Redis j = getShard(key);
626     //     return j.sort(key);
627     // }
628 
629     // override
630     // List!(string) sort(string key, SortingParams sortingParameters) {
631     //     Redis j = getShard(key);
632     //     return j.sort(key, sortingParameters);
633     // }
634 
635     // override
636     // long zcount(string key, double min, double max) {
637     //     Redis j = getShard(key);
638     //     return j.zcount(key, min, max);
639     // }
640 
641     // override
642     // long zcount(string key, string min, string max) {
643     //     Redis j = getShard(key);
644     //     return j.zcount(key, min, max);
645     // }
646 
647     // override
648     // Set!(string) zrangeByScore(string key, double min, double max) {
649     //     Redis j = getShard(key);
650     //     return j.zrangeByScore(key, min, max);
651     // }
652 
653     // override
654     // Set!(string) zrevrangeByScore(string key, double max, double min) {
655     //     Redis j = getShard(key);
656     //     return j.zrevrangeByScore(key, max, min);
657     // }
658 
659     // override
660     // Set!(string) zrangeByScore(string key, double min, double max, int offset, int count) {
661     //     Redis j = getShard(key);
662     //     return j.zrangeByScore(key, min, max, offset, count);
663     // }
664 
665     // override
666     // Set!(string) zrevrangeByScore(string key, double max, double min, int offset, int count) {
667     //     Redis j = getShard(key);
668     //     return j.zrevrangeByScore(key, max, min, offset, count);
669     // }
670 
671     // override
672     // Set!(Tuple) zrangeByScoreWithScores(string key, double min, double max) {
673     //     Redis j = getShard(key);
674     //     return j.zrangeByScoreWithScores(key, min, max);
675     // }
676 
677     // override
678     // Set!(Tuple) zrevrangeByScoreWithScores(string key, double max, double min) {
679     //     Redis j = getShard(key);
680     //     return j.zrevrangeByScoreWithScores(key, max, min);
681     // }
682 
683     // override
684     // Set!(Tuple) zrangeByScoreWithScores(string key, double min, double max, int offset,
685     //         int count) {
686     //     Redis j = getShard(key);
687     //     return j.zrangeByScoreWithScores(key, min, max, offset, count);
688     // }
689 
690     // override
691     // Set!(Tuple) zrevrangeByScoreWithScores(string key, double max, double min, int offset,
692     //         int count) {
693     //     Redis j = getShard(key);
694     //     return j.zrevrangeByScoreWithScores(key, max, min, offset, count);
695     // }
696 
697     // override
698     // Set!(string) zrangeByScore(string key, string min, string max) {
699     //     Redis j = getShard(key);
700     //     return j.zrangeByScore(key, min, max);
701     // }
702 
703     // override
704     // Set!(string) zrevrangeByScore(string key, string max, string min) {
705     //     Redis j = getShard(key);
706     //     return j.zrevrangeByScore(key, max, min);
707     // }
708 
709     // override
710     // Set!(string) zrangeByScore(string key, string min, string max, int offset, int count) {
711     //     Redis j = getShard(key);
712     //     return j.zrangeByScore(key, min, max, offset, count);
713     // }
714 
715     // override
716     // Set!(string) zrevrangeByScore(string key, string max, string min, int offset, int count) {
717     //     Redis j = getShard(key);
718     //     return j.zrevrangeByScore(key, max, min, offset, count);
719     // }
720 
721     // override
722     // Set!(Tuple) zrangeByScoreWithScores(string key, string min, string max) {
723     //     Redis j = getShard(key);
724     //     return j.zrangeByScoreWithScores(key, min, max);
725     // }
726 
727     // override
728     // Set!(Tuple) zrevrangeByScoreWithScores(string key, string max, string min) {
729     //     Redis j = getShard(key);
730     //     return j.zrevrangeByScoreWithScores(key, max, min);
731     // }
732 
733     // override
734     // Set!(Tuple) zrangeByScoreWithScores(string key, string min, string max, int offset,
735     //         int count) {
736     //     Redis j = getShard(key);
737     //     return j.zrangeByScoreWithScores(key, min, max, offset, count);
738     // }
739 
740     // override
741     // Set!(Tuple) zrevrangeByScoreWithScores(string key, string max, string min, int offset,
742     //         int count) {
743     //     Redis j = getShard(key);
744     //     return j.zrevrangeByScoreWithScores(key, max, min, offset, count);
745     // }
746 
747     // override
748     // long zremrangeByRank(string key, long start, long stop) {
749     //     Redis j = getShard(key);
750     //     return j.zremrangeByRank(key, start, stop);
751     // }
752 
753     // override
754     // long zremrangeByScore(string key, double min, double max) {
755     //     Redis j = getShard(key);
756     //     return j.zremrangeByScore(key, min, max);
757     // }
758 
759     // override
760     // long zremrangeByScore(string key, string min, string max) {
761     //     Redis j = getShard(key);
762     //     return j.zremrangeByScore(key, min, max);
763     // }
764 
765     // override
766     // long zlexcount(string key, string min, string max) {
767     //     return getShard(key).zlexcount(key, min, max);
768     // }
769 
770     // override
771     // Set!(string) zrangeByLex(string key, string min, string max) {
772     //     return getShard(key).zrangeByLex(key, min, max);
773     // }
774 
775     // override
776     // Set!(string) zrangeByLex(string key, string min, string max,
777     //         int offset, int count) {
778     //     return getShard(key).zrangeByLex(key, min, max, offset, count);
779     // }
780 
781     // override
782     // Set!(string) zrevrangeByLex(string key, string max, string min) {
783     //     return getShard(key).zrevrangeByLex(key, max, min);
784     // }
785 
786     // override
787     // Set!(string) zrevrangeByLex(string key, string max, string min, int offset, int count) {
788     //     return getShard(key).zrevrangeByLex(key, max, min, offset, count);
789     // }
790 
791     // override
792     // long zremrangeByLex(string key, string min, string max) {
793     //     return getShard(key).zremrangeByLex(key, min, max);
794     // }
795 
796     // override
797     // long linsert(string key, ListPosition where, string pivot, string value) {
798     //     Redis j = getShard(key);
799     //     return j.linsert(key, where, pivot, value);
800     // }
801 
802     override
803     Long bitcount(string key) {
804         Redis j = getShard(key);
805         return j.bitcount(key);
806     }
807 
808     override
809     Long bitcount(string key, long start, long end) {
810         Redis j = getShard(key);
811         return j.bitcount(key, start, end);
812     }
813 
814     // override
815     // long bitpos(string key, bool value) {
816     //     Redis j = getShard(key);
817     //     return j.bitpos(key, value);
818     // }
819 
820     // override
821     // long bitpos(string key, bool value, BitPosParams params) {
822     //     Redis j = getShard(key);
823     //     return j.bitpos(key, value, params);
824     // }
825 
826     // override
827     // ScanResult!(MapEntry!(string, string)) hscan(string key, string cursor) {
828     //     Redis j = getShard(key);
829     //     return j.hscan(key, cursor);
830     // }
831 
832     // override
833     // ScanResult!(MapEntry!(string, string)) hscan(string key, string cursor, ScanParams params) {
834     //     Redis j = getShard(key);
835     //     return j.hscan(key, cursor, params);
836     // }
837 
838     // override
839     // ScanResult!(string) sscan(string key, string cursor) {
840     //     Redis j = getShard(key);
841     //     return j.sscan(key, cursor);
842     // }
843 
844     // override
845     // ScanResult!(Tuple) zscan(string key, string cursor) {
846     //     Redis j = getShard(key);
847     //     return j.zscan(key, cursor);
848     // }
849 
850     // override
851     // ScanResult!(Tuple) zscan(string key, string cursor, ScanParams params) {
852     //     Redis j = getShard(key);
853     //     return j.zscan(key, cursor, params);
854     // }
855 
856     // override
857     // ScanResult!(string) sscan(string key, string cursor, ScanParams params) {
858     //     Redis j = getShard(key);
859     //     return j.sscan(key, cursor, params);
860     // }
861 
862     override
863     void close() {
864         if (dataSource !is null) {
865             bool broken = false;
866 
867             foreach(Redis jedis ; getAllShards()) {
868                 if (jedis.getClient().isBroken()) {
869                     broken = true;
870                     break;
871                 }
872             }
873             ShardedRedisPool pool = this.dataSource;
874             this.dataSource = null;
875             if (broken) {
876                 pool.returnBrokenResource(this);
877             } else {
878                 pool.returnResource(this);
879             }
880 
881         } else {
882             disconnect();
883         }
884     }
885 
886     void setDataSource(ShardedRedisPool shardedRedisPool) {
887         this.dataSource = shardedRedisPool;
888     }
889 
890     void resetState() {
891         foreach(Redis jedis ; getAllShards()) {
892             jedis.resetState();
893         }
894     }
895 
896     override
897     Long pfadd(string key, string[] elements...) {
898         Redis j = getShard(key);
899         return j.pfadd(key, elements);
900     }
901 
902     override
903     Long pfcount(string key) {
904         Redis j = getShard(key);
905         return j.pfcount(key);
906     }
907 
908     override
909     Long touch(string key) {
910         Redis j = getShard(key);
911         return j.touch(key);
912     }
913 
914     // override
915     // long geoadd(string key, double longitude, double latitude, string member) {
916     //     Redis j = getShard(key);
917     //     return j.geoadd(key, longitude, latitude, member);
918     // }
919 
920     // override
921     // long geoadd(string key, Map!(string, GeoCoordinate) memberCoordinateMap) {
922     //     Redis j = getShard(key);
923     //     return j.geoadd(key, memberCoordinateMap);
924     // }
925 
926     // override
927     // Double geodist(string key, string member1, string member2) {
928     //     Redis j = getShard(key);
929     //     return j.geodist(key, member1, member2);
930     // }
931 
932     // override
933     // Double geodist(string key, string member1, string member2, GeoUnit unit) {
934     //     Redis j = getShard(key);
935     //     return j.geodist(key, member1, member2, unit);
936     // }
937 
938     // override
939     // List!(string) geohash(string key, string[] members...) {
940     //     Redis j = getShard(key);
941     //     return j.geohash(key, members);
942     // }
943 
944     // override
945     // List!(GeoCoordinate) geopos(string key, string[] members...) {
946     //     Redis j = getShard(key);
947     //     return j.geopos(key, members);
948     // }
949 
950     // override
951     // List!(GeoRadiusResponse) georadius(string key, double longitude, double latitude,
952     //         double radius, GeoUnit unit) {
953     //     Redis j = getShard(key);
954     //     return j.georadius(key, longitude, latitude, radius, unit);
955     // }
956 
957     // override
958     // List!(GeoRadiusResponse) georadiusReadonly(string key, double longitude, double latitude,
959     //         double radius, GeoUnit unit) {
960     //     Redis j = getShard(key);
961     //     return j.georadiusReadonly(key, longitude, latitude, radius, unit);
962     // }
963 
964     // override
965     // List!(GeoRadiusResponse) georadius(string key, double longitude, double latitude,
966     //         double radius, GeoUnit unit, GeoRadiusParam param) {
967     //     Redis j = getShard(key);
968     //     return j.georadius(key, longitude, latitude, radius, unit, param);
969     // }
970 
971     // override
972     // List!(GeoRadiusResponse) georadiusReadonly(string key, double longitude, double latitude,
973     //         double radius, GeoUnit unit, GeoRadiusParam param) {
974     //     Redis j = getShard(key);
975     //     return j.georadiusReadonly(key, longitude, latitude, radius, unit, param);
976     // }
977 
978     // override
979     // List!(GeoRadiusResponse) georadiusByMember(string key, string member, double radius,
980     //         GeoUnit unit) {
981     //     Redis j = getShard(key);
982     //     return j.georadiusByMember(key, member, radius, unit);
983     // }
984 
985     // override
986     // List!(GeoRadiusResponse) georadiusByMemberReadonly(string key, string member, double radius,
987     //         GeoUnit unit) {
988     //     Redis j = getShard(key);
989     //     return j.georadiusByMemberReadonly(key, member, radius, unit);
990     // }
991 
992     // override
993     // List!(GeoRadiusResponse) georadiusByMember(string key, string member, double radius,
994     //         GeoUnit unit, GeoRadiusParam param) {
995     //     Redis j = getShard(key);
996     //     return j.georadiusByMember(key, member, radius, unit, param);
997     // }
998 
999     // override
1000     // List!(GeoRadiusResponse) georadiusByMemberReadonly(string key, string member, double radius,
1001     //         GeoUnit unit, GeoRadiusParam param) {
1002     //     Redis j = getShard(key);
1003     //     return j.georadiusByMemberReadonly(key, member, radius, unit, param);
1004     // }
1005 
1006     // override
1007     // List!(long) bitfield(string key, string[] arguments...) {
1008     //     Redis j = getShard(key);
1009     //     return j.bitfield(key, arguments);
1010     // }
1011 
1012     // override
1013     // long hstrlen(string key, string field) {
1014     //     Redis j = getShard(key);
1015     //     return j.hstrlen(key, field);
1016     // }
1017 
1018     // override
1019     // StreamEntryID xadd(string key, StreamEntryID id, Map!(string, string) hash) {
1020     //     Redis j = getShard(key);
1021     //     return j.xadd(key, id, hash);
1022     // }
1023     
1024     // override
1025     // StreamEntryID xadd(string key, StreamEntryID id, Map!(string, string) hash, long maxLen, bool approximateLength) {
1026     //     Redis j = getShard(key);
1027     //     return j.xadd(key, id, hash, maxLen, approximateLength);
1028     // }
1029 
1030     // override
1031     // long xlen(string key) {
1032     //     Redis j = getShard(key);
1033     //     return j.xlen(key);
1034     // }
1035     
1036     // override
1037     // List!(StreamEntry) xrange(string key, StreamEntryID start, StreamEntryID end, int count) {
1038     //     Redis j = getShard(key);
1039     //     return j.xrange(key, start, end, count);
1040     // }
1041 
1042     // override
1043     // long xack(string key, string group, StreamEntryID[] ids...) {
1044     //     Redis j = getShard(key);
1045     //     return j.xack(key, group, ids);
1046     // }
1047 
1048     // override
1049     // string xgroupCreate(string key, string consumer, StreamEntryID id, bool makeStream) {
1050     //     Redis j = getShard(key);
1051     //     return j.xgroupCreate(key, consumer, id, makeStream);
1052     // }
1053 
1054     // override
1055     // string xgroupSetID(string key, string groupname, StreamEntryID id) {
1056     //     Redis j = getShard(key);
1057     //     return j.xgroupSetID(key, groupname, id);
1058     // }
1059 
1060     // override
1061     // long xgroupDestroy(string key, string groupname) {
1062     //     Redis j = getShard(key);
1063     //     return j.xgroupDestroy(key, groupname);
1064     // }
1065 
1066     // override
1067     // string xgroupDelConsumer(string key, string groupname, string consumername) {
1068     //     Redis j = getShard(key);
1069     //     return j.xgroupDelConsumer(key, groupname, consumername);
1070     // }
1071 
1072 
1073     // override
1074     // long xdel(string key, StreamEntryID[] ids...) {
1075     //     Redis j = getShard(key);
1076     //     return j.xdel(key, ids);
1077     // }
1078 
1079     // override
1080     // long xtrim(string key, long maxLen, bool approximateLength) {
1081     //     Redis j = getShard(key);
1082     //     return j.xtrim(key, maxLen, approximateLength);
1083     // }
1084 
1085     // override
1086     // List!(StreamEntry) xrevrange(string key, StreamEntryID end, StreamEntryID start, int count) {
1087     //     Redis j = getShard(key);
1088     //     return j.xrevrange(key, end, start, count);
1089     // }
1090 
1091     // override
1092     // List!(StreamPendingEntry) xpending(string key, string groupname, StreamEntryID start, StreamEntryID end,
1093     //         int count, string consumername) {
1094     //     Redis j = getShard(key);
1095     //     return j.xpending(key, groupname, start, end, count, consumername);
1096     // }
1097 
1098     // override
1099     // List!(StreamEntry) xclaim(string key, string group, string consumername, long minIdleTime, long newIdleTime,
1100     //         int retries, bool force, StreamEntryID[] ids...) {
1101     //     Redis j = getShard(key);
1102     //     return j.xclaim(key, group, consumername, minIdleTime, newIdleTime, retries, force, ids);
1103     // }
1104 
1105     // override
1106     // Object sendCommand(ProtocolCommand cmd, string[] args...) {
1107     //     // default since no sample key provided in RedisCommands interface
1108     //     string sampleKey = args.length > 0 ? args[0] : cmd.to!string();
1109     //     Redis j = getShard(sampleKey);
1110     //     return j.sendCommand(cmd, args);
1111     // }
1112 }