/// Publish/subscribe based RMI accessor: exposes a local service over a
/// `Transport` and creates client stubs for remote services.
module drmi.ps.accessor;

import drmi.core;

import drmi.ps.helpers;
import drmi.ps.iface;

import std.datetime.stopwatch;
import std.string;
import std.array : appender, Appender;
import std.exception : enforce;
import std.experimental.logger;

/// Topic suffix appended to a service name to form its request bus.
enum REQ_ROOM = "/request";
/// Topic infix placed between a service name and a caller name to form
/// the response bus of that caller.
enum RES_ROOM = "/response/";

/// Publisher bound to a single destination chosen by the implementation.
interface Broadcaster
{
    /// Publish an already serialized payload.
    void publish(const(ubyte)[], QoS qos=QoS.undefined);

    /// Serialize `val` with `sbinSerialize` into the buffer returned by
    /// `localAppender` and publish the resulting bytes.
    ///
    /// The constraint excludes mutable `ubyte[]` so that raw buffers are
    /// routed to the non-template overload instead of being serialized again.
    final void publish(T)(T val, QoS qos=QoS.undefined)
        if (!is(Unqual!T == ubyte[]))
    {
        auto buf = localAppender();
        buf.clear();
        buf.sbinSerialize(val);
        this.publish(buf.data, qos);
    }

    /// Scratch buffer used by the serializing `publish` overload.
    protected ref Appender!(ubyte[]) localAppender() @property;
}

/// Serves an implementation of `T` over a transport and hands out stubs
/// for calling other services through the same transport.
class Accessor(T)
{
protected:
    Transport tport;

    /// QoS substituted whenever a caller passes `QoS.undefined`.
    QoS defaultQoS;

    /// Maximum time `CliCom.process` waits for a response.
    Duration waitTime;
    /// Pause between two checks for an arrived response.
    Duration waitSleepStep;
    /// Upper bound of simultaneously pending calls per client.
    size_t maxWaitResponses;

    /// Name of this accessor, produced by `rmiPSClientName!T`.
    string name;

    /// Serialization scratch buffer shared by all publish calls.
    auto sBuffer = appender!(ubyte[]);

    /// Optional user supplied sleep implementation.
    void delegate(Duration d) sleepFunc;

    /// Pause the caller for `t`.
    ///
    /// Uses `sleepFunc` when set. Otherwise yields the current fiber if
    /// there is one (the duration is not used in that case) or blocks the
    /// current thread with `Thread.sleep`.
    void sleep(Duration t)
    {
        import core.thread;
        if (sleepFunc !is null) sleepFunc(t);
        else
        {
            if (auto f = Fiber.getThis()) f.yield();
            else Thread.sleep(t);
        }
    }

    /// `Broadcaster` that publishes to one fixed topic through the
    /// enclosing accessor.
    class BCaster : Broadcaster
    {
        string topic;
        this(string t) { topic = t; }
        override void publish(const(ubyte)[] data, QoS qos=QoS.undefined)
        { this.outer.publish(topic, data, qos); }
        protected override ref Appender!(ubyte[]) localAppender() @property
        { return this.outer.sBuffer; }
    }

    RMISkeleton!T skeleton;

    /// Handle an incoming call: let the skeleton process it and publish
    /// the result to `name ~ RES_ROOM ~ call.caller`.
    void receive(string t, RMICall call)
    {
        import std.range : repeat;
        import std.algorithm : joiner;

        version (drmi_verbose) .infof("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
        auto res = skeleton.process(call);
        publish(name ~ RES_ROOM ~ call.caller, res, defaultQoS);
        version (drmi_verbose) .infof("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
    }

    /// Client side communicator handed to `RMIStub`: publishes calls to
    /// the target's request bus and waits for the matching response.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        alias rhash_t = ubyte[28];

        string target, reqbus;
        /// Responses that arrived and were not yet picked up by `process`.
        RMIResponse[rhash_t] responses;
        /// Hashes of calls that still wait for a response (used as a set).
        rhash_t[rhash_t] waitList;

        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
        }

        /// Identify a call by the element-wise sum of the SHA-224 digests
        /// of its function name and of its timestamp.
        rhash_t calcHash(RMICall call) @nogc
        {
            import std.digest.sha;
            auto r1 = sha224Of(call.func);
            auto r2 = sha224Of(cast(ulong[1])[call.ts]);
            r1[] += r2[];
            return r1;
        }

        /// Store a response for a pending call.
        ///
        /// Responses addressed to another caller, or for a call that is
        /// not in `waitList`, are logged as errors and dropped.
        void receive(string t, RMIResponse r)
        {
            if (r.call.caller != caller)
            {
                .errorf("unexpected response for %s in bus for %s", r.call.caller, caller);
                return;
            }

            version (drmi_verbose) .infof("[%s] in %s %s", cts, r.call.ts, r.call.func);
            auto ch = calcHash(r.call);

            if (ch !in waitList)
            {
                .errorf("unexpected %s for calls: %s", r, waitList.keys);
                return;
            }

            enforce(ch !in responses, format("internal error: unexpect having result in res for %s", ch));
            responses[ch] = r;
            waitList.remove(ch);
        }

        override string caller() const @property { return name; }

        /// Publish `call` and wait until its response arrives.
        ///
        /// Throws: `RMITimeoutException` when no response arrived within
        /// `waitTime`.
        override RMIResponse process(RMICall call)
        {
            // Throttle: do not exceed the allowed number of pending calls.
            // NOTE(review): with maxWaitResponses == 0 this loop never ends.
            while (waitList.length >= maxWaitResponses)
                this.outer.sleep(waitSleepStep * 10);
            auto ch = calcHash(call);
            waitList[ch] = ch;
            version (drmi_verbose) .infof("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, call, defaultQoS);
            auto tm = StopWatch(AutoStart.yes);
            // `receive` removes the hash from waitList once the response is stored.
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    version (drmi_verbose) .infof("[%s] ### %s %s", cts, call.ts, call.func);
                    waitList.remove(ch);
                    throw new RMITimeoutException(call);
                }
                else this.outer.sleep(waitSleepStep);
            }
            enforce(ch in responses, format("internal error: no result then not wait and not except for %s", ch));
            auto r = responses[ch];
            responses.remove(ch);
            return r;
        }
    }

public:

    /// Create an accessor serving `serv` over transport `t`.
    ///
    /// Params:
    ///   t = transport, must not be null
    ///   serv = service implementation wrapped into an `RMISkeleton`
    ///   uniqName = passed to `rmiPSClientName!T` to build the accessor name
    ///   sf = optional sleep implementation used by `sleep`
    ///   waitTime = response timeout for client calls
    ///   waitSleepStep = pause between response checks
    ///   maxWaitResponses = limit of simultaneously pending client calls
    ///
    /// NOTE(review): a call with only `(t, serv)` may be ambiguous with the
    /// other constructor because both have defaults for the rest; confirm.
    this(Transport t, T serv, string uniqName="", void delegate(Duration) sf=null,
            Duration waitTime=30.seconds, Duration waitSleepStep=1.msecs, size_t maxWaitResponses=10)
    {
        tport = enforce(t, "transport is null");
        name = rmiPSClientName!T(uniqName);
        tport.init(name);

        defaultQoS = QoS.l2;
        // Store the user supplied sleep function; it was previously dropped.
        sleepFunc = sf;
        this.waitTime = waitTime;
        this.waitSleepStep = waitSleepStep;
        this.maxWaitResponses = maxWaitResponses;

        skeleton = new RMISkeleton!T(serv);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Same as the main constructor with an empty `uniqName`.
    this(Transport t, T serv, void delegate(Duration) sf=null)
    { this(t, serv, "", sf); }

    /// Serialize `val` with `sbinSerialize` and publish it to `topic`.
    ///
    /// The constraint checks the argument type `V` so that a mutable
    /// `ubyte[]` is handled by the raw overload and not serialized again.
    void publish(V)(string topic, V val, QoS qos=QoS.undefined)
        if (!is(Unqual!V == ubyte[]))
    {
        sBuffer.clear();
        sBuffer.sbinSerialize(val);
        publish(topic, sBuffer.data, qos);
    }

    /// Publish raw bytes to `topic`; `QoS.undefined` means `defaultQoS`.
    void publish(string topic, const(ubyte)[] data, QoS qos=QoS.undefined)
    {
        if (qos == QoS.undefined) qos = defaultQoS;
        tport.publish(topic, data, qos);
    }

    /// Create a `Broadcaster` bound to `topic`.
    Broadcaster getBroadcaster(string topic) { return new BCaster(topic); }

    /// Subscribe to raw payloads of `topic`; `QoS.undefined` means `defaultQoS`.
    void subscribe(string topic, void delegate(string, const(ubyte)[]) dlg, QoS qos=QoS.undefined)
    { tport.subscribe(topic, dlg, qos==QoS.undefined ? defaultQoS : qos); }

    /// Subscribe to `bus` and pass every payload, deserialized to `V` with
    /// `sbinDeserialize`, to `dlg`. Payloads that fail to deserialize are
    /// logged and skipped.
    void subscribe(V)(string bus, void delegate(string, V) dlg, QoS qos=QoS.undefined)
        if (!is(V == const(ubyte)[]))
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            V bm = void;
            // Track success separately so that only deserialization errors
            // are caught here and exceptions thrown by `dlg` propagate.
            bool converted = false;
            try
            {
                bm = data.sbinDeserialize!V;
                converted = true;
            }
            catch (Exception e)
            {
                // The exact exception types thrown by the deserializer are
                // not known here, so the generic Exception is caught.
                .errorf("error while parse %s: %s", V.stringof, e.msg);
            }

            // Call the delegate only with a successfully decoded value.
            if (converted) dlg(t, bm);
        }, qos);
    }

    /// Create a stub for remote service `X` and subscribe to the bus on
    /// which that service publishes responses for this accessor.
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = rmiPSClientName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connect the underlying transport.
    void connect() { tport.connect(); }

    /// Connection state reported by the underlying transport.
    bool connected() { return tport.connected(); }
}

/// Current time as returned by `Clock.currStdTime`; used in verbose logs.
private long cts()()
{
    import std.datetime;
    return Clock.currStdTime;
}