module drmi.mqtt.accessor;

import mqttd;
import vibe.data.json;
import vibe.core.log;
import vibe.core.core;

import drmi;

import drmi.mqtt.types;
import drmi.mqtt.subscriber;

import std.datetime;
import std.string;

/// Suffix appended to a service name to build the topic it receives calls on.
enum REQ_ROOM = "/request";
/// Infix placed between a service name and a caller name to build the
/// topic on which that caller receives responses.
enum RES_ROOM = "/response/";

/// Something that can publish `BusData` messages.
interface Broadcaster
{
    /// Publish an already wrapped message with the requested QoS level.
    void pub(BusData, QoSLevel q=QoSLevel.Reserved);

    /// Wrap `val` (with optional `type` tag) into `BusData` and publish it.
    // NOTE(review): the constraint names `BData`, while the rest of this
    // module uses `BusData` -- confirm that `BData` exists and is intended.
    final void pub(T)(T val, string type="", QoSLevel q=QoSLevel.Reserved)
        if (!is(T == BData))
    { this.pub(BusData(val, type), q); }
}

/// Build a client name from the type name `T.stringof` and an optional
/// unique suffix: `"T_suffix"` when `un` is not empty, plain `"T"` otherwise.
string cliName(T)(string un)
{ return un.length ? format("%s_%s", T.stringof, un) : T.stringof; }

/++ Exposes an object of type `T` for remote calls over MQTT and creates
    stubs for calling other services.

    Incoming calls are read from `name ~ REQ_ROOM`, processed by an
    `RMISkeleton!T`, and the result is published to
    `name ~ RES_ROOM ~ caller`.
 +/
class Accessor(T)
{
protected:
    Subscriber sub;
    MqttClient pub;

    /// QoS used when a publish is requested with `QoSLevel.Reserved`.
    QoSLevel qos;

    /// Maximum time `CliCom.process` waits for a response.
    Duration waitTime;
    /// Upper bound of simultaneously awaited responses per `CliCom`.
    size_t maxWaitResponses;

    /// Name of this accessor; used in topics and as caller identity.
    string name;

    /// Serialize `val` to pretty JSON and publish it to `topic`.
    /// `QoSLevel.Reserved` is replaced by the accessor default `qos`.
    void publish(V)(string topic, V val, QoSLevel q=QoSLevel.Reserved)
    {
        if (q == QoSLevel.Reserved) q = qos;
        pub.publish(topic, val.serializeToPrettyJson, q);
    }

    /// `Broadcaster` bound to one fixed bus (topic) of this accessor.
    class BCaster : Broadcaster
    {
        string bus;
        this(string bus) { this.bus = bus; }
        override void pub(BusData d, QoSLevel q=QoSLevel.Reserved)
        { publish(bus, d, q); }
    }

    RMISkeleton!T skeleton;

    /// Handler for the request bus: processes an `RMICall` through the
    /// skeleton and publishes the result to the caller's response topic.
    /// Messages of any other type are only logged.
    void receive(string t, BusData msg)
    {
        if (msg.type == RMICall.stringof)
        {
            import std.range : repeat;
            import std.algorithm : joiner;
            auto call = msg.as!RMICall;
            logDebug("[%s] *** %s %s %s", cts, call.caller, call.ts, call.func);
            auto res = skeleton.process(call);
            publish(name ~ RES_ROOM ~ call.caller, BusData(res), qos);
            // pad with spaces to the caller name width so the log columns line up
            logDebug("[%s] === %s %s", cts, " ".repeat(call.caller.length).joiner(""), call.ts);
        }
        else logWarn("unexpected request message: %s", msg);
    }

    /// Client-side transport used by `RMIStub`: publishes calls to the
    /// target's request bus and waits for the matching response.
    class CliCom : RMIStubCom
    {
        import std.exception : enforce;
        import std.conv : text;
        import std.string;

        string target, reqbus;
        /// Received responses, keyed by `callHash`.
        RMIResponse[string] res;
        /// Calls still waiting for a response, keyed by `callHash`
        /// (used as a set; the value repeats the key).
        string[string] waitList;

        /// Polling interval used while waiting.
        Duration sleepStep;

        this(string target)
        {
            this.target = target;
            this.reqbus = target ~ REQ_ROOM;
            sleepStep = 1.msecs;
        }

        /// Key identifying a call: function name and timestamp.
        string callHash(RMICall call)
        { return format("%s:%s", call.func, call.ts); }

        /// Handler for the response bus: stores a response for a call that
        /// is still awaited and removes that call from the wait list.
        void receive(string t, BusData msg)
        {
            if (msg.type == RMIResponse.stringof)
            {
                auto r = msg.as!RMIResponse;
                if (r.call.caller != caller)
                {
                    logError("unexpected response for %s in bus for %s", r.call.caller, caller);
                    return;
                }
                logDebug("[%s] in %s %s", cts, r.call.ts, r.call.func);
                auto ch = callHash(r.call);
                if (ch in waitList)
                {
                    enforce(ch !in res, format("internal error: unexpect having result in res for %s", ch));
                    res[ch] = r;
                    waitList.remove(ch);
                }
                else logTrace("unexpected %s for calls: %s", r, waitList.keys);
            }
            // this handler serves the response bus, so say so in the log
            else logWarn("unexpected response message: %s", msg);
        }

        override string caller() const @property { return name; }

        /++ Publish `call` and wait for its response.

            Waits first while the number of awaited calls is at
            `maxWaitResponses`, then polls every `sleepStep` until the
            response arrives.

            Throws: `RMITimeoutException` if no response arrived within
            `waitTime`.
         +/
        override RMIResponse process(RMICall call)
        {
            while (waitList.length >= maxWaitResponses) sleep(sleepStep * 10);
            auto ch = callHash(call);
            waitList[ch] = ch;
            logDebug("[%s] out %s %s", cts, call.ts, call.func);
            publish(reqbus, BusData(call));
            auto tm = StopWatch(AutoStart.yes);
            while (ch in waitList)
            {
                if (cast(Duration)tm.peek > waitTime)
                {
                    logDebug("[%s] ### %s %s", cts, call.ts, call.func);
                    // forget the call, a late response will be ignored by receive
                    waitList.remove(ch);
                    throw new RMITimeoutException(call);
                }
                else sleep(sleepStep);
            }
            enforce(ch in res, format("internal error: no result then not wait and not except for %s", ch));
            auto r = res[ch];
            res.remove(ch);
            return r;
        }
    }

public:

    /// Create an accessor serving `serv`; `uniqName` distinguishes several
    /// accessors of the same type `T`. Subscribes to the request bus.
    this(T serv, string uniqName="")
    {
        name = cliName!T(uniqName);
        qos = QoSLevel.QoS2;
        waitTime = 5.minutes;
        maxWaitResponses = 100;

        skeleton = new RMISkeleton!T(serv);

        Settings pset, sset;
        pset.clientId = name ~ ".pub";
        sset.clientId = name ~ ".sub";

        pub = new MqttClient(pset);
        sub = new Subscriber(sset, qos);

        subscribe(name~REQ_ROOM, &this.receive);
    }

    /// Create a broadcaster publishing to `bus` through this accessor.
    Broadcaster getBroadcaster(string bus) { return new BCaster(bus); }

    /// Subscribe `dlg` to raw payloads of `bus`.
    void subscribe(string bus, void delegate(string, const(ubyte)[]) dlg)
    { sub.subscribe(bus, dlg); }

    /// Subscribe `dlg` to `bus`, parsing each payload as JSON.
    /// Payloads that fail to parse are logged and not delivered.
    void subscribe(string bus, void delegate(string, Json) dlg)
    {
        subscribe(bus, (string t, const(ubyte)[] data)
        {
            auto j = Json.undefined;
            try j = (cast(string)data).parseJsonString;
            catch (JSONException e)
                // "%s" is required, otherwise e.msg never reaches the log
                logError("error while parsing json msg: %s", e.msg);
            if (j != Json.undefined) dlg(t, j);
            else logWarn("parsed json is undefined, don't call dlg");
        });
    }

    /// Subscribe `dlg` to `bus`, deserializing each JSON payload to
    /// `BusData`. Payloads that fail to convert are logged and not delivered.
    void subscribe(string bus, void delegate(string, BusData bm) dlg)
    {
        subscribe(bus, (string t, Json j)
        {
            BusData bm;
            // track success separately so that only deserialization errors
            // are caught here, not exceptions thrown by dlg itself
            bool converted = false;
            try
            {
                bm = j.deserializeJson!BusData;
                converted = true;
            }
            catch (Exception e)
                // vibe.data.json.deserializeJson does not document which
                // exceptions it throws, so catch the base class;
                // "%s" is required, otherwise e.msg never reaches the log
                logError("error while convert json to BusMessage: %s", e.msg);

            // call the delegate only with a successfully converted message
            if (converted) dlg(t, bm);
        });
    }

    /// Create a stub for the remote service `X` (optionally with a unique
    /// name) and subscribe to the topic where its responses for this
    /// accessor arrive.
    RMIStub!X getClient(X)(string uniqName="")
    {
        auto cn = cliName!X(uniqName);
        auto clicom = new CliCom(cn);
        subscribe(cn ~ RES_ROOM ~ name, &clicom.receive);
        return new RMIStub!X(clicom);
    }

    /// Connect both the publishing client and the subscriber.
    void connect()
    {
        pub.connect();
        sub.connect();
    }
}

/// Current time as returned by `Clock.currStdTime`; used as log timestamp.
private long cts()()
{
    import std.datetime;
    return Clock.currStdTime;
}