/*
 * Hunt - A redis client library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.redis.Pipeline;

import hunt.redis.Builder;
import hunt.redis.BuilderFactory;
import hunt.redis.Client;
import hunt.redis.Exceptions;
import hunt.redis.MultiKeyPipelineBase;
import hunt.redis.Response;

import hunt.util.Common;
import hunt.collection;


import std.conv;

/**
 * A command pipeline bound to a single Client, with MULTI/EXEC/DISCARD
 * bookkeeping layered on top of MultiKeyPipelineBase.
 *
 * Closing the pipeline (see close()) discards an open MULTI block, if any,
 * and then reads every reply that is still pending.
 */
class Pipeline : MultiKeyPipelineBase, Closeable {

    /// Builder for the currently open MULTI block; null when no MULTI is active.
    private MultiResponseBuilder currentMulti;

    /**
     * Builder that turns the reply of EXEC into a list with one entry per
     * response registered through addResponse().
     */
    private class MultiResponseBuilder : Builder!(List!(Object)) {
        private List!AbstractResponse responses;

        this() {
            responses = new ArrayList!AbstractResponse();
        }

        /**
         * Distributes the elements of the EXEC reply to the registered
         * responses and returns the resulting list.
         *
         * Params:
         *   data = the raw EXEC reply; must be a List!(Object) holding exactly
         *          one element per registered response.
         *
         * Returns: a new list containing, for each position, the registered
         *          response after its raw value has been set.
         *
         * Throws: RedisDataException if data is not a list, or if its size
         *         differs from the number of registered responses.
         */
        override
        List!(Object) build(Object data) {

            List!(Object) list = cast(List!(Object)) data;

            // A failed downcast yields null in D. Report it the same way as
            // the other protocol errors instead of dereferencing null below.
            if (list is null) {
                throw new RedisDataException("Expected a list of responses but was "
                        ~ (data is null ? "null" : typeid(data).toString()));
            }

            List!(Object) values = new ArrayList!(Object)();

            if (list.size() != responses.size()) {
                throw new RedisDataException("Expected data size " ~ responses.size().to!string() ~ " but was "
                        ~ list.size().to!string());
            }

            for (int i = 0; i < list.size(); i++) {
                // FIXME: Needing refactor or cleanup -@zxp at 7/16/2019, 8:15:19 PM
                //
                // NOTE(review): the assignment inside the try block cannot
                // throw, so the catch clause is currently unreachable.
                // Presumably the built value of the response was meant to be
                // fetched here - confirm before removing the try/catch.
                AbstractResponse response = responses.get(i);
                response.set(list.get(i));
                Object builtResponse;
                try {
                    builtResponse = response;
                } catch (RedisDataException e) {
                    builtResponse = e;
                }
                values.add(builtResponse);
            }
            return values;
        }

        /// Sets the given response as the dependency of every registered response.
        void setResponseDependency(AbstractResponse dependency) {
            foreach(AbstractResponse response ; responses) {
                response.setDependency(dependency);
            }
        }

        /// Registers a response that expects one element of the EXEC reply.
        void addResponse(AbstractResponse response) {
            responses.add(response);
        }
    }

    // NOTE(review): while the override below stays disabled, nothing in this
    // file calls MultiResponseBuilder.addResponse, so the builder's response
    // list stays empty unless it is filled from elsewhere - confirm.
    //
    // override
    // protected <T> Response!(T) getResponse(Builder!(T) builder) {
    //     if (currentMulti !is null) {
    //         super.getResponse(BuilderFactory.STRING); // Expected QUEUED
    //
    //         Response!(T) lr = new Response!(T)(builder);
    //         currentMulti.addResponse(lr);
    //         return lr;
    //     } else {
    //         return super.getResponse(builder);
    //     }
    // }

    /// Sets the client that every pipelined command is sent through.
    void setClient(Client client) {
        this.client = client;
    }

    /// Returns the single client of this pipeline; the key is ignored.
    override
    protected Client getClient(string key) {
        return client;
    }

    /// Returns the single client of this pipeline; the key is ignored.
    override
    protected Client getClient(const(ubyte)[] key) {
        return client;
    }

    /**
     * Discards the open MULTI block, if there is one, and then reads all
     * pending replies via sync().
     */
    void clear() {
        if (isInMulti()) {
            discard();
        }

        sync();
    }

    /// Returns true while a MULTI block is open (after multi(), before exec()/discard()).
    bool isInMulti() {
        return currentMulti !is null;
    }

    /**
     * Synchronizes the pipeline by reading all pending replies from the client
     * and handing each one to generateResponse(). Nothing is read when no
     * replies are pending.
     *
     * The return values of generateResponse() are not collected here. To get
     * the results of pipelined commands, keep the Response objects returned by
     * the commands you execute.
     */
    void sync() {
        if (getPipelinedResponseLength() > 0) {
            List!(Object) unformatted = client.getMany(getPipelinedResponseLength());
            foreach(Object o ; unformatted) {
                generateResponse(o);
            }
        }
    }

    /**
     * Synchronizes the pipeline by reading all pending replies from the client
     * and collecting the result of generateResponse() for each of them.
     *
     * A RedisDataException thrown while generating a response is caught and
     * stored in the returned list at the position of the failing reply.
     *
     * Prefer sync() when the collected list is not needed, since this variant
     * builds an additional list holding every result.
     *
     * Returns: the generated responses in the order the replies were read, or
     *          an empty list when no replies are pending.
     */
    List!(Object) syncAndReturnAll() {
        if (getPipelinedResponseLength() > 0) {
            List!(Object) unformatted = client.getMany(getPipelinedResponseLength());
            List!(Object) formatted = new ArrayList!(Object)();
            foreach(Object o ; unformatted) {
                try {
                    formatted.add(generateResponse(o));
                } catch (RedisDataException e) {
                    formatted.add(e);
                }
            }
            return formatted;
        } else {
            return Collections.emptyList!Object();
        }
    }

    /**
     * Sends DISCARD through the client and leaves the MULTI state.
     *
     * Returns: the pending response for the DISCARD reply.
     *
     * Throws: RedisDataException if no MULTI block is open.
     */
    Response!(string) discard() {
        if (currentMulti is null) throw new RedisDataException("DISCARD without MULTI");
        client.discard();
        currentMulti = null;
        return getResponse(BuilderFactory.STRING);
    }

    /**
     * Sends EXEC through the client and leaves the MULTI state.
     *
     * The returned response is built by the MULTI builder and is set as the
     * dependency of every response registered with that builder.
     *
     * Returns: the pending response for the EXEC reply.
     *
     * Throws: RedisDataException if no MULTI block is open.
     */
    Response!(List!(Object)) exec() {
        if (currentMulti is null) throw new RedisDataException("EXEC without MULTI");

        client.exec();
        Response!(List!(Object)) response = super.getResponse(currentMulti);
        currentMulti.setResponseDependency(response);
        currentMulti = null;
        return response;
    }

    /**
     * Sends MULTI through the client and enters the MULTI state.
     *
     * Returns: the pending response for the MULTI reply.
     *
     * Throws: RedisDataException if a MULTI block is already open.
     */
    Response!(string) multi() {
        if (currentMulti !is null) throw new RedisDataException("MULTI calls can not be nested");

        client.multi();
        // The response is requested before the builder is installed, so the
        // reply to MULTI itself ("OK" expected) is read as a plain string.
        Response!(string) response = getResponse(BuilderFactory.STRING);
        currentMulti = new MultiResponseBuilder();
        return response;
    }

    /// Closes the pipeline by delegating to clear().
    override
    void close() {
        clear();
    }

}