1 /// 2 module drmi.mqtt.accessor; 3 4 import drmi.ps.accessor; 5 import drmi.mqtt.transport; 6 7 import std.algorithm; 8 import std.array; 9 import std.datetime; 10 import core.thread; 11 import std.conv; 12 import std.experimental.logger; 13 14 class AFiber : Fiber 15 { 16 ulong nextTime; 17 this(void delegate() dlg) { super(dlg); } 18 this(void delegate() dlg, size_t sz) { super(dlg, sz); } 19 } 20 21 void sleep(Duration d) 22 { 23 if (auto f = Fiber.getThis()) 24 { 25 auto af = cast(AFiber)f; 26 if (af !is null) 27 { 28 af.nextTime = Clock.currStdTime + d.total!"hnsecs"; 29 af.yield(); 30 } 31 else 32 { 33 auto sw = StopWatch(AutoStart.yes); 34 while (sw.peek.to!Duration < d) f.yield(); 35 } 36 } 37 else Thread.sleep(d); 38 } 39 40 void yield() 41 { 42 if (auto f = Fiber.getThis()) f.yield(); 43 else Thread.yield(); 44 } 45 46 /// 47 class MqttAccessor(T) : Accessor!T 48 { 49 protected: 50 MqttTransport tr; 51 AFiber[] fibers; 52 bool work = true; 53 int exit_result; 54 55 void callTransportLoop() { tr.loop(); } 56 57 public: 58 59 alias Settings = MosquittoClient.Settings; 60 61 /// 62 size_t stackSize = 1024 * 128; 63 64 /// 65 this(T obj, string uniq="") { this(obj, Settings.init, uniq); } 66 67 /// 68 this(T obj, Settings sets, string uniq="") 69 { 70 tr = new MqttTransport(sets); 71 super(tr, obj, uniq, (s){ .sleep(s); }); 72 73 spawnInfLoop({ callTransportLoop(); }); 74 } 75 76 void spawn(void delegate() _body) 77 { 78 if (stackSize > 0) 79 fibers ~= new AFiber({ _body(); }, stackSize); 80 else 81 fibers ~= new AFiber({ _body(); }); 82 } 83 84 void spawnInfLoop(void delegate() loop_body) 85 { 86 fibers ~= new AFiber( 87 { 88 while (true) 89 { 90 loop_body(); 91 yield(); 92 } 93 }, stackSize); 94 } 95 96 void exitLoop(int res=0) 97 { 98 work = false; 99 exit_result = res; 100 } 101 102 int exitResult() const @property { return exit_result; } 103 104 /// 105 bool loop() 106 { 107 if (!work) return false; 108 fibers = fibers.filter!(f=>f.state != Fiber.State.TERM).array; 109 foreach (f; fibers) 110 if (f.nextTime < Clock.currStdTime) 111 f.call(); 112 return true; 113 } 114 }