Replies: 7 comments 21 replies
-
Well, as it currently stands, you don't have the payload available to you in the plugin. This is for performance reasons. However, I do have some ideas how I would make that possible. So, that functionality may come. But aside from that, I can give some insight. First, how many messages per second are we talking? One thing you do have to realize, is that MQTT messages can be dropped, even in QoS. QoS is merely a way for the sending side to know if a message is delivered, not a guarantee that nothing is dropped. So, if you need to process 1 million messages per second, one subscriber isn't going to be able to handle that. If you do do it in the plugin, beware that if you do a direct call to a a database back-end, that will be blocking and you'll be holding up the event loops. You'd need to do it in some kind of async way, or producer/consumer model with semaphore controlled queues. You'd have an extra thread per event loop that will be writing the messages to the DB. Also, you should make use of multi-value insert statements. Doing it in the plugin would also have the advantage of distributing the load over multiple threads (because you'd have a db writer thread per FlashMQ thread). But, as stated, currently you can't do it in the plugin. I'd have to give an API change some thought. If your message volume isn't super high, a subscriber on |
Beta Was this translation helpful? Give feedback.
-
I was debating doing it with a subscriber on #, specifically in case of high volume messages and then having an an additional dependency of this new subscriber and managing it(cost, uptime, maintenance, etc) . As a different approach, I was thinking about adding a hook to the RetainedMessagesDB::saveData() call as it seemed to be already backed by a circular buffer and setup in a way that multi threaded calls would be safe. We are planning to build a simple IoT service where devices will push messages up, with publish being the crux and there may not really be a subscriber but just a DB where the data gets ingested and is accessed via DB's rest apis. If we do not use any subscriber at all, i assume the messages get dropped but will still make its way to the the above function call if they are marked as "retain". Just replacing the persistencefile write calls with the DB calls in saveData() seemed like the simplest way to do it. I think the messages should always get to this function and then based on the user selection in the config file, write to DB in addition to the actual persist handling. This seemed to be the easiest but probably not the cleanest I think. Also, my approach is based on reading your code just in the last day, so I am sure I am making some dumb assumptions as well. :) |
Beta Was this translation helpful? Give feedback.
-
Remember that retained messages are set to a specific value; it's not a stream of messages. If you only dump the retained messages, you will miss transitions. And, the messages may be old. Can you tell me what your message volume is expected to be? |
Beta Was this translation helpful? Give feedback.
-
I think doing it with the plugin may still be your best choice. Forking the code has obvious downsides. I have some ideas how to make the payload available. I'll push a concept branch soon. You may also want to test writing thousands of messages per second to a database. Multi-value statements is key. And, perhaps Timescaledb is an option, if it's time series based. If it's in a normal SQL database, actually removing old data is going to harder than inserting it. I have experienced this myself. That's one of many things a time series DB can help. |
Beta Was this translation helpful? Give feedback.
-
I pushed draft-change-payload. I'm still thinking over a few issues. |
Beta Was this translation helpful? Give feedback.
-
Hi Wiebe, I know you've been trying something more complex and probably you have a reason for not including the payload in |
Beta Was this translation helpful? Give feedback.
-
I think everything from this discussion was resolved. Closing. |
Beta Was this translation helpful? Give feedback.
-
what would be the best approach to archive all the messages received in a DB ? would it be to tap into AlterPublish path for the plugin and pipe to DB or you would suggest to instead create a dumb subscriber whose sole purpose is to subscribe to all topics and keep writing to a DB ?
Beta Was this translation helpful? Give feedback.
All reactions