///
module drmi.ps.accessor;

import drmi.exceptions;
import drmi.types;
import drmi.base;

import drmi.ps.helpers;
import drmi.ps.iface;

import std.datetime;
import std.string;
import std.exception : enforce;
import std.experimental.logger;

/// Suffix appended to an accessor name to build the topic it receives calls on.
enum REQ_ROOM = "/request";
/// Separator placed between the callee name and the caller name in a response topic.
enum RES_ROOM = "/response/";

/// Publisher whose destination is chosen by the implementation.
interface Broadcaster
{
    /// Publish raw bytes; how `Rel.undefined` is interpreted is up to the implementation.
    void publish(const(ubyte)[], Rel lvl=Rel.undefined);

    /// Serialize `val` with `sbinSerialize` and forward the bytes to the raw overload.
    final void publish(T)(T val, Rel lvl=Rel.undefined)
        if (!is(Unqual!T == ubyte[]))
    { this.publish(val.sbinSerialize, lvl); }
}

/++ Publish/subscribe endpoint that serves RMI calls for a `T` instance and
    hands out client stubs that talk to other endpoints over the same transport.

    On construction the accessor subscribes to `name ~ REQ_ROOM`; every call
    received there is passed to the skeleton and the result is published to
    `name ~ RES_ROOM ~ call.caller`.
 +/
class Accessor(T)
{
protected:
    /// Underlying transport; never null after construction.
    Transport ll;

    /// Level substituted when a publish is requested with `Rel.undefined`.
    Rel defaultRel;

    /// Longest time a client call waits for its response before timing out.
    Duration waitTime;
    /// Sleep interval used while polling for a response.
    Duration waitSleepStep;
    /// Upper bound on simultaneously pending calls per client communicator.
    size_t maxWaitResponses;

    /// Name of this accessor, produced by `rmiPSClientName!T`.
    string name;

    /// Optional user-supplied replacement for the built-in sleep strategy.
    void delegate(Duration d) sleepFunc;

    /++ Pause the current execution context.

        Uses `sleepFunc` when one is set. Otherwise yields the running fiber
        (the duration is not used in that case) or, outside a fiber, sleeps
        the current thread for `t`.
     +/
    void sleep(Duration t)
    {
        import core.thread;
        if (sleepFunc !is null) sleepFunc(t);
        else
        {
            if (auto f = Fiber.getThis()) f.yield();
            else Thread.sleep(t);
        }
    }

    /// Serialize `val` with `sbinSerialize` and publish the bytes to `topic`.
    void publish(V)(string topic, V val, Rel lvl=Rel.undefined)
        // The constraint has to inspect the value type `V`; it used to test
        // the class parameter `T`, which says nothing about the argument.
        if (!is(Unqual!V == ubyte[]))
    { publish(topic, val.sbinSerialize, lvl); }

    /// Publish raw bytes to `topic`; `Rel.undefined` is replaced by `defaultRel`.
    void publish(string topic, const(ubyte)[] data, Rel lvl=Rel.undefined)
    {
        if (lvl == Rel.undefined) lvl = defaultRel;
        ll.publish(topic, data, lvl);
    }

    /// `Broadcaster` bound to one fixed topic of the enclosing accessor.
    class BCaster : Broadcaster
    {
        string topic;
        this(string t) { topic = t; }
        override void publish(const(ubyte)[] data, Rel lvl=Rel.undefined)
        { this.outer.publish(topic, data, lvl); }
    }

    /// Dispatches incoming calls to the served `T` instance.
    RMISkeleton!T skeleton;

    /// Handle one incoming call: run it through the skeleton and publish the
    /// result on the caller's response topic.
    void receive(string t, RMICall call)
    {
        import std.range : repeat;
        import std.algorithm : joiner;

        .infof("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
        auto res = skeleton.process(call);
        publish(name ~ RES_ROOM ~ call.caller, res, defaultRel);
        // The caller name is replaced by blanks of the same length so that
        // this log line aligns with the "***" line above.
        .infof("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
    }

    /++ Client-side communicator used by stubs: publishes a call on the
        target's request topic and polls until the matching response has been
        delivered to `receive` or `waitTime` has elapsed.
     +/
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        string target, reqbus;
        /// Responses that have arrived but were not yet picked up by `process`.
        RMIResponse[string] responses;
        /// Keys of calls that are still waiting for a response.
        string[string] waitList;

        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
        }

        /// Key identifying a call: its function name and timestamp.
        string calcHash(RMICall call)
        { return format("%s:%s", call.func, call.ts); }

        /// Store a response for a pending call; responses addressed to another
        /// caller or matching no pending call are logged and dropped.
        void receive(string t, RMIResponse r)
        {
            if (r.call.caller != caller)
            {
                .errorf("unexpected response for %s in bus for %s", r.call.caller, caller);
                return;
            }
            .infof("[%s] in %s %s", cts, r.call.ts, r.call.func);
            auto ch = calcHash(r.call);
            if (ch in waitList)
            {
                enforce(ch !in responses, format("internal error: unexpect having result in res for %s", ch));
                responses[ch] = r;
                // Removing the key is what releases the polling loop in `process`.
                waitList.remove(ch);
            }
            else .errorf("unexpected %s for calls: %s", r, waitList.keys);
        }

        override string caller() const @property { return name; }

        /++ Send `call` and wait for its response.

            Throws: `RMITimeoutException` when no response arrives within
            `waitTime`; anything thrown by the publish or the sleep function
            propagates unchanged.
         +/
        override RMIResponse process(RMICall call)
        {
            // Back-pressure: wait until there is room for one more pending call.
            while (waitList.length >= maxWaitResponses)
                this.outer.sleep(waitSleepStep * 10);
            auto ch = calcHash(call);
            waitList[ch] = ch;
            // Whatever goes wrong from here on (publish failure, timeout, a
            // throwing sleep function), the pending marker must not be left
            // behind: stale entries would permanently consume slots and
            // eventually stall every call in the loop above.
            scope (failure) waitList.remove(ch);
            .infof("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, call, defaultRel);
            auto tm = StopWatch(AutoStart.yes);
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    .infof("[%s] ### %s %s", cts, call.ts, call.func);
                    throw new RMITimeoutException(call);
                }
                else this.outer.sleep(waitSleepStep);
            }
            enforce(ch in responses, format("internal error: no result then not wait and not except for %s", ch));
            auto r = responses[ch];
            responses.remove(ch);
            return r;
        }
    }

public:

    /++ Create an accessor serving `serv` over transport `t`.

        Params:
            t = transport to use; must not be null
            serv = object whose methods are exposed through the skeleton
            uniqName = passed to `rmiPSClientName!T` to build the accessor name
            sf = optional sleep function used instead of the built-in strategy
            waitTime = response timeout for client calls
            waitSleepStep = polling interval while waiting for a response
            maxWaitResponses = limit of simultaneously pending client calls

        NOTE(review): both constructors give their third parameter a default,
        so a two-argument call looks ambiguous — confirm callers always pass it.
     +/
    this(Transport t, T serv, string uniqName="", void delegate(Duration) sf=null,
            Duration waitTime=30.seconds, Duration waitSleepStep=1.msecs, size_t maxWaitResponses=10)
    {
        ll = enforce(t, "transport is null");
        name = rmiPSClientName!T(uniqName);
        ll.init(name);

        defaultRel = Rel.level2;
        // Previously `sf` was accepted but never stored, so a custom sleep
        // function was silently ignored.
        sleepFunc = sf;
        this.waitTime = waitTime;
        this.waitSleepStep = waitSleepStep;
        this.maxWaitResponses = maxWaitResponses;

        skeleton = new RMISkeleton!T(serv);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Same as the main constructor with an empty `uniqName` and default timings.
    this(Transport t, T serv, void delegate(Duration) sf=null)
    {
        this(t, serv, "", sf);
    }

    /// Create a `Broadcaster` that publishes to `topic` through this accessor.
    Broadcaster getBroadcaster(string topic) { return new BCaster(topic); }

    /// Subscribe `dlg` to raw messages on `topic` using `defaultRel`.
    void subscribe(string topic, void delegate(string, const(ubyte)[]) dlg)
    { ll.subscribe(topic, dlg, defaultRel); }

    /++ Subscribe `dlg` to messages on `bus`, deserializing each payload to `V`
        with `sbinDeserialize`. Payloads that fail to deserialize are logged
        and dropped; exceptions thrown by `dlg` itself are not caught here.
     +/
    void subscribe(V)(string bus, void delegate(string, V) dlg)
        if (!is(V == const(ubyte)[]))
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            V bm = void;
            // Only the deserialization is guarded, so that errors raised by
            // the user delegate are reported accurately instead of being
            // mistaken for parse errors. `Exception` is caught broadly; no
            // narrower list of deserializer exceptions is visible here.
            try bm = data.sbinDeserialize!V;
            catch (Exception e)
            {
                .errorf("error while parse %s: %s", V.stringof, e.msg);
                return;
            }

            // Deserialization succeeded: hand the value to the delegate.
            dlg(t, bm);
        });
    }

    /++ Create a stub for the remote service `X` and subscribe to the topic on
        which that service publishes responses addressed to this accessor.
     +/
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = rmiPSClientName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connect the underlying transport.
    void connect() { ll.connect(); }
}

/// Current time as returned by `Clock.currStdTime`, used to stamp log lines.
private long cts()()
{
    import std.datetime;
    return Clock.currStdTime;
}