///
module drmi.ps.accessor;

import drmi.exceptions;
import drmi.types;
import drmi.base;

import drmi.ps.helpers;
import drmi.ps.iface;

import std.datetime;
import std.string;
import std.exception : enforce;

public import vibe.data.json : Json;
import vibe.data.json;
import vibe.core.core;
import vibe.core.log;

/// Topic suffix of the bus on which an accessor receives RMI calls.
enum REQ_ROOM = "/request";
/// Topic infix of the bus on which responses are published; the caller
/// name is appended after it.
enum RES_ROOM = "/response/";

/// Publisher bound to a single topic (see `Accessor.getBroadcaster`).
interface Broadcaster
{
    /// Publish a string payload with delivery level `lvl`.
    void publish(string, Rel lvl=Rel.undefined);

    /// Publish a `Json` value rendered with `toPrettyString`.
    final void publish(Json j, Rel lvl=Rel.undefined)
    { this.publish(j.toPrettyString, lvl); }

    /// Publish any other value serialized with `serializeToPrettyJson`.
    void publish(T)(T val, Rel lvl=Rel.undefined)
        if (!is(T == Json))
    { this.publish(val.serializeToPrettyJson, lvl); }
}

/++ Pub/sub endpoint that serves the RMI interface `T` over a `Transport`
    and creates client stubs for other endpoints.

    Incoming calls arrive on `name ~ REQ_ROOM`, are processed by an
    `RMISkeleton!T`, and the result is published on
    `name ~ RES_ROOM ~ call.caller`.
 +/
class Accessor(T)
{
protected:
    /// Underlying transport.
    Transport ll;

    /// Level used when a publish is requested with `Rel.undefined`.
    Rel defaultRel;

    /// Maximum time `CliCom.process` waits for a response.
    Duration waitTime;
    /// Polling step used while waiting for a response.
    Duration waitSleepStep;
    /// Maximum number of simultaneously pending calls per client.
    size_t maxWaitResponses;

    /// Name of this endpoint, computed by `rmiPSClientName!T`.
    string name;

    /// Serialize `val` to Json and publish it to `topic`.
    void publish(V)(string topic, V val, Rel lvl=Rel.undefined)
        if (!is(V == Json))
    { publish(topic, val.serializeToJson, lvl); }

    /// Publish a Json value to `topic` as a pretty-printed string.
    void publish(string topic, Json val, Rel lvl=Rel.undefined)
    { publish(topic, val.toPrettyString, lvl); }

    /// Publish raw string data to `topic`; `Rel.undefined` is replaced by
    /// `defaultRel` before the data is handed to the transport.
    void publish(string topic, string data, Rel lvl=Rel.undefined)
    {
        if (lvl == Rel.undefined) lvl = defaultRel;
        ll.publish(topic, cast(const(ubyte)[])data, lvl);
    }

    /// `Broadcaster` implementation that forwards to the outer accessor
    /// with a fixed topic.
    class BCaster : Broadcaster
    {
        string topic;
        this(string t) { topic = t; }
        override void publish(string s, Rel lvl=Rel.undefined)
        { this.outer.publish(topic, s, lvl); }
    }

    /// Dispatches incoming calls to the served object.
    RMISkeleton!T skeleton;

    /// Handler of the request bus: decodes an `RMICall`, processes it with
    /// the skeleton and publishes the result to the caller's response bus.
    /// Messages that cannot be decoded are logged and dropped.
    void receive(string t, Json msg)
    {
        RMICall call = void;
        try call = msg.deserializeJson!RMICall;
        catch (Throwable e)
        {
            logError("error in parse request: %s", e.msg);
            return;
        }
        import std.range : repeat;
        import std.algorithm : joiner;

        logDebug("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
        auto res = skeleton.process(call);
        publish(name ~ RES_ROOM ~ call.caller, res, defaultRel);
        // pad with spaces to the caller's length so that columns line up
        // with the "***" line above
        logDebug("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
    }

    /// Client-side communicator: publishes calls to the target's request
    /// bus and polls until the matching response is delivered by `receive`.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        string target, reqbus;
        /// Responses received but not yet picked up by `process`.
        RMIResponse[string] responses;
        /// Keys of calls that are still waiting for a response.
        string[string] waitList;

        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
        }

        /// Key used to match a response with its pending call.
        string calcHash(RMICall call)
        { return format("%s:%s", call.func, call.ts); }

        /// Handler of the response bus: stores the response for a pending
        /// call; anything undecodable, addressed to another caller or not
        /// awaited is logged and dropped.
        void receive(string t, Json msg)
        {
            RMIResponse r = void;
            try r = msg.deserializeJson!RMIResponse;
            catch (Throwable e)
            {
                logError("unexpected request message: %s (%s)", msg, e.msg);
                return;
            }
            if (r.call.caller != caller)
            {
                logError("unexpected response for %s in bus for %s", r.call.caller, caller);
                return;
            }
            logDebug("[%s] in %s %s", cts, r.call.ts, r.call.func);
            auto ch = calcHash(r.call);
            if (ch in waitList)
            {
                enforce(ch !in responses, format("internal error: unexpect having result in res for %s", ch));
                // store first, then release the waiter
                responses[ch] = r;
                waitList.remove(ch);
            }
            else logError("unexpected %s for calls: %s", r, waitList.keys);
        }

        /// Name under which calls are sent (the outer accessor's name).
        override string caller() const @property { return name; }

        /++ Send `call` to the target and wait for its response.

            Blocks (by sleeping) while `maxWaitResponses` calls are already
            pending.

            Throws: `RMITimeoutException` if no response arrives within
            `waitTime`.
         +/
        override RMIResponse process(RMICall call)
        {
            while (waitList.length >= maxWaitResponses)
                sleep(waitSleepStep * 10);
            auto ch = calcHash(call);
            waitList[ch] = ch;
            // Whatever fails from here on (publish error, interrupted sleep,
            // timeout), the slot must be released; otherwise it would count
            // against maxWaitResponses forever.
            scope (failure) waitList.remove(ch);
            logDebug("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, call, defaultRel);
            auto tm = StopWatch(AutoStart.yes);
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    logDebug("[%s] ### %s %s", cts, call.ts, call.func);
                    throw new RMITimeoutException(call);
                }
                else sleep(waitSleepStep);
            }
            enforce(ch in responses, format("internal error: no result then not wait and not except for %s", ch));
            auto r = responses[ch];
            responses.remove(ch);
            return r;
        }
    }

public:

    /++ Create an accessor serving `serv` over transport `t`.

        Params:
            t = transport, must not be null
            serv = object whose interface `T` is exposed
            uniqName = passed to `rmiPSClientName!T` to build the endpoint name
            waitTime = response timeout for client calls
            maxWaitResponses = limit of simultaneously pending client calls
     +/
    this(Transport t, T serv, string uniqName="",
            Duration waitTime=30.seconds, size_t maxWaitResponses=10)
    {
        ll = enforce(t, "transport is null");
        name = rmiPSClientName!T(uniqName);
        ll.init(name);

        defaultRel = Rel.level2;
        this.waitTime = waitTime;
        waitSleepStep = 1.msecs;
        this.maxWaitResponses = maxWaitResponses;

        skeleton = new RMISkeleton!T(serv);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Create a `Broadcaster` bound to `topic`.
    Broadcaster getBroadcaster(string topic) { return new BCaster(topic); }

    /// Subscribe a raw-bytes handler to `topic` using `defaultRel`.
    void subscribe(string topic, void delegate(string, const(ubyte)[]) dlg)
    { ll.subscribe(topic, dlg, defaultRel); }

    /// Subscribe a Json handler to `bus`. Payloads that fail to parse are
    /// logged and `dlg` is not called for them.
    void subscribe(string bus, void delegate(string, Json) dlg)
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            auto j = Json.undefined;
            try j = (cast(string)data).parseJsonString;
            catch (JSONException e)
                logError("error while parsing json msg: %s", e.msg);
            if (j != Json.undefined) dlg(t, j);
            else logWarn("parsed json is undefined, don't call dlg");
        });
    }

    /// Subscribe a typed handler to `bus`. The payload is parsed as Json and
    /// deserialized to `V`; on failure the error is logged and `dlg` is not
    /// called.
    void subscribe(V)(string bus, void delegate(string, V) dlg)
        if (!is(V == Json) && !is(V == const(ubyte)[]))
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            V bm = void;
            // for accurate exception handling
            bool converted = false;
            try // catch exceptions only while deserialization
            {
                bm = (cast(string)data)
                        .parseJsonString
                        .deserializeJson!V;
                converted = true;
            }
            catch (Exception e)
                // vibe.data.json.deserializeJson has no throwable exception
                // list in documentation
                logError("error while parse %s: %s", V.stringof, e.msg);

            // if all is ok call delegate
            if (converted) dlg(t, bm);
        });
    }

    /// Create a client stub for interface `X` served by the endpoint named
    /// `rmiPSClientName!X(uniqName)`, and subscribe to the bus on which that
    /// endpoint publishes responses for this accessor.
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = rmiPSClientName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connect the underlying transport.
    void connect() { ll.connect(); }
}

/// Current time as returned by `Clock.currStdTime`; used as a timestamp in
/// debug log lines.
private long cts()()
{
    import std.datetime;
    return Clock.currStdTime;
}