module mosquitto.client;

import std.algorithm : map;
import std.exception;
import std.array : array;
import std.string;

public import mosquitto.api;

/// Thrown when a wrapped mosquitto API call returns a non-zero status.
class MosquittoException : Exception
{
    /// Status code returned by the failing call.
    MOSQ_ERR err;

    /++
        Params:
            err  = status code returned by the call
            func = name of the function that returned it (used in the message)
     +/
    this(MOSQ_ERR err, string func)
    {
        this.err = err;
        super(format("%s returns %d (%s)", func, err, err));
    }
}

/++
    Calls `fnc(args)`, converts the result to `MOSQ_ERR` and throws a
    `MosquittoException` naming `fnc` when that result is non-zero.
 +/
private void mosqCheck(alias fnc, Args...)(Args args)
{
    if (auto r = cast(MOSQ_ERR)fnc(args))
        throw new MosquittoException(r, __traits(identifier, fnc));
}

/++
    Object-oriented wrapper around a single mosquitto client handle.

    Subscriptions are remembered in `slist`; they are (re)issued from the
    connect callback and used to dispatch incoming messages to delegates.
 +/
class MosquittoClient
{
protected:
    /// Underlying client handle created in the constructor.
    mosquitto_t mosq;

    /// One registered subscription: topic pattern, handler and QoS.
    static struct Callback
    {
        string pattern;
        void delegate(string, const(ubyte)[]) func;
        int qos;
    }

    /// All subscriptions registered through `subscribe`.
    Callback[] slist;

    /// Set by the connect callback, cleared by the disconnect callback.
    bool _connected;

public:

    /// An incoming message as passed to `onMessage`.
    struct Message
    {
        string topic;
        const(ubyte)[] payload;
    }

    /// Connection parameters used by the constructor and `connect`.
    struct Settings
    {
        string host = "::1";
        ushort port = 1883;
        string clientId;
        bool cleanSession = false;
        int keepalive = 5;
    }

    /// Settings this client was constructed with.
    Settings settings;

    /// Optional hook invoked after a successful connect and re-subscribe.
    void delegate() onConnect;

    // NOTE(review): the callbacks below use `enforce`, i.e. they may throw
    // from inside an extern(C) function that is presumably invoked by the C
    // library (during `loop`). Confirm that unwinding through the library is
    // acceptable on the target platforms.
    extern(C) protected static
    {
        /// Connect callback: marks the client connected, re-subscribes all
        /// stored patterns and fires `onConnect`. Throws if `res` is non-zero.
        void onConnectCallback(mosquitto_t mosq, void* cptr, int res)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");
            // CONNACK return codes; used only to render a readable message.
            enum Res
            {
                success = 0,
                unacceptable_protocol_version = 1,
                identifier_rejected = 2,
                broker_unavailable = 3,
                bad_username_or_password = 4,
                not_authorized = 5
            }
            enforce(res == 0, format("connection error: %s", cast(Res)res));
            cli._connected = true;
            cli.subscribeList();
            if (cli.onConnect !is null) cli.onConnect();
        }

        /// Disconnect callback: clears the connected flag.
        void onDisconnectCallback(mosquitto_t mosq, void* cptr, int res)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");
            cli._connected = false;
        }

        /// Message callback: copies topic and payload into a `Message` and
        /// forwards it to `onMessage`.
        void onMessageCallback(mosquitto_t mosq, void* cptr,
                               const mosquitto_message* msg)
        {
            auto cli = enforce(cast(MosquittoClient)cptr, "null cli");
            // Both topic and payload are copied: `msg` belongs to the caller
            // of this callback, so handlers that keep the slices must not end
            // up pointing into its memory.
            auto payload =
                (cast(const(ubyte)[])msg.payload[0 .. msg.payloadlen]).dup;
            cli.onMessage(Message(msg.topic.fromStringz.idup, payload));
        }
    }

    /// Issues a subscribe request for every stored subscription.
    protected void subscribeList()
    {
        foreach (cb; slist)
            mosqCheck!mosquitto_subscribe(mosq, null,
                                          cb.pattern.toStringz, cb.qos);
    }

    /// Delivers `msg` to every stored handler whose pattern matches its topic.
    protected void onMessage(Message msg)
    {
        // The topic is the same for every subscription; convert it once.
        auto topicz = msg.topic.toStringz;
        foreach (cb; slist)
        {
            bool res;
            mosqCheck!mosquitto_topic_matches_sub(
                cb.pattern.toStringz, topicz, &res);
            if (res) cb.func(msg.topic, msg.payload);
        }
    }

    /++
        Creates the client handle and installs the connect, disconnect and
        message callbacks. Does not connect; call `connect` for that.

        Throws: `Exception` if the handle cannot be created.
     +/
    this(Settings s)
    {
        import core.stdc.errno;

        settings = s;

        mosq = enforce(mosquitto_new(s.clientId.toStringz,
                                     s.cleanSession, cast(void*)this),
                       format("error while create mosquitto: %d", errno));

        mosquitto_connect_callback_set(mosq, &onConnectCallback);
        // Without this registration `_connected` would never be reset.
        mosquitto_disconnect_callback_set(mosq, &onDisconnectCallback);
        mosquitto_message_callback_set(mosq, &onMessageCallback);
    }

    /// Disconnects (best effort) and releases the client handle.
    ~this()
    {
        // The constructor may have thrown before `mosq` was assigned.
        if (mosq is null) return;

        // The status is deliberately ignored: a destructor may run as a GC
        // finalizer, where throwing or allocating an exception is not safe,
        // and "not connected" is an expected outcome here.
        cast(void)mosquitto_disconnect(mosq);
        mosquitto_destroy(mosq);
        mosq = null;
    }

    /// Whether the connect callback has fired without a later disconnect.
    bool connected() const @property { return _connected; }

    /// Runs one `mosquitto_loop` call with arguments `(0, 1)`.
    /// Throws: `MosquittoException` on a non-zero status.
    void loop() { mosqCheck!mosquitto_loop(mosq, 0, 1); }

    /// Connects using `settings.host`, `settings.port`, `settings.keepalive`.
    /// Throws: `MosquittoException` on a non-zero status.
    void connect()
    {
        mosqCheck!mosquitto_connect(mosq,
                                    settings.host.toStringz, settings.port,
                                    settings.keepalive);
    }

    /// Throws: `MosquittoException` on a non-zero status.
    void reconnect() { mosqCheck!mosquitto_reconnect(mosq); }

    /// Throws: `MosquittoException` on a non-zero status.
    void disconnect() { mosqCheck!mosquitto_disconnect(mosq); }

    /++
        Publishes `d` on topic `t`.

        Params:
            t      = topic name
            d      = payload bytes
            qos    = quality-of-service level passed to the library
            retain = retain flag passed to the library

        Throws: `Exception` if the payload length does not fit in an `int`;
                `MosquittoException` on a non-zero status.
     +/
    void publish(string t, const(ubyte)[] d, int qos=0,
                 bool retain=false)
    {
        // The C API takes the length as int; an unchecked cast would silently
        // truncate (or turn negative) for oversized payloads.
        enforce(d.length <= int.max, "payload too large for mosquitto_publish");
        mosqCheck!mosquitto_publish(mosq, null, t.toStringz,
                                    cast(int)d.length, d.ptr, qos, retain);
    }

    /++
        Registers `cb` for topics matching `pattern`. The subscription is
        stored and, if the client is currently connected, sent immediately;
        otherwise it is sent from the connect callback.

        Throws: `MosquittoException` on a non-zero status.
     +/
    void subscribe(string pattern, void delegate(string,
                   const(ubyte)[]) cb, int qos)
    {
        slist ~= Callback(pattern, cb, qos);
        if (connected) mosqCheck!mosquitto_subscribe(mosq,
                                    null, pattern.toStringz, qos);
    }
}