1 module mosquitto.client; 2 3 import std.algorithm : map; 4 import std.exception; 5 import std.array : array; 6 import std..string; 7 8 public import mosquitto.api; 9 10 class MosquittoException : Exception 11 { 12 MOSQ_ERR err; 13 this(MOSQ_ERR err, string func) 14 { 15 this.err = err; 16 super(format("%s returns %d (%s)", func, err, err)); 17 } 18 } 19 20 private void mosqCheck(alias fnc, Args...)(Args args) 21 { 22 if (auto r = cast(MOSQ_ERR)fnc(args)) 23 throw new MosquittoException(r, __traits(identifier, fnc)); 24 } 25 26 class MosquittoClient 27 { 28 protected: 29 mosquitto_t mosq; 30 31 static struct Callback 32 { 33 string pattern; 34 void delegate(string, const(ubyte)[]) func; 35 int qos; 36 } 37 38 Callback[] slist; 39 40 bool _connected; 41 42 public: 43 44 struct Message 45 { 46 string topic; 47 const(ubyte)[] payload; 48 } 49 50 struct Settings 51 { 52 string host = "::1"; 53 ushort port = 1883; 54 string clientId; 55 bool cleanSession = false; 56 int keepalive = 5; 57 } 58 59 Settings settings; 60 61 void delegate() onConnect; 62 63 extern(C) protected static 64 { 65 void onConnectCallback(mosquitto_t mosq, void* cptr, int res) 66 { 67 auto cli = enforce(cast(MosquittoClient)cptr, "null cli"); 68 enum Res 69 { 70 success = 0, 71 unacceptable_protocol_version = 1, 72 identifier_rejected = 2, 73 broker_unavailable = 3 74 } 75 enforce(res == 0, format("connection error: %s", cast(Res)res)); 76 cli._connected = true; 77 cli.subscribeList(); 78 if (cli.onConnect !is null) cli.onConnect(); 79 } 80 81 void onDisconnectCallback(mosquitto_t mosq, void* cptr, int res) 82 { 83 auto cli = enforce(cast(MosquittoClient)cptr, "null cli"); 84 cli._connected = false; 85 } 86 87 void onMessageCallback(mosquitto_t mosq, void* cptr, const mosquitto_message* msg) 88 { 89 auto cli = enforce(cast(MosquittoClient)cptr, "null cli"); 90 cli.onMessage(Message(msg.topic.fromStringz.idup, cast(ubyte[])msg.payload[0..msg.payloadlen])); 91 } 92 } 93 94 protected void subscribeList() 95 { 96 foreach (cb; slist) 97 mosqCheck!mosquitto_subscribe(mosq, null, cb.pattern.toStringz, cb.qos); 98 } 99 100 protected void onMessage(Message msg) 101 { 102 foreach (cb; slist) 103 { 104 bool res; 105 mosqCheck!mosquitto_topic_matches_sub(cb.pattern.toStringz, 106 msg.topic.toStringz, 107 &res); 108 if (res) cb.func(msg.topic, msg.payload); 109 } 110 } 111 112 this(Settings s) 113 { 114 import core.stdc.errno; 115 116 settings = s; 117 118 mosq = enforce(mosquitto_new(s.clientId.toStringz, s.cleanSession, cast(void*)this), 119 format("error while create mosquitto: %d", errno)); 120 121 mosquitto_connect_callback_set(mosq, &onConnectCallback); 122 mosquitto_message_callback_set(mosq, &onMessageCallback); 123 } 124 125 ~this() { disconnect(); } 126 127 bool connected() const @property { return _connected; } 128 129 void loop() { mosqCheck!mosquitto_loop(mosq, 0, 1); } 130 131 void connect() { mosqCheck!mosquitto_connect(mosq, settings.host.toStringz, settings.port, settings.keepalive); } 132 void reconnect() { mosqCheck!mosquitto_reconnect(mosq); } 133 134 void disconnect() { mosqCheck!mosquitto_disconnect(mosq); } 135 136 void publish(string t, const(ubyte)[] d, int qos=0, bool retain=false) 137 { mosqCheck!mosquitto_publish(mosq, null, t.toStringz, cast(int)d.length, d.ptr, qos, retain); } 138 139 void subscribe(string pattern, void delegate(string, const(ubyte)[]) cb, int qos) 140 { 141 slist ~= Callback(pattern, cb, qos); 142 if (connected) mosqCheck!mosquitto_subscribe(mosq, null, pattern.toStringz, qos); 143 } 144 }