1 module mosquitto.client;
2 
3 import std.algorithm : map;
4 import std.exception;
5 import std.array : array;
6 import std.string;
7 
8 public import mosquitto.api;
9 
/// Exception raised when a libmosquitto call reports a non-zero status.
class MosquittoException : Exception
{
    /// Status code returned by the failing call.
    MOSQ_ERR err;

    /**
     * Params:
     *   err  = status code returned by the library call
     *   func = name of the library function that produced `err`
     */
    this(MOSQ_ERR err, string func)
    {
        // The message carries the code both numerically (%d) and through
        // its default string formatting (%s).
        super(format("%s returns %d (%s)", func, err, err));
        this.err = err;
    }
}
19 
/**
 * Invokes `fnc(args)` and converts a non-zero result into a
 * `MosquittoException` that names the called function.
 *
 * Params:
 *   fnc  = callable whose result is interpreted as a `MOSQ_ERR`
 *   args = arguments forwarded unchanged to `fnc`
 *
 * Throws: `MosquittoException` when the result is non-zero.
 */
private void mosqCheck(alias fnc, Args...)(Args args)
{
    auto status = cast(MOSQ_ERR)fnc(args);

    // A zero status means there is nothing to report.
    if (!status)
        return;

    throw new MosquittoException(status, __traits(identifier, fnc));
}
25 
/**
 * Object wrapper around a libmosquitto client handle.
 *
 * Subscriptions are remembered in `slist`: they are sent to the broker on
 * every successful connect and used to dispatch incoming messages to the
 * matching delegates.
 */
class MosquittoClient
{
protected:
    /// Native handle obtained from `mosquitto_new`; null after destruction.
    mosquitto_t mosq;

    /// One registered subscription: topic pattern, handler and requested QoS.
    static struct Callback
    {
        string pattern;
        void delegate(string, const(ubyte)[]) func;
        int qos;
    }

    /// Every subscription requested through `subscribe`.
    Callback[] slist;

    /// Maintained by the connect / disconnect callbacks.
    bool _connected;

public:

    /// A received message as handed to `onMessage`.
    struct Message
    {
        string topic;
        const(ubyte)[] payload;
    }

    /// Connection parameters; the values are forwarded to `mosquitto_new`
    /// (`clientId`, `cleanSession`) and `mosquitto_connect` (the rest).
    struct Settings
    {
        string host = "::1";
        ushort port = 1883;
        string clientId;
        bool cleanSession = false;
        int keepalive = 5;
    }

    /// Settings captured at construction time.
    Settings settings;

    /// Optional hook invoked after a successful connect, once the stored
    /// subscriptions have been sent.
    void delegate() onConnect;

    // NOTE(review): the enforce() calls below throw from extern(C) callbacks,
    // i.e. through the C library's stack frames. This matches the original
    // behaviour and is left unchanged, but whether unwinding through
    // libmosquitto is safe on the target platform should be confirmed.
    extern(C) protected static
    {
        /// Connect callback: marks the client connected, sends all stored
        /// subscriptions, then calls the user hook. A non-zero `res` raises.
        void onConnectCallback(mosquitto_t mosq, void* cptr, int res)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");
            enum Res
            {
                success = 0,
                unacceptable_protocol_version = 1,
                identifier_rejected = 2,
                broker_unavailable = 3
            }
            enforce(res == 0, format("connection error: %s", cast(Res)res));
            cli._connected = true;
            cli.subscribeList();
            if (cli.onConnect !is null) cli.onConnect();
        }

        /// Disconnect callback: clears the connected flag.
        void onDisconnectCallback(mosquitto_t mosq, void* cptr, int res)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");
            cli._connected = false;
        }

        /// Message callback: copies topic and payload into D-owned memory
        /// and dispatches them through `onMessage`.
        void onMessageCallback(mosquitto_t mosq, void* cptr,
                                const mosquitto_message* msg)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");
            // libmosquitto documents that the message memory is released
            // once the callback returns, so the payload is duplicated just
            // like the topic; handlers may then keep either safely.
            auto payload =
                (cast(const(ubyte)[])msg.payload[0..msg.payloadlen]).dup;
            cli.onMessage(Message(msg.topic.fromStringz.idup, payload));
        }
    }

    /// Sends a subscribe request for every stored subscription.
    /// Throws: `MosquittoException` on the first failing request.
    protected void subscribeList()
    {
        foreach (cb; slist)
            mosqCheck!mosquitto_subscribe(mosq, null,
                            cb.pattern.toStringz, cb.qos);
    }

    /// Calls the handler of every stored subscription whose pattern matches
    /// the message topic.
    /// Throws: `MosquittoException` if the match check reports an error.
    protected void onMessage(Message msg)
    {
        // The topic is the same for every subscription: convert it once.
        auto topicz = msg.topic.toStringz;

        foreach (cb; slist)
        {
            bool res;
            mosqCheck!mosquitto_topic_matches_sub(
                cb.pattern.toStringz, topicz, &res);
            if (res) cb.func(msg.topic, msg.payload);
        }
    }

    /**
     * Creates the native client and installs the connect, disconnect and
     * message callbacks. No connection is opened here; see `connect`.
     *
     * Throws: `Exception` (carrying `errno`) when `mosquitto_new` fails.
     */
    this(Settings s)
    {
        import core.stdc.errno;

        settings = s;

        mosq = enforce(mosquitto_new(s.clientId.toStringz,
                        s.cleanSession, cast(void*)this),
        format("error while create mosquitto: %d", errno));

        mosquitto_connect_callback_set(mosq, &onConnectCallback);
        // Without this registration the connected flag could never be
        // cleared again after the first successful connect.
        mosquitto_disconnect_callback_set(mosq, &onDisconnectCallback);
        mosquitto_message_callback_set(mosq, &onMessageCallback);
    }

    /// Best-effort teardown. Must not throw: a finalizer can run during a
    /// GC cycle, where allocating an exception is not permitted, and being
    /// already disconnected is not an error at this point.
    ~this()
    {
        if (mosq is null) return;
        mosquitto_disconnect(mosq); // result deliberately ignored
        mosquitto_destroy(mosq);    // release the native handle
        mosq = null;
    }

    /// Whether the connect callback has fired without a later disconnect.
    bool connected() const @property { return _connected; }

    /// Runs one `mosquitto_loop` iteration with timeout 0 and max_packets 1.
    /// Throws: `MosquittoException` on a non-zero status.
    void loop() { mosqCheck!mosquitto_loop(mosq, 0, 1); }

    /// Connects using the host, port and keepalive from `settings`.
    /// Throws: `MosquittoException` on a non-zero status.
    void connect()
    {
        mosqCheck!mosquitto_connect(mosq,
                settings.host.toStringz, settings.port,
                settings.keepalive);
    }

    /// Throws: `MosquittoException` on a non-zero status.
    void reconnect() { mosqCheck!mosquitto_reconnect(mosq); }

    /// Throws: `MosquittoException` on a non-zero status.
    void disconnect() { mosqCheck!mosquitto_disconnect(mosq); }

    /**
     * Publishes `d` on topic `t`.
     *
     * Params:
     *   t      = topic name
     *   d      = payload bytes
     *   qos    = quality-of-service level passed to the library
     *   retain = retain flag passed to the library
     *
     * Throws: `Exception` if the payload length does not fit in an `int`;
     *         `MosquittoException` on a non-zero library status.
     */
    void publish(string t, const(ubyte)[] d, int qos=0,
                 bool retain=false)
    {
        // The C API takes the length as int; refuse instead of truncating.
        enforce(d.length <= int.max, "payload too large");
        mosqCheck!mosquitto_publish(mosq, null, t.toStringz,
        cast(int)d.length, d.ptr, qos, retain);
    }

    /**
     * Registers `cb` for topics matching `pattern`. The subscription is
     * stored for later connects and, if currently connected, sent at once.
     *
     * Params:
     *   pattern = subscription pattern
     *   cb      = handler receiving topic and payload
     *   qos     = requested quality-of-service level (defaults to 0,
     *             mirroring `publish`)
     *
     * Throws: `MosquittoException` if the immediate subscribe fails.
     */
    void subscribe(string pattern, void delegate(string,
                    const(ubyte)[]) cb, int qos=0)
    {
        slist ~= Callback(pattern, cb, qos);
        if (connected) mosqCheck!mosquitto_subscribe(mosq,
                                null, pattern.toStringz, qos);
    }
}