1 module mosquitto.client;
2 
import std.algorithm : map;
import std.array : array;
import std.exception;
import std.string;

public import mosquitto.api;
9 
/// Exception carrying a `MOSQ_ERR` code together with the name of the
/// function that produced it.
class MosquittoException : Exception
{
    /// Result code this exception was constructed with.
    MOSQ_ERR err;

    /++
        Params:
            err  = result code to store and to embed in the message
            func = name of the function that returned `err`
     +/
    this(MOSQ_ERR err, string func)
    {
        // The code appears twice in the message: once as a number (%d)
        // and once with its default formatting (%s).
        immutable text = format("%s returns %d (%s)", func, err, err);
        super(text);
        this.err = err;
    }
}
19 
/++
    Calls `fnc` with `args`, casts the result to `MOSQ_ERR` and throws a
    `MosquittoException` (tagged with the identifier of `fnc`) when that
    result is non-zero. Returns normally otherwise.
 +/
private void mosqCheck(alias fnc, Args...)(Args args)
{
    const code = cast(MOSQ_ERR) fnc(args);

    // Zero means the call succeeded: nothing to report.
    if (!code)
        return;

    throw new MosquittoException(code, __traits(identifier, fnc));
}
25 
/++
    Object wrapper around a single libmosquitto client handle.

    The instance keeps a list of subscriptions (pattern + delegate + QoS).
    They are (re)sent to the broker every time the connect callback fires,
    and incoming messages are dispatched to every delegate whose pattern
    matches the message topic.

    Network traffic is only processed while `loop` is being called.
 +/
class MosquittoClient
{
protected:
    /// Handle returned by `mosquitto_new`; released in the destructor.
    mosquitto_t mosq;

    /// One registered subscription.
    static struct Callback
    {
        /// Pattern passed to `mosquitto_subscribe` and used for topic matching.
        string pattern;
        /// Handler invoked with (topic, payload) for every matching message.
        void delegate(string, const(ubyte)[]) func;
        /// QoS value passed to `mosquitto_subscribe`.
        int qos;
    }

    /// All subscriptions registered through `subscribe`.
    Callback[] slist;

    /// Maintained by the connect / disconnect callbacks.
    bool _connected;

public:

    /// A received message as handed to `onMessage`.
    struct Message
    {
        string topic;
        const(ubyte)[] payload;
    }

    /// Connection parameters consumed by the constructor and by `connect`.
    struct Settings
    {
        /// Passed to `mosquitto_connect` as the host argument.
        string host = "::1";
        /// Passed to `mosquitto_connect` as the port argument.
        ushort port = 1883;
        /// Passed to `mosquitto_new` as the client id.
        string clientId;
        /// Passed to `mosquitto_new` as the clean-session flag.
        bool cleanSession = false;
        /// Passed to `mosquitto_connect` as the keepalive argument.
        int keepalive = 5;
    }

    /// Copy of the settings given to the constructor.
    Settings settings;

    /// Optional hook called after a successful connect, once the
    /// subscription list has been sent.
    void delegate() onConnect;

    // C-linkage trampolines registered with libmosquitto. `cptr` is the
    // user pointer given to `mosquitto_new`, i.e. the owning client.
    //
    // NOTE(review): `enforce` / `mosqCheck` failures in here throw a D
    // exception out of a function invoked from C code; whether that unwinds
    // cleanly depends on the platform. Kept as-is because callers may rely
    // on the exception surfacing from `loop()` - confirm.
    extern(C) protected static
    {
        /// Connect callback: marks the client connected, re-sends all
        /// subscriptions and fires `onConnect`. Throws on a non-zero `res`.
        void onConnectCallback(mosquitto_t mosq, void* cptr, int res)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");

            // Names for the MQTT 3.1.1 CONNACK return codes, used only to
            // make the error message readable.
            enum Res
            {
                success = 0,
                unacceptable_protocol_version = 1,
                identifier_rejected = 2,
                broker_unavailable = 3,
                bad_username_or_password = 4,
                not_authorized = 5
            }
            enforce(res == 0, format("connection error: %s", cast(Res)res));
            cli._connected = true;
            cli.subscribeList();
            if (cli.onConnect !is null) cli.onConnect();
        }

        /// Disconnect callback: marks the client as not connected.
        void onDisconnectCallback(mosquitto_t mosq, void* cptr, int res)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");
            cli._connected = false;
        }

        /// Message callback: copies topic and payload and dispatches them
        /// through `onMessage`.
        void onMessageCallback(mosquitto_t mosq, void* cptr, const mosquitto_message* msg)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");

            // libmosquitto frees the message memory once this callback
            // returns, so both parts are copied into GC memory; handlers
            // may then keep the slices safely.
            auto topic = msg.topic.fromStringz.idup;
            auto payload = (cast(const(ubyte)*) msg.payload)[0 .. msg.payloadlen].dup;

            cli.onMessage(Message(topic, payload));
        }
    }

    /// Sends every registered subscription to the broker.
    /// Throws: `MosquittoException` if `mosquitto_subscribe` fails.
    protected void subscribeList()
    {
        foreach (cb; slist)
            mosqCheck!mosquitto_subscribe(mosq, null, cb.pattern.toStringz, cb.qos);
    }

    /// Calls the handler of every subscription whose pattern matches
    /// `msg.topic`.
    /// Throws: `MosquittoException` if `mosquitto_topic_matches_sub` fails.
    protected void onMessage(Message msg)
    {
        // The topic is the same for every subscription: convert it once.
        auto topicz = msg.topic.toStringz;

        foreach (cb; slist)
        {
            bool res;
            mosqCheck!mosquitto_topic_matches_sub(cb.pattern.toStringz,
                                              topicz,
                                              &res);
            if (res) cb.func(msg.topic, msg.payload);
        }
    }

    /++
        Creates the underlying handle and registers the connect, disconnect
        and message callbacks. Does not connect; call `connect` for that.

        Throws: `Exception` if `mosquitto_new` returns a null handle.
     +/
    this(Settings s)
    {
        import core.stdc.errno;

        settings = s;

        mosq = enforce(mosquitto_new(s.clientId.toStringz, s.cleanSession, cast(void*)this),
        format("error while create mosquitto: %d", errno));

        mosquitto_connect_callback_set(mosq, &onConnectCallback);
        // Without this registration `_connected` would never be reset.
        mosquitto_disconnect_callback_set(mosq, &onDisconnectCallback);
        mosquitto_message_callback_set(mosq, &onMessageCallback);
    }

    /// Disconnects (best effort) and releases the handle.
    ~this()
    {
        // The handle is absent if construction failed.
        if (!mosq) return;

        // A destructor may run during a GC collection, where allocating
        // (and therefore throwing a new exception) is not permitted, so the
        // result code is deliberately ignored instead of going through
        // mosqCheck. Being already disconnected is not an error here.
        mosquitto_disconnect(mosq);
        mosquitto_destroy(mosq);
        mosq = mosquitto_t.init;
    }

    /// True between a successful connect callback and the next disconnect
    /// callback.
    bool connected() const @property { return _connected; }

    /// Runs one `mosquitto_loop(mosq, 0, 1)` iteration; callbacks are
    /// invoked from inside this call.
    /// Throws: `MosquittoException` on a non-zero result.
    void loop() { mosqCheck!mosquitto_loop(mosq, 0, 1); }

    /// Connects using `settings.host`, `settings.port` and
    /// `settings.keepalive`.
    /// Throws: `MosquittoException` on a non-zero result.
    void connect() { mosqCheck!mosquitto_connect(mosq, settings.host.toStringz, settings.port, settings.keepalive); }

    /// Calls `mosquitto_reconnect`.
    /// Throws: `MosquittoException` on a non-zero result.
    void reconnect() { mosqCheck!mosquitto_reconnect(mosq); }

    /// Calls `mosquitto_disconnect`.
    /// Throws: `MosquittoException` on a non-zero result.
    void disconnect() { mosqCheck!mosquitto_disconnect(mosq); }

    /++
        Publishes `d` on topic `t`.

        Params:
            t      = topic to publish on
            d      = payload bytes
            qos    = QoS value passed to `mosquitto_publish`
            retain = retain flag passed to `mosquitto_publish`

        Throws: `Exception` if the payload length does not fit into an `int`;
                `MosquittoException` if `mosquitto_publish` fails.
     +/
    void publish(string t, const(ubyte)[] d, int qos=0, bool retain=false)
    {
        // The C API takes the length as int; refuse instead of truncating.
        enforce(d.length <= int.max, "payload is too large to publish");
        mosqCheck!mosquitto_publish(mosq, null, t.toStringz, cast(int)d.length, d.ptr, qos, retain);
    }

    /++
        Registers `cb` for topics matching `pattern`. The subscription is
        remembered and sent on every (re)connect; if the client is currently
        connected it is also sent immediately.

        Params:
            pattern = subscription pattern
            cb      = handler receiving (topic, payload)
            qos     = QoS value passed to `mosquitto_subscribe`

        Throws: `MosquittoException` if the immediate subscribe fails.
     +/
    void subscribe(string pattern, void delegate(string, const(ubyte)[]) cb, int qos=0)
    {
        slist ~= Callback(pattern, cb, qos);
        if (connected) mosqCheck!mosquitto_subscribe(mosq, null, pattern.toStringz, qos);
    }
}