|
2 | 2 | import paho.mqtt.client as mqtt
|
3 | 3 |
|
4 | 4 | class Hub():
|
5 |
| - def __init__(self, account_id, api_key, client_id=None): |
| 5 | + def __init__(self, account_id, api_key, device_id=None, message_handler=None): |
6 | 6 | self.account_id = account_id
|
7 |
| - self.client = mqtt.Client(client_id=client_id, clean_session=True) |
| 7 | + self.client = mqtt.Client(client_id=device_id, clean_session=True) |
8 | 8 | self.client.username_pw_set(account_id, api_key)
|
9 | 9 | # self.client.tls_set(ca_certs='labstack.com/cert.pem')
|
10 | 10 | self.handlers = {}
|
11 | 11 | def handler(client, userdata, msg):
|
12 |
| - self.handlers[msg.topic](msg.payload) |
| 12 | + topic = self._denormalize_topic(msg.topic) |
| 13 | + if message_handler: |
| 14 | + message_handler(topic, msg.payload) |
| 15 | + h = self.handlers.get(topic) |
| 16 | + if h: |
| 17 | + h(topic, msg.payload) |
13 | 18 | self.client.on_message = handler
|
| 19 | + |
| 20 | + def _normalize_topic(self, topic): |
| 21 | + return '{}/{}'.format(self.account_id, topic) |
| 22 | + |
| 23 | + def _denormalize_topic(self, topic): |
| 24 | + return topic.lstrip(self.account_id + '/') |
14 | 25 |
|
15 | 26 | def connect(self, handler=None):
|
16 | 27 | self.client.connect("hub.labstack.com", 1883)
|
17 | 28 | self.client.loop_start()
|
18 | 29 | def on_connect(client, userdata, flags, rc):
|
19 |
| - if handler is not None: |
| 30 | + if handler: |
20 | 31 | handler()
|
21 | 32 | self.client.on_connect = on_connect
|
22 | 33 |
|
23 | 34 | def publish(self, topic, message):
|
24 |
| - self.client.publish('{}/{}'.format(self.account_id, topic), message) |
| 35 | + self.client.publish(self._normalize_topic(topic), message) |
25 | 36 |
|
26 | 37 | # def subscribe(self, topic, handler, shared=False):
|
27 |
| - def subscribe(self, topic, handler): |
28 |
| - topic = '{}/{}'.format(self.account_id, topic) |
| 38 | + def subscribe(self, topic, handler=None): |
29 | 39 | # if shared:
|
30 | 40 | # topic = '$queue/' + topic
|
31 |
| - self.client.subscribe(topic) |
| 41 | + self.client.subscribe(self._normalize_topic(topic)) |
32 | 42 | self.handlers[topic] = handler
|
33 | 43 |
|
34 | 44 | def unsubscribe(self, topic):
|
35 |
| - self.client.unsubscribe('{}/{}'.format(self.account_id, topic)) |
| 45 | + self.client.unsubscribe(self._normalize_topic(topic)) |
36 | 46 |
|
37 | 47 | def disconnect(self):
|
38 | 48 | self.client.loop_stop()
|
|
0 commit comments