1 module drmi.mqtt.accessor;
2 
3 import mqttd;
4 import vibe.data.json;
5 import vibe.core.log;
6 import vibe.core.core;
7 
8 import drmi;
9 
10 import drmi.mqtt.types;
11 import drmi.mqtt.subscriber;
12 
13 import std.datetime;
14 import std.string;
15 
/// Topic suffix appended to an endpoint name to form the bus on which
/// that endpoint receives RMI call requests (`name ~ REQ_ROOM`).
enum REQ_ROOM = "/request";
/// Topic infix placed between the serving endpoint name and the caller
/// name to form the bus on which responses are published
/// (`name ~ RES_ROOM ~ caller`).
enum RES_ROOM = "/response/";
18 
/// Publishes values to a bus chosen by the implementation.
interface Broadcaster
{
    /// Publish an already-wrapped `BusData` message with QoS `q`.
    /// How `QoSLevel.Reserved` is interpreted is up to the implementation.
    void pub(BusData, QoSLevel q=QoSLevel.Reserved);

    /// Convenience overload: wraps an arbitrary value into `BusData`
    /// (with the optional `type` tag) and forwards to the virtual `pub`.
    ///
    /// The constraint excludes `BusData` itself so that already-wrapped
    /// messages always go through the virtual overload and are never
    /// wrapped a second time.
    final void pub(T)(T val, string type="", QoSLevel q=QoSLevel.Reserved) if (!is(T == BusData))
    { this.pub(BusData(val, type), q); }
}
26 
/// Builds the client name for type `T`: just `T.stringof` when `un` is
/// empty, otherwise `T.stringof` and `un` joined with an underscore.
string cliName(T)(string un)
{
    enum typeName = T.stringof;

    // No unique suffix requested: the type name alone identifies the client.
    if (un.length == 0)
        return typeName;

    return format("%s_%s", typeName, un);
}
29 
/**
 * RMI endpoint over MQTT for a service object of type `T`.
 *
 * An accessor owns two MQTT connections (one publishing client and one
 * `Subscriber`), serves incoming `RMICall` requests on the bus
 * `name ~ REQ_ROOM` through an `RMISkeleton!T`, and can create client
 * stubs (`getClient`) that call other endpoints and wait for their
 * responses on `target ~ RES_ROOM ~ name`.
 */
class Accessor(T)
{
protected:
    Subscriber sub;
    MqttClient pub;

    /// Default QoS used when a publish is requested with `QoSLevel.Reserved`.
    QoSLevel qos;

    /// Maximum time a client call waits for its response before timing out.
    Duration waitTime;
    /// Upper bound on simultaneously pending calls per client communicator.
    size_t maxWaitResponses;

    /// Name of this endpoint; prefix of its request/response buses.
    string name;

    /// Serializes `val` to pretty JSON and publishes it to `topic`.
    /// `QoSLevel.Reserved` is treated as "use the accessor default".
    void publish(V)(string topic, V val, QoSLevel q=QoSLevel.Reserved)
    {
        if (q == QoSLevel.Reserved) q = qos;
        pub.publish(topic, val.serializeToPrettyJson, q);
    }

    /// `Broadcaster` bound to a fixed bus; publishes through the accessor.
    class BCaster : Broadcaster
    {
        string bus;
        this(string bus) { this.bus = bus; }
        override void pub(BusData d, QoSLevel q=QoSLevel.Reserved)
        { publish(bus, d, q); }
    }

    RMISkeleton!T skeleton;

    /// Handler for the request bus: runs the call on the skeleton and
    /// publishes the result to the caller's response bus.
    void receive(string t, BusData msg)
    {
        if (msg.type == RMICall.stringof)
        {
            import std.range : repeat;
            import std.algorithm : joiner;
            auto call = msg.as!RMICall;
            logDebug("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
            auto res = skeleton.process(call);
            publish(name ~ RES_ROOM ~ call.caller, BusData(res), qos);
            // pad with spaces so the timestamp lines up with the "***" line
            logDebug("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
        }
        else logWarn("unexpected request message: %s", msg);
    }

    /// Client-side communicator used by `RMIStub`: publishes a call to the
    /// target's request bus and polls until the matching response arrives
    /// or `waitTime` elapses.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.string;

        string target, reqbus;
        /// Received responses keyed by call hash, consumed by `process`.
        RMIResponse[string] res;
        /// Set of call hashes still awaiting a response (value == key).
        string[string] waitList;

        /// Polling interval used while waiting.
        Duration sleepStep;

        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
            sleepStep = 1.msecs;
        }

        /// Key identifying a call in `waitList` / `res`.
        string callHash(RMICall call)
        { return format("%s:%s", call.func, call.ts); }

        /// Handler for the response bus: stores a response for a pending
        /// call and removes it from the wait list.
        void receive(string t, BusData msg)
        {
            if (msg.type == RMIResponse.stringof)
            {
                auto r = msg.as!RMIResponse;
                if (r.call.caller != caller)
                {
                    logError("unexpected response for %s in bus for %s", r.call.caller, caller);
                    return;
                }
                logDebug("[%s]  in %s %s", cts, r.call.ts, r.call.func);
                auto ch = callHash(r.call);
                if (ch in waitList)
                {
                    enforce(ch !in res, format("internal error: unexpect having result in res for %s", ch));
                    res[ch] = r;
                    waitList.remove(ch);
                }
                // not waited for (e.g. the call already timed out)
                else logTrace("unexpected %s for calls: %s", r, waitList.keys);
            }
            // this handler serves the response bus, so name it accordingly
            else logWarn("unexpected response message: %s", msg);
        }

        override string caller() const @property { return name; }

        /// Sends `call` and blocks (by sleeping in `sleepStep` increments)
        /// until its response is received.
        /// Throws: `RMITimeoutException` if no response arrives within `waitTime`.
        override RMIResponse process(RMICall call)
        {
            // throttle: do not exceed the allowed number of pending calls
            while (waitList.length >= maxWaitResponses) sleep(sleepStep * 10);
            auto ch = callHash(call);
            waitList[ch] = ch;
            logDebug("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, BusData(call));
            auto tm = StopWatch(AutoStart.yes);
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    logDebug("[%s] ### %s %s", cts, call.ts, call.func);
                    waitList.remove(ch);
                    throw new RMITimeoutException(call);
                }
                else sleep(sleepStep);
            }
            enforce(ch in res, format("internal error: no result then not wait and not except for %s", ch));
            auto r = res[ch];
            res.remove(ch);
            return r;
        }
    }

public:

    /// Creates the endpoint for service `serv`. The name is derived from
    /// `T` and the optional `uniqName` via `cliName`. Defaults: QoS2,
    /// 5 minute response timeout, at most 100 pending calls. The MQTT
    /// clients are created here; `connect` must be called to connect them.
    this(T serv, string uniqName="")
    {
        name = cliName!T(uniqName);
        qos = QoSLevel.QoS2;
        waitTime = 5.minutes;
        maxWaitResponses = 100;

        skeleton = new RMISkeleton!T(serv);

        Settings pset, sset;
        pset.clientId = name ~ ".pub";
        sset.clientId = name ~ ".sub";

        pub = new MqttClient(pset);
        sub = new Subscriber(sset, qos);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Returns a broadcaster that publishes to `bus` through this accessor.
    Broadcaster getBroadcaster(string bus) { return new BCaster(bus); }

    /// Subscribes `dlg` to raw payloads arriving on `bus`.
    void subscribe(string bus, void delegate(string, const(ubyte)[]) dlg)
    { sub.subscribe(bus, dlg); }

    /// Subscribes `dlg` to JSON messages on `bus`. Payloads that fail to
    /// parse are logged and dropped; `dlg` is not called for them.
    void subscribe(string bus, void delegate(string, Json) dlg)
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            auto j = Json.undefined;
            try j = (cast(string)data).parseJsonString;
            catch (JSONException e)
                logError("error while parsing json msg: %s", e.msg);
            // Check the type tag directly instead of comparing against
            // Json.undefined, so the result does not depend on how Json
            // equality treats undefined values.
            if (j.type != Json.Type.undefined) dlg(t, j);
            else logWarn("parsed json is undefined, don't call dlg");
        });
    }

    /// Subscribes `dlg` to `BusData` messages on `bus`. JSON that cannot
    /// be deserialized to `BusData` is logged and dropped.
    void subscribe(string bus, void delegate(string, BusData bm) dlg)
    {
        subscribe(bus, (string t, Json j)
        {
            BusData bm;
            // track success separately so that only deserialization
            // failures are caught, not exceptions thrown by dlg
            bool converted = false;
            try // catch exceptions only while deserialization
            {
                bm = j.deserializeJson!BusData;
                converted = true;
            }
            catch (Exception e)
                // vibe.data.json.deserializeJson has no throwable exception
                // list in documentation
                logError("error while convert json to BusMessage: %s", e.msg);

            // if all is ok call delegate
            if (converted) dlg(t, bm);
        });
    }

    /// Creates a stub for remote interface `X` served by the endpoint
    /// named `cliName!X(uniqName)`, and subscribes to the bus on which
    /// that endpoint publishes responses addressed to this accessor.
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = cliName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connects the publishing client, then the subscriber.
    void connect()
    {
        pub.connect();
        sub.connect();
    }
}
221 
/// Current time as returned by `Clock.currStdTime`; used as the
/// timestamp prefix in this module's debug log lines.
private long cts()()
{
    import std.datetime : Clock;

    immutable long now = Clock.currStdTime;
    return now;
}