1 ///
2 module drmi.ps.accessor;
3 
4 import drmi.core;
5 
6 import drmi.ps.helpers;
7 import drmi.ps.iface;
8 
import std.datetime.stopwatch;
import std.string;
import std.array : appender, Appender;
import std.exception : enforce;
import std.experimental.logger;
14 
/// Suffix appended to a service name to form the topic its requests are sent to.
enum REQ_ROOM = "/request";
/// Separator placed between a service name and a caller name to form the
/// topic on which responses for that caller are published.
enum RES_ROOM = "/response/";
17 
/// Publisher bound to a destination chosen by the implementation.
interface Broadcaster
{
    /// Publish an already serialized payload with the requested QoS.
    void publish(const(ubyte)[], QoS qos=QoS.undefined);

    /++ Serialize `val` with `sbinSerialize` into the implementation's
        buffer and publish the resulting bytes.

        The constraint leaves `ubyte[]` values to the raw overload above.

        Params:
            val = value to serialize and send
            qos = quality of service forwarded to the raw overload
     +/
    final void publish(T)(T val, QoS qos=QoS.undefined)
        if (!is(Unqual!T == ubyte[]))
    {
        // `Appender` is a struct: work through a pointer to the instance
        // returned by reference so the implementation's own buffer is
        // written to instead of a local copy of the struct.
        auto buf = &localAppender();
        buf.clear();
        (*buf).sbinSerialize(val);
        this.publish(buf.data, qos);
    }

    /// Buffer used by the templated `publish` for serialization.
    protected ref Appender!(ubyte[]) localAppender() @property;
}
36 
/++
    Publish/subscribe endpoint built on top of a `Transport`.

    On construction it wraps `serv` in an `RMISkeleton!T` and subscribes to
    its own request topic (`name ~ REQ_ROOM`); results are published to
    `name ~ RES_ROOM ~ caller`. `getClient` creates stubs that send calls to
    other services and wait for the matching responses.
 +/
class Accessor(T)
{
protected:
    /// Transport used for every publish/subscribe/connect operation.
    Transport tport;

    /// QoS substituted whenever `QoS.undefined` is passed.
    QoS defaultQoS;

    /// Longest time a client call waits for its response before timing out.
    Duration waitTime;
    /// Pause between two checks while a client call waits for its response.
    Duration waitSleepStep;
    /// Number of pending calls at which a client stub stops sending new ones.
    size_t maxWaitResponses;

    /// Name returned by `rmiPSClientName!T`; prefix of this accessor's topics.
    string name;

    /// Serialization buffer shared by `publish` and by broadcasters.
    auto sBuffer = appender!(ubyte[]);

    /// Optional user-supplied waiting routine, see `sleep`.
    void delegate(Duration d) sleepFunc;

    /// Wait for `t`: call `sleepFunc` when it is set, otherwise yield the
    /// current fiber or, when not running inside a fiber, sleep the thread.
    void sleep(Duration t)
    {
        import core.thread;
        if (sleepFunc !is null) sleepFunc(t);
        else
        {
            if (auto f = Fiber.getThis()) f.yield();
            else Thread.sleep(t);
        }
    }

    /// `Broadcaster` that publishes to one fixed topic through the outer accessor.
    class BCaster : Broadcaster
    {
        /// Topic every payload is published to.
        string topic;
        ///
        this(string t) { topic = t; }
        /// Forward raw bytes to the accessor's `publish` for `topic`.
        override void publish(const(ubyte)[] data, QoS qos=QoS.undefined)
        { this.outer.publish(topic, data, qos); }
        /// Expose the accessor's serialization buffer.
        protected override ref Appender!(ubyte[]) localAppender() @property
        { return this.outer.sBuffer; }
    }

    /// Dispatches incoming calls to the served object.
    RMISkeleton!T skeleton;

    /// Handle an incoming call: process it with the skeleton and publish the
    /// result to the response topic of the caller.
    void receive(string t, RMICall call)
    {
        import std.range : repeat;
        import std.algorithm : joiner;

        version (drmi_verbose) .infof("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
        auto res = skeleton.process(call);
        publish(name ~ RES_ROOM ~ call.caller, res, defaultQoS);
        version (drmi_verbose) .infof("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
    }

    /// Client-side communicator: sends calls to `target` and waits until the
    /// matching response is delivered through `receive`.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        /// Key identifying a call (size of a SHA-224 digest).
        alias rhash_t = ubyte[28];

        /// Name of the remote service and its request topic.
        string target, reqbus;
        /// Responses received but not yet picked up by `process`.
        RMIResponse[rhash_t] responses;
        /// Keys of calls that were sent and are still awaiting a response.
        rhash_t[rhash_t] waitList;

        ///
        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
        }

        /// Key of a call: SHA-224 of `call.func` summed element-wise with
        /// SHA-224 of `call.ts` (taken as `ulong[1]`).
        rhash_t calcHash(RMICall call) @nogc
        {
            import std.digest.sha;
            auto r1 = sha224Of(call.func);
            auto r2 = sha224Of(cast(ulong[1])[call.ts]);
            r1[] += r2[];
            return r1;
        }

        /// Store an incoming response for the pending call it belongs to.
        /// Responses for another caller or for a call that is not pending
        /// are logged and dropped.
        void receive(string t, RMIResponse r)
        {
            if (r.call.caller != caller)
            {
                .errorf("unexpected response for %s in bus for %s", r.call.caller, caller);
                return;
            }

            version (drmi_verbose) .infof("[%s]  in %s %s", cts, r.call.ts, r.call.func);
            auto ch = calcHash(r.call);

            if (ch !in waitList)
            {
                .errorf("unexpected %s for calls: %s", r, waitList.keys);
                return;
            }

            enforce(ch !in responses, format("internal error: unexpect having result in res for %s", ch));
            responses[ch] = r;
            waitList.remove(ch);
        }

        /// Name of the outer accessor, used as the caller identity.
        override string caller() const @property { return name; }

        /++ Send `call` to the target's request topic and block (through the
            accessor's `sleep`) until its response arrives.

            Before sending, waits while `maxWaitResponses` or more calls are
            already pending.

            Returns: the response stored by `receive` for this call.
            Throws: `RMITimeoutException` when no response arrives within
                    `waitTime`.
         +/
        override RMIResponse process(RMICall call)
        {
            while (waitList.length >= maxWaitResponses)
                this.outer.sleep(waitSleepStep * 10);
            auto ch = calcHash(call);
            waitList[ch] = ch;
            version (drmi_verbose) .infof("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, call, defaultQoS);
            auto tm = StopWatch(AutoStart.yes);
            // `receive` removes the key from waitList once the response is stored
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    version (drmi_verbose) .infof("[%s] ### %s %s", cts, call.ts, call.func);
                    waitList.remove(ch);
                    throw new RMITimeoutException(call);
                }
                else this.outer.sleep(waitSleepStep);
            }
            enforce(ch in responses, format("internal error: no result then not wait and not except for %s", ch));
            auto r = responses[ch];
            responses.remove(ch);
            return r;
        }
    }

public:

    /++ Create an accessor serving `serv` over transport `t`.

        Params:
            t = transport, must not be null
            serv = object that handles incoming calls
            uniqName = passed to `rmiPSClientName!T` to build the accessor name
            sf = optional routine used for waiting instead of the built-in
                 fiber yield / thread sleep
            waitTime = response timeout for client calls
            waitSleepStep = pause between response checks
            maxWaitResponses = limit of simultaneously pending client calls

        NOTE(review): this constructor and the three-argument one below can
        both accept a two-argument call through their defaults; confirm such
        calls resolve as intended.
     +/
    this(Transport t, T serv, string uniqName="", void delegate(Duration) sf=null,
            Duration waitTime=30.seconds, Duration waitSleepStep=1.msecs, size_t maxWaitResponses=10)
    {
        tport = enforce(t, "transport is null");
        name = rmiPSClientName!T(uniqName);
        tport.init(name);

        defaultQoS = QoS.l2;
        // keep the user-supplied sleep routine; without this `sf` had no effect
        sleepFunc = sf;
        this.waitTime = waitTime;
        this.waitSleepStep = waitSleepStep;
        this.maxWaitResponses = maxWaitResponses;

        skeleton = new RMISkeleton!T(serv);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Same as the main constructor with an empty `uniqName`.
    this(Transport t, T serv, void delegate(Duration) sf=null)
    { this(t, serv, "", sf); }

    /++ Serialize `val` with `sbinSerialize` into the shared buffer and
        publish the bytes to `topic`.

        The constraint inspects the value type `V` so that `ubyte[]` payloads
        are left to the raw overload below.
     +/
    void publish(V)(string topic, V val, QoS qos=QoS.undefined)
        if (!is(Unqual!V == ubyte[]))
    {
        sBuffer.clear();
        sBuffer.sbinSerialize(val);
        publish(topic, sBuffer.data, qos);
    }

    /// Publish raw bytes to `topic`; `QoS.undefined` is replaced by `defaultQoS`.
    void publish(string topic, const(ubyte)[] data, QoS qos=QoS.undefined)
    {
        if (qos == QoS.undefined) qos = defaultQoS;
        tport.publish(topic, data, qos);
    }

    /// Create a `Broadcaster` bound to `topic`.
    Broadcaster getBroadcaster(string topic) { return new BCaster(topic); }

    /// Subscribe `dlg` to raw messages on `topic`; `QoS.undefined` is
    /// replaced by `defaultQoS`.
    void subscribe(string topic, void delegate(string, const(ubyte)[]) dlg, QoS qos=QoS.undefined)
    { tport.subscribe(topic, dlg, qos==QoS.undefined ? defaultQoS : qos); }

    /// Subscribe `dlg` to messages on `bus` deserialized as `V`. Messages
    /// that fail to deserialize are logged and not passed to `dlg`.
    void subscribe(V)(string bus, void delegate(string, V) dlg, QoS qos=QoS.undefined)
        if (!is(V == const(ubyte)[]))
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            V bm = void;
            // flag keeps exception handling limited to the deserialization step
            bool converted = false;
            try // catch exceptions only while deserialization
            {
                bm = data.sbinDeserialize!V;
                converted = true;
            }
            catch (Exception e)
                // vibe.data.json.deserializeJson has no throwable exception
                // list in documentation
                .errorf("error while parse %s: %s", V.stringof, e.msg);

            // if all is ok call delegate
            if (converted) dlg(t, bm);
        }, qos);
    }

    /// Create a stub for remote service `X`: builds its communicator,
    /// subscribes it to the response topic addressed to this accessor and
    /// wraps it in an `RMIStub!X`.
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = rmiPSClientName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connect the underlying transport.
    void connect() { tport.connect(); }

    /// Connection state reported by the underlying transport.
    bool connected() { return tport.connected(); }
}
242 
/// Current system time as reported by `Clock.currStdTime`; used as the
/// timestamp in verbose log lines.
private long cts()()
{
    import std.datetime : Clock;
    const long stamp = Clock.currStdTime;
    return stamp;
}