1 /*
2  * Hunt - A redis client library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11  
12 module hunt.redis.BinaryRedisPubSub;
13 
14 import hunt.redis.Client;
15 import hunt.redis.Exceptions;
16 import hunt.redis.Protocol;
17 
18 import hunt.util.ArrayHelper;
19 import hunt.collection.List;
20 import hunt.Exceptions;
21 
/**
 * Base class for binary-safe Redis publish/subscribe listeners.
 *
 * Subclasses override the `on*` hooks they are interested in and start a
 * subscription through `proceed` (channels) or `proceedWithPatterns`
 * (patterns). Those two methods store the `Client` that the remaining
 * subscribe/unsubscribe methods operate on.
 *
 * NOTE: the reply-processing loop in `process` is not implemented yet, so in
 * the code visible here none of the `on*` hooks is invoked and the
 * subscription counter is never updated.
 */
abstract class BinaryRedisPubSub {
    /// Subscription count; only the (currently disabled) processing loop would update it.
    private int subscribedChannels = 0;

    /// Client used for all commands; stays null until `proceed`/`proceedWithPatterns` runs.
    private Client client;

    /**
     * Hook for a message delivered on a subscribed channel. Default: no-op.
     *
     * Params:
     *   channel = raw channel name
     *   message = raw message payload
     */
    void onMessage(const(ubyte)[] channel, const(ubyte)[] message) {
    }

    /**
     * Hook for a message delivered through a pattern subscription. Default: no-op.
     *
     * Params:
     *   pattern = raw pattern
     *   channel = raw channel name
     *   message = raw message payload
     */
    void onPMessage(const(ubyte)[] pattern, const(ubyte)[] channel, const(ubyte)[] message) {
    }

    /**
     * Hook for a channel subscription confirmation. Default: no-op.
     *
     * Params:
     *   channel            = raw channel name
     *   subscribedChannels = subscription count reported with the confirmation
     */
    void onSubscribe(const(ubyte)[] channel, int subscribedChannels) {
    }

    /**
     * Hook for a channel unsubscription confirmation. Default: no-op.
     *
     * Params:
     *   channel            = raw channel name
     *   subscribedChannels = subscription count reported with the confirmation
     */
    void onUnsubscribe(const(ubyte)[] channel, int subscribedChannels) {
    }

    /**
     * Hook for a pattern unsubscription confirmation. Default: no-op.
     *
     * Params:
     *   pattern            = raw pattern
     *   subscribedChannels = subscription count reported with the confirmation
     */
    void onPUnsubscribe(const(ubyte)[] pattern, int subscribedChannels) {
    }

    /**
     * Hook for a pattern subscription confirmation. Default: no-op.
     *
     * Params:
     *   pattern            = raw pattern
     *   subscribedChannels = subscription count reported with the confirmation
     */
    void onPSubscribe(const(ubyte)[] pattern, int subscribedChannels) {
    }

    /**
     * Sends an argument-less unsubscribe command on the stored client and flushes it.
     *
     * Throws: RedisException if no client has been attached yet.
     */
    void unsubscribe() {
        Client conn = attachedClient();
        conn.unsubscribe();
        conn.flush();
    }

    /**
     * Sends an unsubscribe command for the given channels and flushes it.
     *
     * Params:
     *   channels = raw channel names
     *
     * Throws: RedisException if no client has been attached yet.
     */
    void unsubscribe(const(ubyte)[][] channels...) {
        Client conn = attachedClient();
        conn.unsubscribe(channels);
        conn.flush();
    }

    /**
     * Sends a subscribe command for the given channels and flushes it.
     *
     * Params:
     *   channels = raw channel names
     *
     * Throws: RedisException if no client has been attached yet.
     */
    void subscribe(const(ubyte)[][] channels...) {
        Client conn = attachedClient();
        conn.subscribe(channels);
        conn.flush();
    }

    /**
     * Sends a pattern-subscribe command for the given patterns and flushes it.
     *
     * Params:
     *   patterns = raw patterns
     *
     * Throws: RedisException if no client has been attached yet.
     */
    void psubscribe(const(ubyte)[][] patterns...) {
        Client conn = attachedClient();
        conn.psubscribe(patterns);
        conn.flush();
    }

    /**
     * Sends an argument-less pattern-unsubscribe command and flushes it.
     *
     * Throws: RedisException if no client has been attached yet.
     */
    void punsubscribe() {
        Client conn = attachedClient();
        conn.punsubscribe();
        conn.flush();
    }

    /**
     * Sends a pattern-unsubscribe command for the given patterns and flushes it.
     *
     * Params:
     *   patterns = raw patterns
     *
     * Throws: RedisException if no client has been attached yet.
     */
    void punsubscribe(const(ubyte)[][] patterns...) {
        Client conn = attachedClient();
        conn.punsubscribe(patterns);
        conn.flush();
    }

    /**
     * Returns: true while the recorded subscription count is greater than zero.
     */
    bool isSubscribed() {
        return subscribedChannels > 0;
    }

    /**
     * Stores `client`, issues a pattern subscription on it, flushes, and then
     * hands over to the processing routine.
     *
     * Params:
     *   client   = connection to subscribe on; kept for later (un)subscribe calls
     *   patterns = raw patterns to subscribe to
     */
    void proceedWithPatterns(Client client, const(ubyte)[][] patterns...) {
        this.client = client;
        client.psubscribe(patterns);
        client.flush();
        process(client);
    }

    /**
     * Stores `client`, issues a channel subscription on it, flushes, and then
     * hands over to the processing routine.
     *
     * Params:
     *   client   = connection to subscribe on; kept for later (un)subscribe calls
     *   channels = raw channel names to subscribe to
     */
    void proceed(Client client, const(ubyte)[][] channels...) {
        this.client = client;
        client.subscribe(channels);
        client.flush();
        process(client);
    }

    /**
     * Returns the stored client, failing loudly when none has been attached.
     *
     * Without this guard the (un)subscribe methods would dereference a null
     * class reference when called before `proceed`/`proceedWithPatterns`.
     *
     * Throws: RedisException if `client` is still null.
     */
    private Client attachedClient() {
        if (client is null) {
            throw new RedisException("BinaryRedisPubSub is not subscribed to a Redis connection.");
        }
        return client;
    }

    /**
     * Placeholder for the reply-processing loop.
     *
     * Currently only calls `implementationMissing(false)`.
     * NOTE(review): presumably this reports the gap without throwing - confirm
     * against hunt.Exceptions.
     *
     * The commented-out block below is the untranslated Java-style reference
     * (note `instanceof`, `Arrays.equals`, `(Long)` casts). It is kept as the
     * porting guide: read a multi-bulk reply, dispatch on its first element to
     * the matching `on*` hook, update `subscribedChannels` on (un)subscribe
     * confirmations, and repeat while `isSubscribed()`.
     */
    private void process(Client client) {
        implementationMissing(false);
        // do {
        //   List!(Object) reply = client.getUnflushedObjectMultiBulkReply();
        //   Object firstObj = reply.get(0);
        //   if (!(firstObj instanceof const(ubyte)[])) {
        //     throw new RedisException("Unknown message type: " ~ firstObj);
        //   }
        //   const(ubyte)[] resp = (const(ubyte)[]) firstObj;
        //   if (Arrays.equals(SUBSCRIBE.raw, resp)) {
        //     subscribedChannels = ((Long) reply.get(2)).intValue();
        //     const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(1);
        //     onSubscribe(bchannel, subscribedChannels);
        //   } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
        //     subscribedChannels = ((Long) reply.get(2)).intValue();
        //     const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(1);
        //     onUnsubscribe(bchannel, subscribedChannels);
        //   } else if (Arrays.equals(MESSAGE.raw, resp)) {
        //     const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(1);
        //     const(ubyte)[] bmesg = (const(ubyte)[]) reply.get(2);
        //     onMessage(bchannel, bmesg);
        //   } else if (Arrays.equals(PMESSAGE.raw, resp)) {
        //     const(ubyte)[] bpattern = (const(ubyte)[]) reply.get(1);
        //     const(ubyte)[] bchannel = (const(ubyte)[]) reply.get(2);
        //     const(ubyte)[] bmesg = (const(ubyte)[]) reply.get(3);
        //     onPMessage(bpattern, bchannel, bmesg);
        //   } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
        //     subscribedChannels = ((Long) reply.get(2)).intValue();
        //     const(ubyte)[] bpattern = (const(ubyte)[]) reply.get(1);
        //     onPSubscribe(bpattern, subscribedChannels);
        //   } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
        //     subscribedChannels = ((Long) reply.get(2)).intValue();
        //     const(ubyte)[] bpattern = (const(ubyte)[]) reply.get(1);
        //     onPUnsubscribe(bpattern, subscribedChannels);
        //   } else {
        //     throw new RedisException("Unknown message type: " ~ firstObj);
        //   }
        // } while (isSubscribed());
    }

    /**
     * Returns: the recorded subscription count.
     */
    int getSubscribedChannels() {
        return subscribedChannels;
    }
}