/// Publish/subscribe accessor that exposes an RMI service over a `Transport`
/// and creates RMI clients for remote services.
module drmi.ps.accessor;

import drmi.core;

import drmi.ps.helpers;
import drmi.ps.iface;

import std.datetime;
import std.string;
import std.array : appender, Appender;
import std.exception : enforce;
import std.experimental.logger;
import std.traits : Unqual;

/// Topic suffix appended to a service name to form its request bus.
enum REQ_ROOM = "/request";
/// Topic infix placed between a service name and a caller name to form
/// the response bus for that caller.
enum RES_ROOM = "/response/";

/// Publisher bound to a single destination chosen by the implementation.
interface Broadcaster
{
    /// Publish an already encoded payload with the given QoS.
    void publish(const(ubyte)[], QoS qos=QoS.undefined);

    /++ Serialize `val` with `sbinSerialize` into the shared buffer and
        publish the resulting bytes.

        Mutable `ubyte[]` values are excluded by the constraint so that they
        are handled by the raw overload above.
     +/
    final void publish(T)(T val, QoS qos=QoS.undefined)
        if (!is(Unqual!T == ubyte[]))
    {
        // Work on the referenced appender itself: copying it into a local
        // would leave a not-yet-initialised shared buffer untouched.
        localAppender().clear();
        // Buffer first, value second -- the same order used by
        // Accessor.publish in this module.
        localAppender().sbinSerialize(val);
        this.publish(localAppender().data, qos);
    }

    /// Buffer used by the serializing `publish` overload.
    protected ref Appender!(ubyte[]) localAppender() @property;
}

/++ Serves an object of type `T` through an `RMISkeleton` on the bus
    `name ~ REQ_ROOM` and hands out stubs and broadcasters that share the
    same transport.
 +/
class Accessor(T)
{
protected:
    /// Transport used for every publish and subscribe.
    Transport tport;

    /// QoS substituted whenever a caller passes `QoS.undefined`.
    QoS defaultQoS;

    /// Maximum time `CliCom.process` waits for a response.
    Duration waitTime;
    /// Sleep interval used while polling for a response.
    Duration waitSleepStep;
    /// Upper bound of simultaneously pending calls per `CliCom`.
    size_t maxWaitResponses;

    /// Name produced by `rmiPSClientName!T` in the constructor.
    string name;

    /// Serialization buffer shared by `publish` and by broadcasters.
    auto sBuffer = appender!(ubyte[]);

    /// Optional user supplied sleep implementation (see `sleep`).
    void delegate(Duration d) sleepFunc;

    /++ Pause for `t`.

        Uses `sleepFunc` when it is set; otherwise yields the current fiber
        if there is one, or blocks the thread with `Thread.sleep`.
     +/
    void sleep(Duration t)
    {
        import core.thread;
        if (sleepFunc !is null) sleepFunc(t);
        else
        {
            // Inside a fiber only a yield is performed; `t` is not used.
            if (auto f = Fiber.getThis()) f.yield();
            else Thread.sleep(t);
        }
    }

    /++ Serialize `val` into `sBuffer` and publish the bytes to `topic`.

        The constraint is checked against `V` (the value type), so a mutable
        `ubyte[]` -- which is what `sBuffer.data` yields below -- is routed
        to the raw overload instead of matching this template again.
     +/
    void publish(V)(string topic, V val, QoS qos=QoS.undefined)
        if (!is(Unqual!V == ubyte[]))
    {
        sBuffer.clear();
        sBuffer.sbinSerialize(val);
        publish(topic, sBuffer.data, qos);
    }

    /// Publish raw bytes to `topic`; `QoS.undefined` becomes `defaultQoS`.
    void publish(string topic, const(ubyte)[] data, QoS qos=QoS.undefined)
    {
        if (qos == QoS.undefined) qos = defaultQoS;
        tport.publish(topic, data, qos);
    }

    /// `Broadcaster` that publishes to one fixed topic through the
    /// enclosing accessor and reuses its `sBuffer`.
    class BCaster : Broadcaster
    {
        /// Destination topic.
        string topic;

        ///
        this(string t) { topic = t; }

        override void publish(const(ubyte)[] data, QoS qos=QoS.undefined)
        { this.outer.publish(topic, data, qos); }

        protected override ref Appender!(ubyte[]) localAppender() @property
        { return this.outer.sBuffer; }
    }

    /// Dispatches incoming calls to the served object.
    RMISkeleton!T skeleton;

    /// Handle an incoming call: process it with the skeleton and publish
    /// the result to `name ~ RES_ROOM ~ call.caller`.
    void receive(string t, RMICall call)
    {
        import std.range : repeat;
        import std.algorithm : joiner;

        version (drmi_verbose) .infof("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
        auto res = skeleton.process(call);
        publish(name ~ RES_ROOM ~ call.caller, res, defaultQoS);
        version (drmi_verbose) .infof("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
    }

    /// Stub communicator: publishes calls to `target ~ REQ_ROOM` and waits
    /// (by polling) until `receive` stores the matching response.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        /// Key identifying a pending call (size of a SHA-224 digest).
        alias rhash_t = ubyte[28];

        string target, reqbus;
        /// Responses received but not yet returned by `process`.
        RMIResponse[rhash_t] responses;
        /// Calls that were sent and are still waiting for a response.
        rhash_t[rhash_t] waitList;

        ///
        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
        }

        /// Key of a call: element-wise sum of the SHA-224 digests of
        /// `call.func` and of `call.ts`.
        rhash_t calcHash(RMICall call) @nogc
        {
            import std.digest.sha;
            auto r1 = sha224Of(call.func);
            auto r2 = sha224Of(cast(ulong[1])[call.ts]);
            r1[] += r2[];
            return r1;
        }

        /// Store a response for a pending call. Responses addressed to a
        /// different caller, or not present in `waitList`, are logged and
        /// dropped.
        void receive(string t, RMIResponse r)
        {
            if (r.call.caller != caller)
            {
                .errorf("unexpected response for %s in bus for %s", r.call.caller, caller);
                return;
            }

            version (drmi_verbose) .infof("[%s] in %s %s", cts, r.call.ts, r.call.func);
            auto ch = calcHash(r.call);

            if (ch !in waitList)
            {
                .errorf("unexpected %s for calls: %s", r, waitList.keys);
                return;
            }

            enforce(ch !in responses, format("internal error: unexpect having result in res for %s", ch));
            responses[ch] = r;
            // Removing the key is what releases the polling loop in `process`.
            waitList.remove(ch);
        }

        /// Caller name reported in outgoing calls: the accessor's `name`.
        override string caller() const @property { return name; }

        /++ Send `call` and wait for its response.

            Waits first while `waitList` already holds `maxWaitResponses`
            entries, then publishes the call and polls every
            `waitSleepStep` until the response arrives.

            Throws: `RMITimeoutException` when no response arrives within
            `waitTime`.
         +/
        override RMIResponse process(RMICall call)
        {
            while (waitList.length >= maxWaitResponses)
                this.outer.sleep(waitSleepStep * 10);
            auto ch = calcHash(call);
            waitList[ch] = ch;
            version (drmi_verbose) .infof("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, call, defaultQoS);
            auto tm = StopWatch(AutoStart.yes);
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    version (drmi_verbose) .infof("[%s] ### %s %s", cts, call.ts, call.func);
                    // Forget the call so a late response is reported as unexpected.
                    waitList.remove(ch);
                    throw new RMITimeoutException(call);
                }
                else this.outer.sleep(waitSleepStep);
            }
            enforce(ch in responses, format("internal error: no result then not wait and not except for %s", ch));
            auto r = responses[ch];
            responses.remove(ch);
            return r;
        }
    }

public:

    /++ Create an accessor serving `serv` over transport `t`.

        Params:
            t = transport, must not be null
            serv = object whose methods are exposed through the skeleton
            uniqName = passed to `rmiPSClientName!T` to build `name`
            sf = custom sleep function used by `sleep`; `null` selects the
                 built-in fiber/thread behaviour
            waitTime = response timeout for client calls
            waitSleepStep = polling interval while waiting for a response
            maxWaitResponses = limit of simultaneously pending client calls

        Throws: `Exception` (via `enforce`) when `t` is null.
     +/
    this(Transport t, T serv, string uniqName="", void delegate(Duration) sf=null,
            Duration waitTime=30.seconds, Duration waitSleepStep=1.msecs, size_t maxWaitResponses=10)
    {
        tport = enforce(t, "transport is null");
        name = rmiPSClientName!T(uniqName);
        tport.init(name);

        defaultQoS = QoS.l2;
        // Keep the caller supplied sleep function; without this assignment
        // the `sf` argument had no effect.
        sleepFunc = sf;
        this.waitTime = waitTime;
        this.waitSleepStep = waitSleepStep;
        this.maxWaitResponses = maxWaitResponses;

        skeleton = new RMISkeleton!T(serv);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Same as the primary constructor with an empty `uniqName`.
    this(Transport t, T serv, void delegate(Duration) sf=null)
    { this(t, serv, "", sf); }

    /// Create a broadcaster that publishes to `topic` through this accessor.
    Broadcaster getBroadcaster(string topic) { return new BCaster(topic); }

    /// Subscribe to raw payloads on `topic`; `QoS.undefined` becomes
    /// `defaultQoS`.
    void subscribe(string topic, void delegate(string, const(ubyte)[]) dlg, QoS qos=QoS.undefined)
    { tport.subscribe(topic, dlg, qos==QoS.undefined ? defaultQoS : qos); }

    /++ Subscribe to typed messages on `bus`.

        Each payload is decoded with `sbinDeserialize!V`. Decoding failures
        are logged and the message is dropped; exceptions thrown by `dlg`
        itself are not caught here.
     +/
    void subscribe(V)(string bus, void delegate(string, V) dlg, QoS qos=QoS.undefined)
        if (!is(V == const(ubyte)[]))
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            V bm = void;
            // for accurate exception handling
            bool converted = false;
            try // catch exceptions only while deserialization
            {
                bm = data.sbinDeserialize!V;
                converted = true;
            }
            catch (Exception e)
                // The set of exceptions deserialization may throw is not
                // documented (this note originally referred to
                // vibe.data.json.deserializeJson), so catch any Exception.
                .errorf("error while parse %s: %s", V.stringof, e.msg);

            // if all is ok call delegate
            if (converted) dlg(t, bm);
        }, qos);
    }

    /// Create a stub for remote service `X` and subscribe to the bus on
    /// which that service answers this accessor
    /// (`rmiPSClientName!X(uniqName) ~ RES_ROOM ~ name`).
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = rmiPSClientName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Forward to `Transport.connect`.
    void connect() { tport.connect(); }

    /// Forward to `Transport.connected`.
    bool connected() { return tport.connected(); }
}

/// Current time as returned by `Clock.currStdTime`; used in verbose logs.
private long cts()()
{
    import std.datetime;
    return Clock.currStdTime;
}