1 ///
2 module drmi.mqtt.accessor;
3 
4 import drmi.ps.accessor;
5 import drmi.mqtt.transport;
6 
7 import std.algorithm;
8 import std.array;
9 import std.datetime;
10 import core.thread;
11 import std.conv;
12 import std.experimental.logger;
13 
14 class AFiber : Fiber
15 {
16     ulong nextTime;
17     this(void delegate() dlg) { super(dlg); }
18     this(void delegate() dlg, size_t sz) { super(dlg, sz); }
19 }
20 
21 void sleep(Duration d)
22 {
23     if (auto f = Fiber.getThis())
24     {
25         auto af = cast(AFiber)f;
26         if (af !is null)
27         {
28             af.nextTime = Clock.currStdTime + d.total!"hnsecs";
29             af.yield();
30         }
31         else
32         {
33             auto sw = StopWatch(AutoStart.yes);
34             while (sw.peek.to!Duration < d) f.yield();
35         }
36     }
37     else Thread.sleep(d);
38 }
39 
40 void yield()
41 {
42     if (auto f = Fiber.getThis()) f.yield();
43     else Thread.yield();
44 }
45 
46 ///
47 class MqttAccessor(T) : Accessor!T
48 {
49 protected:
50     MqttTransport tr;
51     AFiber[] fibers;
52     bool work = true;
53     int exit_result;
54 
55     void callTransportLoop() { tr.loop(); }
56 
57 public:
58 
59     alias Settings = MosquittoClient.Settings;
60 
61     ///
62     size_t stackSize = 1024 * 128;
63 
64     ///
65     this(T obj, string uniq="") { this(obj, Settings.init, uniq); }
66 
67     ///
68     this(T obj, Settings sets, string uniq="")
69     {
70         tr = new MqttTransport(sets);
71         super(tr, obj, uniq, (s){ .sleep(s); });
72 
73         spawnInfLoop({ callTransportLoop(); });
74     }
75 
76     void spawn(void delegate() _body)
77     {
78         if (stackSize > 0)
79             fibers ~= new AFiber({ _body(); }, stackSize);
80         else
81             fibers ~= new AFiber({ _body(); });
82     }
83 
84     void spawnInfLoop(void delegate() loop_body)
85     {
86         fibers ~= new AFiber(
87         {
88             while (true)
89             {
90                 loop_body();
91                 yield();
92             }
93         }, stackSize);
94     }
95 
96     void exitLoop(int res=0)
97     {
98         work = false;
99         exit_result = res;
100     }
101 
102     int exitResult() const @property { return exit_result; }
103 
104     ///
105     bool loop()
106     {
107         if (!work) return false;
108         fibers = fibers.filter!(f=>f.state != Fiber.State.TERM).array;
109         foreach (f; fibers)
110             if (f.nextTime < Clock.currStdTime)
111                 f.call();
112         return true;
113     }
114 }