/*
 * Hunt - A Redis client library for the D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */
11  
12 module hunt.redis.RedisPubSub;
13 
14 import hunt.redis.Client;
15 import hunt.redis.Protocol;
16 import hunt.redis.Exceptions;
17 import hunt.redis.util.SafeEncoder;
18 
19 import hunt.Exceptions;
20 import hunt.util.ArrayHelper;
21 import hunt.collection.List;
22 
/**
 * Base class for Redis publish/subscribe listeners.
 *
 * Subclasses override whichever `on*` callbacks they need; each callback has
 * an empty default body. A listener is attached to a connection through
 * `proceed` or `proceedWithPatterns`. While a connection is attached, the
 * `subscribe`, `unsubscribe`, `psubscribe`, `punsubscribe` and `ping` methods
 * send the matching command over it and flush.
 *
 * NOTE: the reply-dispatch loop in `process` has not been ported yet (it is
 * kept there as commented-out reference code). Until it is, nothing in this
 * class invokes the `on*` callbacks or assigns `subscribedChannels`.
 */
abstract class RedisPubSub {

  private enum string JEDIS_SUBSCRIPTION_MESSAGE = "RedisPubSub is not subscribed to a Redis instance.";

  // Subscription count; only the commented-out reply loop in `process` would assign it.
  private int subscribedChannels = 0;

  // Connection this listener is attached to; null whenever it is detached.
  private Client client;

  /// Callback for a message on a subscribed channel. Default: no-op.
  void onMessage(string channel, string message) {
  }

  /// Callback for a message matched by a subscribed pattern. Default: no-op.
  void onPMessage(string pattern, string channel, string message) {
  }

  /// Callback for a channel subscription confirmation. Default: no-op.
  void onSubscribe(string channel, int subscribedChannels) {
  }

  /// Callback for a channel unsubscription confirmation. Default: no-op.
  void onUnsubscribe(string channel, int subscribedChannels) {
  }

  /// Callback for a pattern unsubscription confirmation. Default: no-op.
  void onPUnsubscribe(string pattern, int subscribedChannels) {
  }

  /// Callback for a pattern subscription confirmation. Default: no-op.
  void onPSubscribe(string pattern, int subscribedChannels) {
  }

  /**
   * Callback for a PONG reply. Default: no-op.
   *
   * The parameter is named `pattern`, but the commented-out reply loop in
   * `process` would pass the second element of the PONG reply here.
   */
  void onPong(string pattern) {
  }

  /**
   * Returns the attached connection or fails when the listener is detached.
   *
   * The field is read exactly once, so the null check and the caller's
   * subsequent use of the connection operate on the same reference.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  private Client requireClient() {
    Client attached = this.client;
    if (attached is null) {
      throw new RedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
    }
    return attached;
  }

  /**
   * Sends an argument-less unsubscribe over the attached connection and flushes.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  void unsubscribe() {
    Client attached = requireClient();
    attached.unsubscribe();
    attached.flush();
  }

  /**
   * Sends an unsubscribe for the given channels and flushes.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  void unsubscribe(string[] channels...) {
    Client attached = requireClient();
    attached.unsubscribe(channels);
    attached.flush();
  }

  /**
   * Sends a subscribe for the given channels and flushes.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  void subscribe(string[] channels...) {
    Client attached = requireClient();
    attached.subscribe(channels);
    attached.flush();
  }

  /**
   * Sends a psubscribe for the given patterns and flushes.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  void psubscribe(string[] patterns...) {
    Client attached = requireClient();
    attached.psubscribe(patterns);
    attached.flush();
  }

  /**
   * Sends an argument-less punsubscribe over the attached connection and flushes.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  void punsubscribe() {
    Client attached = requireClient();
    attached.punsubscribe();
    attached.flush();
  }

  /**
   * Sends a punsubscribe for the given patterns and flushes.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  void punsubscribe(string[] patterns...) {
    Client attached = requireClient();
    attached.punsubscribe(patterns);
    attached.flush();
  }

  /**
   * Sends a ping over the attached connection and flushes.
   *
   * Throws: RedisConnectionException if no connection is attached.
   */
  void ping() {
    Client attached = requireClient();
    attached.ping();
    attached.flush();
  }

  /// Returns: true while the recorded subscription count is greater than zero.
  bool isSubscribed() {
    return subscribedChannels > 0;
  }

  /**
   * Attaches this listener to `client`, sends a psubscribe for `patterns`,
   * flushes, and then hands the connection to `process`.
   */
  void proceedWithPatterns(Client client, string[] patterns...) {
    this.client = client;
    client.psubscribe(patterns);
    client.flush();
    process(client);
  }

  /**
   * Attaches this listener to `client`, sends a subscribe for `channels`,
   * flushes, and then hands the connection to `process`.
   */
  void proceed(Client client, string[] channels...) {
    this.client = client;
    client.subscribe(channels);
    client.flush();
    process(client);
  }

  /**
   * Placeholder for the reply-dispatch loop; detaches the connection on exit.
   *
   * Currently this only calls `implementationMissing(false)` and then clears
   * the `client` field, so `proceed` / `proceedWithPatterns` return right
   * after sending their command.
   * NOTE(review): `implementationMissing(false)` presumably reports the gap
   * without throwing - confirm against hunt.Exceptions.
   */
  private void process(Client client) {
    /* Invalidate instance since this thread is no longer listening.
       Registered as a scope guard so the listener is also detached if the
       reply loop, once ported, exits by throwing. */
    scope(exit) this.client = null;

    implementationMissing(false);

    // Reference implementation, still in Java syntax (instanceof, Java casts);
    // kept verbatim until it is ported to D.
    // do {
    //   List!(Object) reply = client.getUnflushedObjectMultiBulkReply();
    //   Object firstObj = reply.get(0);
    //   if (!(firstObj instanceof byte[])) {
    //     throw new RedisException("Unknown message type: " ~ firstObj);
    //   }
    //   byte[] resp = (byte[]) firstObj;
    //   if (Arrays.equals(SUBSCRIBE.raw, resp)) {
    //     subscribedChannels = ((Long) reply.get(2)).intValue();
    //     byte[] bchannel = (byte[]) reply.get(1);
    //     string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel);
    //     onSubscribe(strchannel, subscribedChannels);
    //   } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
    //     subscribedChannels = ((Long) reply.get(2)).intValue();
    //     byte[] bchannel = (byte[]) reply.get(1);
    //     string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel);
    //     onUnsubscribe(strchannel, subscribedChannels);
    //   } else if (Arrays.equals(MESSAGE.raw, resp)) {
    //     byte[] bchannel = (byte[]) reply.get(1);
    //     byte[] bmesg = (byte[]) reply.get(2);
    //     string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel);
    //     string strmesg = (bmesg is null) ? null : SafeEncoder.encode(bmesg);
    //     onMessage(strchannel, strmesg);
    //   } else if (Arrays.equals(PMESSAGE.raw, resp)) {
    //     byte[] bpattern = (byte[]) reply.get(1);
    //     byte[] bchannel = (byte[]) reply.get(2);
    //     byte[] bmesg = (byte[]) reply.get(3);
    //     string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern);
    //     string strchannel = (bchannel is null) ? null : SafeEncoder.encode(bchannel);
    //     string strmesg = (bmesg is null) ? null : SafeEncoder.encode(bmesg);
    //     onPMessage(strpattern, strchannel, strmesg);
    //   } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
    //     subscribedChannels = ((Long) reply.get(2)).intValue();
    //     byte[] bpattern = (byte[]) reply.get(1);
    //     string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern);
    //     onPSubscribe(strpattern, subscribedChannels);
    //   } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
    //     subscribedChannels = ((Long) reply.get(2)).intValue();
    //     byte[] bpattern = (byte[]) reply.get(1);
    //     string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern);
    //     onPUnsubscribe(strpattern, subscribedChannels);
    //   } else if (Arrays.equals(PONG.raw, resp)) {
    //     byte[] bpattern = (byte[]) reply.get(1);
    //     string strpattern = (bpattern is null) ? null : SafeEncoder.encode(bpattern);
    //     onPong(strpattern);
    //   } else {
    //     throw new RedisException("Unknown message type: " ~ firstObj);
    //   }
    // } while (isSubscribed());
  }

  /// Returns: the recorded subscription count.
  int getSubscribedChannels() {
    return subscribedChannels;
  }
}