1 ///
2 module drmi.ps.accessor;
3 
4 import drmi.exceptions;
5 import drmi.types;
6 import drmi.base;
7 
8 import drmi.ps.helpers;
9 import drmi.ps.iface;
10 
11 import std.datetime;
12 import std.string;
13 import std.exception : enforce;
14 
15 public import vibe.data.json : Json;
16 import vibe.data.json;
17 import vibe.core.core;
18 import vibe.core.log;
19 
/// Topic suffix appended to a service name to form the topic on which that
/// service receives RMI requests (`name ~ REQ_ROOM`).
enum REQ_ROOM = "/request";
/// Topic part placed between a service name and a caller name to form the
/// topic on which responses for that caller are published
/// (`name ~ RES_ROOM ~ caller`).
enum RES_ROOM = "/response/";
22 
/// Publishing endpoint: everything handed to one of the `publish` overloads
/// ends up as a string passed to the single abstract overload.
interface Broadcaster
{
    /// Sends an already encoded payload. `Rel.undefined` is forwarded as is;
    /// what it means is decided by the implementation.
    void publish(string, Rel lvl=Rel.undefined);

    /// Renders the Json value with `toPrettyString` and sends the text.
    final void publish(Json j, Rel lvl=Rel.undefined)
    {
        string text = j.toPrettyString;
        publish(text, lvl);
    }

    /// Serializes any non-Json value with `serializeToPrettyJson` and sends
    /// the resulting text.
    void publish(T)(T val, Rel lvl=Rel.undefined)
        if (!is(T == Json))
    {
        string text = val.serializeToPrettyJson;
        publish(text, lvl);
    }
}
37 
/// RMI endpoint on top of a publish/subscribe `Transport`.
///
/// An `Accessor!T` serves the object `serv` (of type `T`): it listens on
/// `name ~ REQ_ROOM`, runs every incoming `RMICall` through an
/// `RMISkeleton!T` and publishes the result on
/// `name ~ RES_ROOM ~ call.caller`. It also creates client stubs for other
/// services (`getClient`) and offers plain publish/subscribe helpers.
class Accessor(T)
{
protected:
    /// Transport used for every publish/subscribe operation.
    Transport ll;

    /// Level used when a caller passes `Rel.undefined` (set to `Rel.level2`
    /// in the constructor).
    Rel defaultRel;

    /// Maximum time a client call waits for its response.
    Duration waitTime;
    /// Sleep interval used while polling for a response.
    Duration waitSleepStep;
    /// Upper bound of simultaneously pending calls per client stub.
    size_t maxWaitResponses;

    /// Name of this endpoint, produced by `rmiPSClientName!T`.
    string name;

    /// Serializes `val` to Json and publishes it on `topic`.
    void publish(V)(string topic, V val, Rel lvl=Rel.undefined)
        if (!is(V == Json))
    { publish(topic, val.serializeToJson, lvl); }

    /// Publishes the pretty-printed text of `val` on `topic`.
    void publish(string topic, Json val, Rel lvl=Rel.undefined)
    { publish(topic, val.toPrettyString, lvl); }

    /// Publishes raw text on `topic`; `Rel.undefined` is replaced by
    /// `defaultRel` before the transport is called.
    void publish(string topic, string data, Rel lvl=Rel.undefined)
    {
        if (lvl == Rel.undefined) lvl = defaultRel;
        ll.publish(topic, cast(const(ubyte)[])data, lvl);
    }

    /// `Broadcaster` bound to one fixed topic of the enclosing accessor.
    class BCaster : Broadcaster
    {
        /// Topic every message of this broadcaster is published on.
        string topic;
        ///
        this(string t) { topic = t; }
        /// Forwards to the accessor's string `publish` with the stored topic.
        override void publish(string s, Rel lvl=Rel.undefined)
        { this.outer.publish(topic, s, lvl); }
    }

    /// Dispatches incoming calls to the served object.
    RMISkeleton!T skeleton;

    /// Handler for the request topic: decodes an `RMICall`, processes it via
    /// the skeleton and publishes the result for the caller.
    /// Messages that fail to deserialize are logged and dropped.
    void receive(string t, Json msg)
    {
        RMICall call = void;
        try call = msg.deserializeJson!RMICall;
        // NOTE(review): Throwable also covers Error; kept as in the original
        // so a malformed message can never take the handler down.
        catch (Throwable e)
        {
            logError("error in parse request: %s", e.msg);
            return;
        }
        import std.range : repeat;
        import std.algorithm : joiner;

        logDebug("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
        auto res = skeleton.process(call);
        publish(name ~ RES_ROOM ~ call.caller, res, defaultRel);
        // spaces instead of the caller name keep both log lines aligned
        logDebug("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
    }

    /// Client side communicator: publishes calls on the request topic of
    /// `target` and waits (by polling) for the matching response.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        /// Name of the remote service and its request topic.
        string target, reqbus;
        /// Received responses that were not yet picked up by `process`,
        /// keyed by `calcHash`.
        RMIResponse[string] responses;
        /// Calls that are still waiting for a response, keyed by `calcHash`.
        string[string] waitList;

        ///
        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
        }

        /// Key identifying a call: `"<func>:<ts>"`.
        string calcHash(RMICall call)
        { return format("%s:%s", call.func, call.ts); }

        /// Handler for the response topic: stores a response for a pending
        /// call and removes that call from the wait list. Undecodable
        /// messages, responses addressed to another caller and responses
        /// for calls that are not pending are logged and dropped.
        void receive(string t, Json msg)
        {
            RMIResponse r = void;
            try r = msg.deserializeJson!RMIResponse;
            // NOTE(review): Throwable also covers Error; kept as in the original.
            catch (Throwable e)
            {
                logError("unexpected response message: %s (%s)", msg, e.msg);
                return;
            }
            if (r.call.caller != caller)
            {
                logError("unexpected response for %s in bus for %s", r.call.caller, caller);
                return;
            }
            logDebug("[%s]  in %s %s", cts, r.call.ts, r.call.func);
            auto ch = calcHash(r.call);
            if (ch in waitList)
            {
                enforce(ch !in responses, format("internal error: unexpect having result in res for %s", ch));
                // store first, then release the waiter polling in `process`
                responses[ch] = r;
                waitList.remove(ch);
            }
            else logError("unexpected %s for calls: %s", r, waitList.keys);
        }

        /// Caller name sent with every call: the accessor's own name.
        override string caller() const @property { return name; }

        /// Publishes `call` and blocks (sleeping in `waitSleepStep` steps)
        /// until its response arrives.
        ///
        /// Throws: `RMITimeoutException` if no response was received within
        /// `waitTime`.
        override RMIResponse process(RMICall call)
        {
            // back-pressure: do not exceed the allowed number of pending calls
            while (waitList.length >= maxWaitResponses)
                sleep(waitSleepStep * 10);
            auto ch = calcHash(call);
            waitList[ch] = ch;
            logDebug("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, call, defaultRel);
            auto tm = StopWatch(AutoStart.yes);
            // `receive` removes the key from waitList once the response is stored
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    logDebug("[%s] ### %s %s", cts, call.ts, call.func);
                    waitList.remove(ch);
                    throw new RMITimeoutException(call);
                }
                else sleep(waitSleepStep);
            }
            enforce(ch in responses, format("internal error: no result then not wait and not except for %s", ch));
            auto r = responses[ch];
            responses.remove(ch);
            return r;
        }
    }

public:

    /// Creates the endpoint, initializes the transport with the endpoint
    /// name and subscribes to the request topic.
    ///
    /// Params:
    ///   t = transport, must not be null
    ///   serv = object whose methods are served
    ///   uniqName = passed to `rmiPSClientName!T` to build the endpoint name
    ///   waitTime = response timeout for client calls
    ///   maxWaitResponses = maximum number of pending calls per client stub
    ///
    /// Throws: `Exception` (via `enforce`) if `t` is null.
    this(Transport t, T serv, string uniqName="",
            Duration waitTime=30.seconds, size_t maxWaitResponses=10)
    {
        ll = enforce(t, "transport is null");
        name = rmiPSClientName!T(uniqName);
        ll.init(name);

        defaultRel = Rel.level2;
        this.waitTime = waitTime;
        waitSleepStep = 1.msecs;
        this.maxWaitResponses = maxWaitResponses;

        skeleton = new RMISkeleton!T(serv);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Returns a new `Broadcaster` publishing on `topic`.
    Broadcaster getBroadcaster(string topic) { return new BCaster(topic); }

    /// Subscribes `dlg` to raw messages of `topic` using `defaultRel`.
    void subscribe(string topic, void delegate(string, const(ubyte)[]) dlg)
    { ll.subscribe(topic, dlg, defaultRel); }

    /// Subscribes `dlg` to messages of `bus` parsed as Json. A message that
    /// fails to parse is logged and `dlg` is not called for it.
    void subscribe(string bus, void delegate(string, Json) dlg)
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            auto j = Json.undefined;
            try j = (cast(string)data).parseJsonString;
            catch (JSONException e)
                logError("error while parsing json msg: %s", e.msg);
            if (j != Json.undefined) dlg(t, j);
            else logWarn("parsed json is undefined, don't call dlg");
        });
    }

    /// Subscribes `dlg` to messages of `bus` deserialized into `V`. A
    /// message that cannot be converted is logged and `dlg` is not called
    /// for it; exceptions thrown by `dlg` itself are not caught here.
    void subscribe(V)(string bus, void delegate(string, V) dlg)
        if (!is(V == Json) && !is(V == const(ubyte)[]))
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            V bm = void;
            // for accurate exception handling
            bool converted = false;
            try // catch exceptions only while deserialization
            {
                bm = (cast(string)data)
                        .parseJsonString
                        .deserializeJson!V;
                converted = true;
            }
            catch (Exception e)
                // vibe.data.json.deserializeJson has no throwable exception
                // list in documentation
                logError("error while parse %s: %s", V.stringof, e.msg);

            // if all is ok call delegate
            if (converted) dlg(t, bm);
        });
    }

    /// Creates a stub for the remote service `X` named by
    /// `rmiPSClientName!X(uniqName)` and subscribes to the topic on which
    /// that service publishes responses for this endpoint.
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = rmiPSClientName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connects the underlying transport.
    void connect() { ll.connect(); }
}
237 
/// Current wall-clock time as returned by `Clock.currStdTime`; used as the
/// timestamp prefix of the debug log lines in this module.
private long cts()()
{
    import std.datetime : Clock;
    immutable long now = Clock.currStdTime;
    return now;
}