/*
 * Hunt - A redis client library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.redis.BinaryRedisPubSub;

import hunt.redis.Client;
import hunt.redis.Exceptions;
import hunt.redis.Protocol;

import hunt.util.ArrayHelper;
import hunt.collection.List;
import hunt.Exceptions;

/**
 * Base class for binary (byte-array) Redis publish/subscribe listeners.
 *
 * Subclasses override the `on*` callbacks they care about; every callback
 * has an empty default body. A listener is attached to a connection by
 * `proceed` or `proceedWithPatterns`, which store the `Client` that the
 * subscribe/unsubscribe methods then send their commands through.
 *
 * NOTE: the reply loop (`process`) has not been ported yet, so in the
 * current state none of the callbacks is invoked by this class and
 * `subscribedChannels` is never updated from its initial value of 0.
 */
abstract class BinaryRedisPubSub {
    /// Subscription count; only the not-yet-ported reply loop would update it.
    private int subscribedChannels = 0;

    /// Connection stored by `proceed` / `proceedWithPatterns`; null until then.
    private Client client;

    /**
     * Callback for a message on a channel.
     *
     * Params:
     *   channel = the channel name
     *   message = the message payload
     */
    void onMessage(const(ubyte)[] channel, const(ubyte)[] message) {
    }

    /**
     * Callback for a message matched by a pattern.
     *
     * Params:
     *   pattern = the pattern that matched
     *   channel = the channel name
     *   message = the message payload
     */
    void onPMessage(const(ubyte)[] pattern, const(ubyte)[] channel, const(ubyte)[] message) {
    }

    /**
     * Callback for a subscribe confirmation.
     *
     * Params:
     *   channel            = the channel name
     *   subscribedChannels = the subscription count passed by the reply loop
     */
    void onSubscribe(const(ubyte)[] channel, int subscribedChannels) {
    }

    /**
     * Callback for an unsubscribe confirmation.
     *
     * Params:
     *   channel            = the channel name
     *   subscribedChannels = the subscription count passed by the reply loop
     */
    void onUnsubscribe(const(ubyte)[] channel, int subscribedChannels) {
    }

    /**
     * Callback for a pattern-unsubscribe confirmation.
     *
     * Params:
     *   pattern            = the pattern
     *   subscribedChannels = the subscription count passed by the reply loop
     */
    void onPUnsubscribe(const(ubyte)[] pattern, int subscribedChannels) {
    }

    /**
     * Callback for a pattern-subscribe confirmation.
     *
     * Params:
     *   pattern            = the pattern
     *   subscribedChannels = the subscription count passed by the reply loop
     */
    void onPSubscribe(const(ubyte)[] pattern, int subscribedChannels) {
    }

    /**
     * Calls the stored client's argument-less `unsubscribe` and flushes it.
     *
     * Throws: RedisException if no client has been stored yet.
     */
    void unsubscribe() {
        Client conn = attachedClient();
        conn.unsubscribe();
        conn.flush();
    }

    /**
     * Forwards the given channels to the stored client's `unsubscribe`
     * and flushes it.
     *
     * Params:
     *   channels = the channel names to forward
     *
     * Throws: RedisException if no client has been stored yet.
     */
    void unsubscribe(const(ubyte)[][] channels...) {
        Client conn = attachedClient();
        conn.unsubscribe(channels);
        conn.flush();
    }

    /**
     * Forwards the given channels to the stored client's `subscribe`
     * and flushes it.
     *
     * Params:
     *   channels = the channel names to forward
     *
     * Throws: RedisException if no client has been stored yet.
     */
    void subscribe(const(ubyte)[][] channels...) {
        Client conn = attachedClient();
        conn.subscribe(channels);
        conn.flush();
    }

    /**
     * Forwards the given patterns to the stored client's `psubscribe`
     * and flushes it.
     *
     * Params:
     *   patterns = the patterns to forward
     *
     * Throws: RedisException if no client has been stored yet.
     */
    void psubscribe(const(ubyte)[][] patterns...) {
        Client conn = attachedClient();
        conn.psubscribe(patterns);
        conn.flush();
    }

    /**
     * Calls the stored client's argument-less `punsubscribe` and flushes it.
     *
     * Throws: RedisException if no client has been stored yet.
     */
    void punsubscribe() {
        Client conn = attachedClient();
        conn.punsubscribe();
        conn.flush();
    }

    /**
     * Forwards the given patterns to the stored client's `punsubscribe`
     * and flushes it.
     *
     * Params:
     *   patterns = the patterns to forward
     *
     * Throws: RedisException if no client has been stored yet.
     */
    void punsubscribe(const(ubyte)[][] patterns...) {
        Client conn = attachedClient();
        conn.punsubscribe(patterns);
        conn.flush();
    }

    /**
     * Returns: true while the recorded subscription count is positive.
     */
    bool isSubscribed() {
        return subscribedChannels > 0;
    }

    /**
     * Stores `client`, sends the patterns through its `psubscribe`,
     * flushes it and hands it to the reply loop.
     *
     * Params:
     *   client   = the connection to use; must not be null
     *   patterns = the patterns to forward
     *
     * Throws: RedisException if `client` is null.
     */
    void proceedWithPatterns(Client client, const(ubyte)[][] patterns...) {
        attach(client);
        client.psubscribe(patterns);
        client.flush();
        process(client);
    }

    /**
     * Stores `client`, sends the channels through its `subscribe`,
     * flushes it and hands it to the reply loop.
     *
     * Params:
     *   client   = the connection to use; must not be null
     *   channels = the channel names to forward
     *
     * Throws: RedisException if `client` is null.
     */
    void proceed(Client client, const(ubyte)[][] channels...) {
        attach(client);
        client.subscribe(channels);
        client.flush();
        process(client);
    }

    /**
     * Reply loop for an active subscription -- NOT YET PORTED.
     *
     * The only live statement reports the missing implementation
     * (NOTE(review): the `false` argument presumably makes it report
     * without throwing -- confirm against hunt.Exceptions). The commented
     * code below is the unported reference logic: read one multi-bulk reply
     * at a time, dispatch on its first element to the matching `on*`
     * callback, and repeat while `isSubscribed()` holds.
     */
    private void process(Client client) {
        implementationMissing(false);
        // do {
        //     List!(Object) reply = client.getUnflushedObjectMultiBulkReply();
        //     Object firstObj = reply.get(0);
        //     if (!(firstObj instanceof const(ubyte)[])) {
        //         throw new RedisException("Unknown message type: " ~ firstObj);
        //     }
        //     const(ubyte)[] resp = (const(ubyte)[]) firstObj;
        //     if (Arrays.equals(SUBSCRIBE.raw, resp)) {
        //         subscribedChannels = ((Long) reply.get(2)).intValue();
        //         const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(1);
        //         onSubscribe(bchannel, subscribedChannels);
        //     } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
        //         subscribedChannels = ((Long) reply.get(2)).intValue();
        //         const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(1);
        //         onUnsubscribe(bchannel, subscribedChannels);
        //     } else if (Arrays.equals(MESSAGE.raw, resp)) {
        //         const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(1);
        //         const(ubyte)[] bmesg = (const(ubyte)[]) reply.get(2);
        //         onMessage(bchannel, bmesg);
        //     } else if (Arrays.equals(PMESSAGE.raw, resp)) {
        //         const(ubyte)[] bpattern = (const(ubyte)[]) reply.get(1);
        //         const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(2);
        //         const(ubyte)[] bmesg = (const(ubyte)[]) reply.get(3);
        //         onPMessage(bpattern, bchannel, bmesg);
        //     } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
        //         subscribedChannels = ((Long) reply.get(2)).intValue();
        //         const(ubyte)[] bpattern = (const(ubyte)[]) reply.get(1);
        //         onPSubscribe(bpattern, subscribedChannels);
        //     } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
        //         subscribedChannels = ((Long) reply.get(2)).intValue();
        //         const(ubyte)[] bpattern = (const(ubyte)[]) reply.get(1);
        //         onPUnsubscribe(bpattern, subscribedChannels);
        //     } else {
        //         throw new RedisException("Unknown message type: " ~ firstObj);
        //     }
        // } while (isSubscribed());
    }

    /**
     * Returns: the recorded subscription count.
     */
    int getSubscribedChannels() {
        return subscribedChannels;
    }

    /**
     * Returns the stored client, failing with a catchable exception instead
     * of dereferencing a null reference when none has been stored.
     */
    private Client attachedClient() {
        if (client is null) {
            throw new RedisException("BinaryRedisPubSub is not subscribed to a Redis instance.");
        }
        return client;
    }

    /**
     * Stores the connection for later subscribe/unsubscribe calls, rejecting
     * a null reference before anything is sent through it.
     */
    private void attach(Client client) {
        if (client is null) {
            throw new RedisException("BinaryRedisPubSub cannot be attached to a null client.");
        }
        this.client = client;
    }
}