1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.Pipeline;
13 
14 import hunt.redis.Builder;
15 import hunt.redis.BuilderFactory;
16 import hunt.redis.Client;
17 import hunt.redis.Exceptions;
18 import hunt.redis.MultiKeyPipelineBase;
19 import hunt.redis.Response;
20 
21 import hunt.util.Common;
22 import hunt.collection;
23 
24 
25 import std.conv;
26 
/**
 * Pipeline bound to a single Client, with MULTI/EXEC/DISCARD bookkeeping.
 *
 * Both getClient overloads return the same client regardless of key.
 * Implements Closeable: close() delegates to clear().
 */
class Pipeline : MultiKeyPipelineBase, Closeable {

    /** Builder of the transaction opened by multi(); null while no MULTI is open. */
    private MultiResponseBuilder currentMulti;

    /**
     * Builder used for the reply of EXEC (see exec()).
     *
     * It holds the responses registered through addResponse() and, when
     * built, hands the i-th element of the reply to the i-th registered
     * response.
     */
    private class MultiResponseBuilder : Builder!(List!(Object)) {
        private List!AbstractResponse responses;

        this() {
            responses = new ArrayList!AbstractResponse();
        }

        /**
         * Distributes the elements of a list reply to the registered responses.
         *
         * Params:
         *   data = the raw reply; must be a List!(Object) whose size equals
         *          the number of registered responses.
         *
         * Returns: a new list holding the registered response objects, in
         *          order, each one fed with its element of the reply.
         *
         * Throws: RedisDataException if data is not a list, or if its size
         *         differs from the number of registered responses.
         */
        override
        List!(Object) build(Object data) {
            List!(Object) list = cast(List!(Object)) data;

            // A failed class/interface cast yields null in D; report it as a
            // data error instead of dereferencing null below.
            if (list is null) {
                throw new RedisDataException("Expected a list reply but was "
                        ~ (data is null ? "null" : typeid(data).name));
            }

            if (list.size() != responses.size()) {
                throw new RedisDataException("Expected data size " ~ responses.size().to!string() ~ " but was "
                        ~ list.size().to!string());
            }

            List!(Object) values = new ArrayList!(Object)();
            for (int i = 0; i < list.size(); i++) {
                // FIXME: Needing refactor or cleanup -@zxp at 7/16/2019, 8:15:19 PM
                // The response object itself is stored, not a value built
                // from it. The former try/catch around this step guarded a
                // plain reference assignment, which cannot throw, so it was
                // removed without changing behavior.
                AbstractResponse response = responses.get(i);
                response.set(list.get(i));
                values.add(response);
            }
            return values;
        }

        /** Calls setDependency(dependency) on every registered response. */
        void setResponseDependency(AbstractResponse dependency) {
            foreach(AbstractResponse response ; responses) {
                response.setDependency(dependency);
            }
        }

        /** Appends a response to the list that build() fills in. */
        void addResponse(AbstractResponse response) {
            responses.add(response);
        }
    }

// NOTE(review): the override below is disabled. It contains the only call to
// MultiResponseBuilder.addResponse in this file, so while it stays disabled
// the builder's response list remains empty and build() rejects any non-empty
// reply with a size mismatch. Confirm how queued commands are meant to be
// registered before relying on exec().
//   override
//   protected <T> Response!(T) getResponse(Builder!(T) builder) {
//     if (currentMulti !is null) {
//       super.getResponse(BuilderFactory.STRING); // Expected QUEUED

//       Response!(T) lr = new Response!(T)(builder);
//       currentMulti.addResponse(lr);
//       return lr;
//     } else {
//       return super.getResponse(builder);
//     }
//   }

    /** Sets the client this pipeline sends its commands through. */
    void setClient(Client client) {
        this.client = client;
    }

    /** Returns the pipeline's single client; the key is ignored. */
    override
    protected Client getClient(string key) {
        return client;
    }

    /** Returns the pipeline's single client; the key is ignored. */
    override
    protected Client getClient(const(ubyte)[] key) {
        return client;
    }

    /**
     * Discards the open MULTI, if any, then reads all pending replies via sync().
     */
    void clear() {
        if (isInMulti()) {
            discard();
        }

        sync();
    }

    /** Returns: true while a MULTI opened by multi() has not been ended by exec() or discard(). */
    bool isInMulti() {
        return currentMulti !is null;
    }

    /**
     * Synchronize the pipeline by reading all pending replies from the client
     * and passing each one to generateResponse(). Nothing is returned; in
     * order to get return values from pipelined commands, keep the Response
     * objects returned by the commands you execute.
     *
     * Does nothing when no replies are pending.
     */
    void sync() {
        if (getPipelinedResponseLength() > 0) {
            List!(Object) unformatted = client.getMany(getPipelinedResponseLength());
            foreach(Object o ; unformatted) {
                generateResponse(o);
            }
        }
    }

    /**
     * Synchronize the pipeline by reading all pending replies from the client
     * and collecting, for each reply, the result of generateResponse(). A
     * RedisDataException thrown while generating a response is stored in the
     * list in place of that result. Prefer sync() when the collected list is
     * not needed.
     *
     * Returns: one entry per pending reply, in the order the replies were
     *          read, or an empty list when no replies are pending.
     */
    List!(Object) syncAndReturnAll() {
        if (getPipelinedResponseLength() > 0) {
            List!(Object) unformatted = client.getMany(getPipelinedResponseLength());
            List!(Object) formatted = new ArrayList!(Object)();
            foreach(Object o ; unformatted) {
                try {
                    formatted.add(generateResponse(o));
                } catch (RedisDataException e) {
                    formatted.add(e);
                }
            }
            return formatted;
        } else {
            return Collections.emptyList!Object();
        }
    }

    /**
     * Sends DISCARD and leaves the MULTI state.
     *
     * Returns: the pipelined string response for the DISCARD command.
     * Throws: RedisDataException if no MULTI is open.
     */
    Response!(string) discard() {
        if (currentMulti is null) throw new RedisDataException("DISCARD without MULTI");
        client.discard();
        currentMulti = null;
        return getResponse(BuilderFactory.STRING);
    }

    /**
     * Sends EXEC and leaves the MULTI state.
     *
     * The returned response is built by the current MultiResponseBuilder and
     * is set as the dependency of every response registered with it.
     *
     * Returns: the pipelined response for the EXEC command.
     * Throws: RedisDataException if no MULTI is open.
     */
    Response!(List!(Object)) exec() {
        if (currentMulti is null) throw new RedisDataException("EXEC without MULTI");

        client.exec();
        Response!(List!(Object)) response = super.getResponse(currentMulti);
        currentMulti.setResponseDependency(response);
        currentMulti = null;
        return response;
    }

    /**
     * Sends MULTI and enters the MULTI state with a fresh MultiResponseBuilder.
     *
     * Returns: the pipelined string response for the MULTI command
     *          (expected to be OK).
     * Throws: RedisDataException if a MULTI is already open.
     */
    Response!(string) multi() {
        if (currentMulti !is null) throw new RedisDataException("MULTI calls can not be nested");

        client.multi();
        Response!(string) response = getResponse(BuilderFactory.STRING); // Expecting OK
        currentMulti = new MultiResponseBuilder();
        return response;
    }

    /** Closeable implementation: delegates to clear(). */
    override
    void close() {
        clear();
    }

}