1 /* 2 * Hunt - A redis client library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.redis.RedisPubSub; 13 14 import hunt.redis.Client; 15 import hunt.redis.Protocol; 16 import hunt.redis.Exceptions; 17 import hunt.redis.util.SafeEncoder; 18 19 import hunt.Exceptions; 20 import hunt.util.ArrayHelper; 21 import hunt.collection.List; 22 23 abstract class RedisPubSub { 24 25 private enum string JEDIS_SUBSCRIPTION_MESSAGE = "RedisPubSub is not subscribed to a Redis instance."; 26 private int subscribedChannels = 0; 27 private Client client; 28 29 void onMessage(string channel, string message) { 30 } 31 32 void onPMessage(string pattern, string channel, string message) { 33 } 34 35 void onSubscribe(string channel, int subscribedChannels) { 36 } 37 38 void onUnsubscribe(string channel, int subscribedChannels) { 39 } 40 41 void onPUnsubscribe(string pattern, int subscribedChannels) { 42 } 43 44 void onPSubscribe(string pattern, int subscribedChannels) { 45 } 46 47 void onPong(string pattern) { 48 49 } 50 51 void unsubscribe() { 52 if (client is null) { 53 throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE); 54 } 55 client.unsubscribe(); 56 client.flush(); 57 } 58 59 void unsubscribe(string[] channels...) { 60 if (client is null) { 61 throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE); 62 } 63 client.unsubscribe(channels); 64 client.flush(); 65 } 66 67 void subscribe(string[] channels...) { 68 if (client is null) { 69 throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE); 70 } 71 client.subscribe(channels); 72 client.flush(); 73 } 74 75 void psubscribe(string[] patterns...) { 76 if (client is null) { 77 throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE); 78 } 79 client.psubscribe(patterns); 80 client.flush(); 81 } 82 83 void punsubscribe() { 84 if (client is null) { 85 throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE); 86 } 87 client.punsubscribe(); 88 client.flush(); 89 } 90 91 void punsubscribe(string[] patterns...) { 92 if (client is null) { 93 throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE); 94 } 95 client.punsubscribe(patterns); 96 client.flush(); 97 } 98 99 void ping() { 100 if (client is null) { 101 throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE); 102 } 103 client.ping(); 104 client.flush(); 105 } 106 107 bool isSubscribed() { 108 return subscribedChannels > 0; 109 } 110 111 void proceedWithPatterns(Client client, string[] patterns...) { 112 this.client = client; 113 client.psubscribe(patterns); 114 client.flush(); 115 process(client); 116 } 117 118 void proceed(Client client, string[] channels...) { 119 this.client = client; 120 client.subscribe(channels); 121 client.flush(); 122 process(client); 123 } 124 125 private void process(Client client) { 126 127 implementationMissing(false); 128 // do { 129 // List!(Object) reply = client.getUnflushedObjectMultiBulkReply(); 130 // Object firstObj = reply.get(0); 131 // if (!(firstObj instanceof byte[])) { 132 // throw new RedisException("Unknown message type: " ~ firstObj); 133 // } 134 // byte[] resp = (byte[]) firstObj; 135 // if (Arrays.equals(SUBSCRIBE.raw, resp)) { 136 // subscribedChannels = ((Long) reply.get(2)).intValue(); 137 // byte[] bchannel = (byte[]) reply.get(1); 138 // string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel); 139 // onSubscribe(strchannel, subscribedChannels); 140 // } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { 141 // subscribedChannels = ((Long) reply.get(2)).intValue(); 142 // byte[] bchannel = (byte[]) reply.get(1); 143 // string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel); 144 // onUnsubscribe(strchannel, subscribedChannels); 145 // } else if (Arrays.equals(MESSAGE.raw, resp)) { 146 // byte[] bchannel = (byte[]) reply.get(1); 147 // byte[] bmesg = (byte[]) reply.get(2); 148 // string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel); 149 // string strmesg = (bmesg is null) ? null : SafeEncoder.encode(bmesg); 150 // onMessage(strchannel, strmesg); 151 // } else if (Arrays.equals(PMESSAGE.raw, resp)) { 152 // byte[] bpattern = (byte[]) reply.get(1); 153 // byte[] bchannel = (byte[]) reply.get(2); 154 // byte[] bmesg = (byte[]) reply.get(3); 155 // string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern); 156 // string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel); 157 // string strmesg = (bmesg is null) ? null : SafeEncoder.encode(bmesg); 158 // onPMessage(strpattern, strchannel, strmesg); 159 // } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { 160 // subscribedChannels = ((Long) reply.get(2)).intValue(); 161 // byte[] bpattern = (byte[]) reply.get(1); 162 // string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern); 163 // onPSubscribe(strpattern, subscribedChannels); 164 // } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { 165 // subscribedChannels = ((Long) reply.get(2)).intValue(); 166 // byte[] bpattern = (byte[]) reply.get(1); 167 // string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern); 168 // onPUnsubscribe(strpattern, subscribedChannels); 169 // } else if (Arrays.equals(PONG.raw, resp)) { 170 // byte[] bpattern = (byte[]) reply.get(1); 171 // string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern); 172 // onPong(strpattern); 173 // } else { 174 // throw new RedisException("Unknown message type: " ~ firstObj); 175 // } 176 // } while (isSubscribed()); 177 178 /* Invalidate instance since this thread is no longer listening */ 179 this.client = null; 180 } 181 182 int getSubscribedChannels() { 183 return subscribedChannels; 184 } 185 }