1 ///
2 module drmi.ps.accessor;
3 
4 import drmi.exceptions;
5 import drmi.types;
6 import drmi.base;
7 
8 import drmi.ps.helpers;
9 import drmi.ps.iface;
10 
11 import std.datetime;
12 import std.string;
13 import std.exception : enforce;
14 import std.experimental.logger;
15 
/// Topic suffix for incoming calls: an accessor subscribes to
/// `name ~ REQ_ROOM`, and a client stub publishes calls to `target ~ REQ_ROOM`.
enum REQ_ROOM = "/request";
/// Topic infix for replies: a response is published to
/// `serverName ~ RES_ROOM ~ callerName`.
enum RES_ROOM = "/response/";
18 
/// Publishing endpoint: sends either raw bytes or a serialized value.
interface Broadcaster
{
    /// Publish an already encoded payload with the given level.
    void publish(const(ubyte)[], Rel lvl=Rel.undefined);

    /// Serialize `val` with `sbinSerialize` and publish the resulting bytes.
    ///
    /// NOTE(review): the constraint rejects only `T` whose unqualified type
    /// is `ubyte[]`; other byte-slice types (e.g. `immutable(ubyte)[]`) are
    /// not rejected by it -- confirm that this is intended.
    final void publish(T)(T val, Rel lvl=Rel.undefined)
        if (!is(Unqual!T == ubyte[]))
    {
        auto payload = val.sbinSerialize;
        this.publish(payload, lvl);
    }
}
30 
/// Publish/subscribe front end that exposes a local service object of type `T`
/// through an `RMISkeleton` and creates `RMIStub` clients for remote services,
/// all on top of a single `Transport`.
class Accessor(T)
{
protected:
    /// Underlying transport used for every publish/subscribe operation.
    Transport ll;

    /// Level used whenever a caller passes `Rel.undefined`.
    Rel defaultRel;

    /// Maximum time a client call waits for its response.
    Duration waitTime;
    /// Sleep interval between checks while waiting for a response.
    Duration waitSleepStep;
    /// Upper bound on simultaneously pending calls per client stub.
    size_t maxWaitResponses;

    /// Name of this accessor (result of `rmiPSClientName!T`).
    string name;

    /// Optional user supplied sleep routine; `null` selects the built-in one.
    void delegate(Duration d) sleepFunc;

    /// Wait for about `t`: uses `sleepFunc` when set, otherwise yields the
    /// current fiber if there is one, otherwise blocks the thread.
    void sleep(Duration t)
    {
        import core.thread;
        if (sleepFunc !is null) sleepFunc(t);
        else
        {
            // inside a fiber only yield (no timed sleep is performed)
            if (Fiber.getThis() !is null) Fiber.yield();
            else Thread.sleep(t);
        }
    }

    /// Serialize `val` with `sbinSerialize` and publish it to `topic`.
    ///
    /// The constraint tests the value type `V` (not the class parameter `T`)
    /// so that byte slices are routed to the raw overload below and are not
    /// serialized a second time.
    void publish(V)(string topic, V val, Rel lvl=Rel.undefined)
        if (!is(V : const(ubyte)[]))
    { publish(topic, val.sbinSerialize, lvl); }

    /// Publish raw bytes to `topic`; `Rel.undefined` is replaced by `defaultRel`.
    void publish(string topic, const(ubyte)[] data, Rel lvl=Rel.undefined)
    {
        if (lvl == Rel.undefined) lvl = defaultRel;
        ll.publish(topic, data, lvl);
    }

    /// `Broadcaster` bound to one fixed topic of the enclosing accessor.
    class BCaster : Broadcaster
    {
        string topic;
        this(string t) { topic = t; }
        override void publish(const(ubyte)[] data, Rel lvl=Rel.undefined)
        { this.outer.publish(topic, data, lvl); }
    }

    /// Dispatches incoming calls to the local service object.
    RMISkeleton!T skeleton;

    /// Handler for the request topic: runs the call through the skeleton and
    /// publishes the result to the caller's response topic.
    void receive(string t, RMICall call)
    {
        import std.range : repeat;
        import std.algorithm : joiner;

        .infof("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
        auto res = skeleton.process(call);
        publish(name ~ RES_ROOM ~ call.caller, res, defaultRel);
        // pad with spaces to the caller name's length so the log columns line up
        .infof("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
    }

    /// Client side communicator: publishes calls to the target's request
    /// topic and waits until the matching response arrives or time runs out.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        string target, reqbus;
        /// Received responses keyed by `calcHash`, not yet picked up by `process`.
        RMIResponse[string] responses;
        /// Keys of calls that are still waiting for a response.
        string[string] waitList;

        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
        }

        /// Key identifying a call: "<func>:<ts>".
        string calcHash(RMICall call)
        { return format("%s:%s", call.func, call.ts); }

        /// Handler for the response topic: stores the response of a pending
        /// call; responses for other callers or unknown calls are only logged.
        void receive(string t, RMIResponse r)
        {
            if (r.call.caller != caller)
            {
                .errorf("unexpected response for %s in bus for %s", r.call.caller, caller);
                return;
            }
            .infof("[%s]  in %s %s", cts, r.call.ts, r.call.func);
            auto ch = calcHash(r.call);
            if (ch in waitList)
            {
                enforce(ch !in responses, format("internal error: unexpect having result in res for %s", ch));
                responses[ch] = r;
                // removing the key releases the waiting loop in `process`
                waitList.remove(ch);
            }
            else .errorf("unexpected %s for calls: %s", r, waitList.keys);
        }

        override string caller() const @property { return name; }

        /// Publish `call` and wait for its response.
        ///
        /// Throws: `RMITimeoutException` if no response arrives within `waitTime`.
        override RMIResponse process(RMICall call)
        {
            // throttle: do not exceed the allowed number of pending calls
            while (waitList.length >= maxWaitResponses)
                this.outer.sleep(waitSleepStep * 10);
            auto ch = calcHash(call);
            waitList[ch] = ch;
            .infof("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, call, defaultRel);
            auto tm = StopWatch(AutoStart.yes);
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    .infof("[%s] ### %s %s", cts, call.ts, call.func);
                    waitList.remove(ch);
                    throw new RMITimeoutException(call);
                }
                else this.outer.sleep(waitSleepStep);
            }
            enforce(ch in responses, format("internal error: no result then not wait and not except for %s", ch));
            auto r = responses[ch];
            responses.remove(ch);
            return r;
        }
    }

public:

    /// Create an accessor serving `serv` over transport `t`.
    ///
    /// Params:
    ///   t = transport to use, must not be null
    ///   serv = service object wrapped by the skeleton
    ///   uniqName = passed to `rmiPSClientName!T` to build this accessor's name
    ///   sf = optional sleep routine used while waiting (null: built-in sleep)
    ///   waitTime = response timeout for client calls
    ///   waitSleepStep = sleep interval between response checks
    ///   maxWaitResponses = limit of simultaneously pending calls per client
    this(Transport t, T serv, string uniqName="", void delegate(Duration) sf=null,
            Duration waitTime=30.seconds, Duration waitSleepStep=1.msecs, size_t maxWaitResponses=10)
    {
        ll = enforce(t, "transport is null");
        name = rmiPSClientName!T(uniqName);
        ll.init(name);

        defaultRel = Rel.level2;
        // keep the user supplied sleep routine (it was previously dropped)
        sleepFunc = sf;
        this.waitTime = waitTime;
        this.waitSleepStep = waitSleepStep;
        this.maxWaitResponses = maxWaitResponses;

        skeleton = new RMISkeleton!T(serv);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Same as the main constructor with an empty `uniqName`.
    ///
    /// NOTE(review): a call passing only `(t, serv)` appears to match both
    /// this overload and the one above -- confirm how callers invoke it.
    this(Transport t, T serv, void delegate(Duration) sf=null)
    {
        this(t, serv, "", sf);
    }

    /// Create a `Broadcaster` that publishes to `topic` through this accessor.
    Broadcaster getBroadcaster(string topic) { return new BCaster(topic); }

    /// Subscribe `dlg` to raw messages of `topic` using `defaultRel`.
    void subscribe(string topic, void delegate(string, const(ubyte)[]) dlg)
    { ll.subscribe(topic, dlg, defaultRel); }

    /// Subscribe `dlg` to messages of `bus` deserialized as `V`.
    /// Messages that fail to deserialize are logged and dropped.
    void subscribe(V)(string bus, void delegate(string, V) dlg)
        if (!is(V == const(ubyte)[]))
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            V bm = void;
            // Guard only the deserialization step so that exceptions thrown
            // by the user delegate are not swallowed here. The base Exception
            // is caught because the set of exceptions the deserializer may
            // throw is not known at this point.
            try bm = data.sbinDeserialize!V;
            catch (Exception e)
            {
                .errorf("error while parse %s: %s", V.stringof, e.msg);
                return;
            }

            // deserialization succeeded: hand the value to the delegate
            dlg(t, bm);
        });
    }

    /// Create a stub for remote service `X`; the stub's responses are
    /// received on `<client name of X> ~ RES_ROOM ~ name`.
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = rmiPSClientName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connect the underlying transport.
    void connect() { ll.connect(); }
}
216 
/// Current time as returned by `Clock.currStdTime`.
private long cts()()
{
    import std.datetime : Clock;
    immutable long stamp = Clock.currStdTime;
    return stamp;
}