3#ifndef OCPP_COMMON_MESSAGE_QUEUE_HPP
4#define OCPP_COMMON_MESSAGE_QUEUE_HPP
7#include <condition_variable>
15#include <everest/timer.hpp>
17#include <ocpp/common/call_types.hpp>
18#include <ocpp/common/database/database_handler_common.hpp>
19#include <ocpp/common/types.hpp>
20#include <ocpp/v16/messages/StopTransaction.hpp>
21#include <ocpp/v16/types.hpp>
22#include <ocpp/v2/messages/TransactionEvent.hpp>
23#include <ocpp/v2/types.hpp>
30 int transaction_message_attempts;
31 int transaction_message_retry_interval;
35 int queues_total_size_threshold;
37 bool queue_all_messages{
false};
38 std::set<M> message_types_discard_for_queueing;
41 int message_timeout_seconds = 30;
42 int boot_notification_retry_interval_seconds =
49 return queue_all_messages and !message_types_discard_for_queueing.count(message_type);
72 bool stall_until_accepted;
80 return this->message[MESSAGE_ID];
88enum class MessageTransmissionPriority {
90 SendAfterRegistrationStatusAccepted,
96inline MessageTransmissionPriority get_message_transmission_priority(
bool is_boot_notification_message,
bool triggered,
97 bool registration_already_accepted,
98 bool is_transaction_related,
99 bool queue_all_message) {
100 if (registration_already_accepted || is_boot_notification_message || triggered) {
101 return MessageTransmissionPriority::SendImmediately;
104 if (is_transaction_related || queue_all_message) {
105 return MessageTransmissionPriority::SendAfterRegistrationStatusAccepted;
108 return MessageTransmissionPriority::Discard;
114bool is_transaction_message(
const ocpp::v16::MessageType message_type);
119bool is_transaction_message(
const ocpp::v2::MessageType message_type);
124bool is_start_transaction_message(
const ocpp::v16::MessageType message_type);
129bool is_start_transaction_message(
const ocpp::v2::MessageType message_type);
132template <
typename M>
auto is_start_transaction_message(
const ControlMessage<M>& control_message) {
133 return is_start_transaction_message(control_message.messageType);
137template <
typename M>
auto is_transaction_message(
const ControlMessage<M>& control_message) {
138 return is_transaction_message(control_message.messageType);
143bool is_boot_notification_message(
const ocpp::v16::MessageType message_type);
148bool is_boot_notification_message(
const ocpp::v2::MessageType message_type);
151bool allowed_to_send_message(
const ControlMessage<M>& message,
const DateTime& time,
152 const bool is_registration_status_accepted) {
153 if (message.stall_until_accepted and !is_registration_status_accepted) {
157 if (message.timestamp > time) {
167 std::shared_ptr<ocpp::common::DatabaseHandlerCommon> database_handler;
169 std::thread worker_thread;
171 std::deque<std::shared_ptr<ControlMessage<M>>> transaction_message_queue;
173 std::deque<std::shared_ptr<ControlMessage<M>>> normal_message_queue;
174 std::shared_ptr<ControlMessage<M>> in_flight;
175 std::recursive_mutex message_mutex;
176 std::condition_variable_any cv;
177 std::function<bool(json message)> send_callback;
178 std::vector<M> external_notify;
184 bool is_registration_status_accepted;
185 std::recursive_mutex next_message_mutex;
186 std::optional<MessageId> next_message_to_send;
188 Everest::SteadyTimer in_flight_timeout_timer;
189 Everest::SteadyTimer notify_queue_timer;
192 Everest::SteadyTimer resume_timer;
195 u_int64_t pause_resume_ctr = 0;
201 std::map<std::string, int32_t> message_id_transaction_id_map;
206 std::map<std::string, std::vector<std::string>> start_transaction_mid_meter_values_mid_map;
209 std::function<void(
const std::string& new_message_id,
const std::string& old_message_id)>
210 start_transaction_message_retry_callback;
212 MessageId getMessageId(
const json::array_t& json_message) {
213 return MessageId(json_message.at(MESSAGE_ID).get<std::string>());
215 MessageTypeId getMessageTypeId(
const json::array_t& json_message) {
216 if (json_message.size() > 0) {
217 auto messageTypeId = json_message.at(MESSAGE_TYPE_ID);
218 if (messageTypeId == MessageTypeId::CALL) {
219 return MessageTypeId::CALL;
221 if (messageTypeId == MessageTypeId::CALLRESULT) {
222 return MessageTypeId::CALLRESULT;
224 if (messageTypeId == MessageTypeId::CALLERROR) {
225 return MessageTypeId::CALLERROR;
229 return MessageTypeId::UNKNOWN;
231 bool isValidMessageType(
const json::array_t& json_message) {
232 if (this->getMessageTypeId(json_message) != MessageTypeId::UNKNOWN) {
239 EVLOG_debug <<
"Adding message to normal message queue";
241 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
243 if (message->messageType == M::BootNotification) {
244 this->normal_message_queue.push_front(message);
246 this->normal_message_queue.push_back(message);
248 if (this->config.
check_queue(message->messageType)) {
250 message->message, messagetype_to_string(message->messageType), message->message_attempts,
251 message->timestamp, message->uniqueId()};
253 this->database_handler->insert_message_queue_message(db_message, QueueType::Normal);
255 EVLOG_warning <<
"Could not insert message into transaction queue: " << e.what();
258 this->new_message =
true;
259 this->check_queue_sizes();
261 this->cv.notify_all();
262 EVLOG_debug <<
"Notified message queue worker";
264 void add_to_transaction_message_queue(std::shared_ptr<
ControlMessage<M>> message) {
265 EVLOG_debug <<
"Adding message to transaction message queue";
267 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
268 this->transaction_message_queue.push_back(message);
270 message->message_attempts, message->timestamp,
271 message->uniqueId()};
273 this->database_handler->insert_message_queue_message(db_message);
275 EVLOG_warning <<
"Could not insert message into transaction queue: " << e.what();
277 this->new_message =
true;
278 this->check_queue_sizes();
280 this->cv.notify_all();
281 EVLOG_debug <<
"Notified message queue worker";
284 void check_queue_sizes() {
285 if (this->transaction_message_queue.size() + this->normal_message_queue.size() <=
286 this->config.queues_total_size_threshold) {
289 EVLOG_warning <<
"Queue sizes exceed threshold (" << this->config.queues_total_size_threshold <<
") with "
290 << this->transaction_message_queue.size() <<
" transaction and "
291 << this->normal_message_queue.size() <<
" normal messages in queue";
293 while (this->transaction_message_queue.size() + this->normal_message_queue.size() >
294 this->config.queues_total_size_threshold &&
295 !this->normal_message_queue.empty()) {
296 this->drop_messages_from_normal_message_queue();
299 while (this->transaction_message_queue.size() + this->normal_message_queue.size() >
300 this->config.queues_total_size_threshold &&
301 this->drop_update_messages_from_transactional_message_queue()) {
305 void drop_messages_from_normal_message_queue() {
307 int number_of_dropped_messages = std::min((
int)this->normal_message_queue.size(),
308 std::max(this->config.queues_total_size_threshold / 10, 1));
310 EVLOG_warning <<
"Dropping " << number_of_dropped_messages <<
" messages from normal message queue.";
312 for (
int i = 0; i < number_of_dropped_messages; i++) {
313 if (this->config.queue_all_messages) {
315 database_handler->remove_message_queue_message(
316 this->normal_message_queue.front()->initial_unique_id, QueueType::Normal);
318 EVLOG_warning <<
"Could not delete message from transaction queue: " << e.what();
319 }
catch (
const std::exception& e) {
320 EVLOG_warning <<
"Could not delete message from transaction queue: " << e.what();
323 this->normal_message_queue.pop_front();
333 bool drop_update_messages_from_transactional_message_queue() {
335 std::deque<std::shared_ptr<ControlMessage<M>>> temporary_swap_queue;
336 bool remove_next_update_message =
true;
337 while (!transaction_message_queue.empty()) {
338 auto element = transaction_message_queue.front();
339 transaction_message_queue.pop_front();
341 if (remove_next_update_message && element->is_transaction_update_message() &&
342 transaction_message_queue.size() > 1) {
343 EVLOG_debug <<
"Drop transactional message " << element->initial_unique_id;
345 database_handler->remove_message_queue_message(element->initial_unique_id);
347 EVLOG_warning <<
"Could not delete message from transaction queue: " << e.what();
348 }
catch (
const std::exception& e) {
349 EVLOG_warning <<
"Could not delete message from transaction queue: " << e.what();
352 remove_next_update_message =
false;
354 remove_next_update_message =
true;
355 temporary_swap_queue.push_back(element);
359 std::swap(transaction_message_queue, temporary_swap_queue);
361 if (drop_count > 0) {
362 EVLOG_warning <<
"Dropped " << drop_count <<
" transactional update messages to reduce queue size.";
365 EVLOG_warning <<
"There are no further transaction update messages to drop!";
371 void resume_now(u_int64_t expected_pause_resume_ctr) {
372 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
373 if (this->pause_resume_ctr == expected_pause_resume_ctr) {
374 this->paused =
false;
375 this->resuming =
false;
376 this->cv.notify_one();
377 EVLOG_debug <<
"resume() notified message queue";
382 std::chrono::seconds current_message_timeout(
unsigned int attempt) {
383 return std::chrono::seconds(this->config.message_timeout_seconds +
384 (this->config.transaction_message_retry_interval * attempt));
391 const std::vector<M>& external_notify, std::shared_ptr<common::DatabaseHandlerCommon> database_handler,
392 const std::function<
void(
const std::string& new_message_id,
const std::string& old_message_id)>
393 start_transaction_message_retry_callback =
394 [](
const std::string& new_message_id,
const std::string& old_message_id) {}) :
395 database_handler(std::move(database_handler)),
397 external_notify(external_notify),
402 is_registration_status_accepted(
false),
403 start_transaction_message_retry_callback(start_transaction_message_retry_callback) {
405 this->send_callback = send_callback;
406 this->in_flight =
nullptr;
409 MessageQueue(
const std::function<
bool(json message)>& send_callback,
const MessageQueueConfig<M>& config,
410 std::shared_ptr<common::DatabaseHandlerCommon> databaseHandler) :
411 MessageQueue(send_callback, config, {}, databaseHandler) {
415 this->worker_thread = std::thread([
this]() {
417 while (this->running) {
418 EVLOG_debug <<
"Waiting for a message from the message queue";
420 std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
421 using namespace std::chrono_literals;
423 this->cv.wait(lk, [
this]() {
424 return !this->running || (!this->paused && this->new_message && this->in_flight ==
nullptr);
426 if (this->transaction_message_queue.empty() && this->normal_message_queue.empty()) {
430 EVLOG_debug <<
"There are " << this->normal_message_queue.size()
431 <<
" messages in the normal message queue.";
432 EVLOG_debug <<
"There are " << this->transaction_message_queue.size()
433 <<
" messages in the transaction message queue.";
440 if (this->in_flight !=
nullptr) {
444 EVLOG_debug <<
"There is no message in flight, checking message queue for a new message.";
448 std::shared_ptr<ControlMessage<M>> message =
nullptr;
449 QueueType queue_type = QueueType::None;
450 const auto now = DateTime();
453 auto selected_normal_message_it =
454 std::find_if(normal_message_queue.begin(), normal_message_queue.end(),
455 [&](
const std::shared_ptr<ControlMessage<M>>& msg) {
456 return allowed_to_send_message(*msg, now, this->is_registration_status_accepted);
459 if (selected_normal_message_it != normal_message_queue.end()) {
460 message = *selected_normal_message_it;
461 queue_type = QueueType::Normal;
464 auto is_transaction_message_available = [&](
const std::shared_ptr<ControlMessage<M>>& msg) {
465 if (!allowed_to_send_message(*msg, now, this->is_registration_status_accepted)) {
469 if (message ==
nullptr) {
473 if (message->messageType == M::BootNotification) {
477 if (msg->timestamp <= message->timestamp) {
484 auto selected_transaction_message_it =
485 (!transaction_message_queue.empty() and
486 is_transaction_message_available(transaction_message_queue.front()))
487 ? transaction_message_queue.begin()
488 : transaction_message_queue.end();
490 if (selected_transaction_message_it != transaction_message_queue.end()) {
491 message = *selected_transaction_message_it;
492 queue_type = QueueType::Transaction;
495 if (message ==
nullptr) {
496 EVLOG_debug <<
"No message in queue ready to be sent yet";
497 this->new_message =
false;
502 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
503 if (next_message_to_send.has_value()) {
504 if (next_message_to_send.value() != message->uniqueId()) {
505 EVLOG_debug <<
"Message with id " << message->uniqueId()
506 <<
" held back because message with id " << next_message_to_send.value()
507 <<
" should be sent first";
513 EVLOG_debug <<
"Attempting to send message to central system. UID: " << message->uniqueId()
514 <<
" attempt#: " << message->message_attempts;
515 this->in_flight = message;
516 this->in_flight->message_attempts += 1;
518 if (this->message_id_transaction_id_map.count(this->in_flight->message.at(1))) {
519 EVLOG_debug <<
"Replacing transaction id";
520 this->in_flight->message.at(3)[
"transactionId"] =
521 this->message_id_transaction_id_map.at(this->in_flight->message.at(1));
522 this->message_id_transaction_id_map.erase(this->in_flight->message.at(1));
525 if (!this->send_callback(this->in_flight->message)) {
527 EVLOG_error <<
"Could not send message, this is most likely because the charge point is offline.";
528 if (this->in_flight && is_transaction_message(*this->in_flight)) {
529 EVLOG_info <<
"The message in flight is transaction related and will be sent again once the "
530 "connection can be established again.";
531 if (this->in_flight->message.at(CALL_ACTION) ==
"TransactionEvent") {
532 this->in_flight->message.at(CALL_PAYLOAD)[
"offline"] =
true;
534 }
else if (this->config.
check_queue(this->in_flight->messageType)) {
535 EVLOG_info <<
"The message in flight will be sent again once the connection can be "
536 "established again since QueueAllMessages is set to 'true'.";
538 EVLOG_info <<
"The message in flight is not transaction related and will be dropped";
539 if (queue_type == QueueType::Normal) {
540 EnhancedMessage<M> enhanced_message;
541 enhanced_message.offline =
true;
542 this->in_flight->promise.set_value(enhanced_message);
543 this->normal_message_queue.pop_front();
546 this->reset_in_flight();
548 EVLOG_debug <<
"Successfully sent message. UID: " << this->in_flight->uniqueId();
550 this->current_message_timeout(message->message_attempts));
551 switch (queue_type) {
552 case QueueType::Normal:
553 this->normal_message_queue.erase(selected_normal_message_it);
555 case QueueType::Transaction:
556 this->transaction_message_queue.erase(selected_transaction_message_it);
558 case QueueType::None:
563 if (this->transaction_message_queue.empty() && this->normal_message_queue.empty()) {
564 this->new_message =
false;
569 EVLOG_info <<
"Message queue stopped processing messages";
575 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
576 this->next_message_to_send.reset();
581 std::vector<QueueType> queue_types = {QueueType::Normal, QueueType::Transaction};
583 for (
const auto queue_type : queue_types) {
584 const auto persisted_messages = database_handler->get_message_queue_messages(queue_type);
585 if (!persisted_messages.empty()) {
586 for (
auto& persisted_message : persisted_messages) {
588 if (ignore_security_event_notifications &&
589 persisted_message.message_type ==
"SecurityEventNotification") {
592 this->database_handler->remove_message_queue_message(persisted_message.unique_id,
595 EVLOG_warning <<
"Could not delete message from message queue: " << e.what();
596 }
catch (
const std::exception& e) {
597 EVLOG_warning <<
"Could not delete message from message queue: " << e.what();
600 std::shared_ptr<ControlMessage<M>> message =
601 std::make_shared<ControlMessage<M>>(persisted_message.json_message,
true);
602 message->messageType = string_to_messagetype(persisted_message.message_type);
603 message->timestamp = persisted_message.timestamp;
604 message->message_attempts = persisted_message.message_attempts;
606 if (queue_type == QueueType::Normal) {
607 normal_message_queue.push_back(message);
608 }
else if (queue_type == QueueType::Transaction) {
609 transaction_message_queue.push_back(message);
613 this->new_message =
true;
617 if (!this->config.queue_all_messages) {
620 this->database_handler->clear_message_queue(QueueType::Normal);
624 void push_call(
const json& message,
const bool stall_until_accepted =
false) {
629 auto control_message = std::make_shared<ControlMessage<M>>(message, stall_until_accepted);
630 if (is_transaction_message(*control_message)) {
636 this->add_to_transaction_message_queue(control_message);
640 if (!this->paused || this->resuming || this->config.
check_queue(control_message->messageType) ||
641 control_message->messageType == M::BootNotification) {
642 this->add_to_normal_message_queue(control_message);
645 this->cv.notify_all();
653 this->send_callback(call_result);
655 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
656 if (next_message_to_send.has_value()) {
657 if (next_message_to_send.value() == call_result.at(MESSAGE_ID)) {
658 next_message_to_send.reset();
663 this->cv.notify_all();
672 this->send_callback(call_error);
674 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
675 if (next_message_to_send.has_value()) {
676 if (next_message_to_send.value() == call_error.uniqueId) {
677 next_message_to_send.reset();
682 this->cv.notify_all();
688 auto message = std::make_shared<ControlMessage<M>>(call);
692 enhanced_message.offline =
true;
693 message->promise.set_value(enhanced_message);
694 }
else if (is_transaction_message(message->messageType)) {
697 this->add_to_transaction_message_queue(message);
701 if (this->paused && !this->config.
check_queue(message->messageType) && !this->resuming &&
702 message->messageType != M::BootNotification) {
705 enhanced_message.offline =
true;
706 message->promise.set_value(enhanced_message);
708 this->add_to_normal_message_queue(message);
711 return message->promise.get_future();
720 enhanced_message.
message = json::parse(message);
721 enhanced_message.
uniqueId = this->getMessageId(enhanced_message.
message);
725 enhanced_message.
messageType = this->string_to_messagetype(enhanced_message.
message.at(CALL_ACTION));
729 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
732 next_message_to_send.emplace(enhanced_message.
uniqueId);
737 if (enhanced_message.
messageTypeId == MessageTypeId::CALLRESULT ||
738 enhanced_message.
messageTypeId == MessageTypeId::CALLERROR) {
740 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
741 next_message_to_send.reset();
746 std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
747 if (this->in_flight ==
nullptr) {
748 EVLOG_error <<
"Received a CALLRESULT OR CALLERROR without a message in flight, this should not happen";
749 return enhanced_message;
751 if (this->in_flight->uniqueId() != enhanced_message.
uniqueId) {
752 EVLOG_error <<
"Received a CALLRESULT OR CALLERROR with mismatching uid: "
754 return enhanced_message;
756 if (enhanced_message.
messageTypeId == MessageTypeId::CALLERROR) {
757 EVLOG_error <<
"Received a CALLERROR for message with UID: " << enhanced_message.
uniqueId;
759 enhanced_message.
call_message = this->in_flight->message;
763 this->handle_call_result(enhanced_message);
767 return enhanced_message;
770 void reset_in_flight() {
771 this->in_flight =
nullptr;
772 this->in_flight_timeout_timer.stop();
775 void handle_call_result(EnhancedMessage<M>& enhanced_message) {
776 if (this->in_flight->uniqueId() == enhanced_message.uniqueId) {
777 enhanced_message.
call_message = this->in_flight->message;
778 enhanced_message.messageType = this->string_to_messagetype(
779 this->in_flight->message.at(CALL_ACTION).template get<std::string>() + std::string(
"Response"));
780 this->in_flight->promise.set_value(enhanced_message);
782 const auto queue_type =
783 is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal;
784 if (is_transaction_message(*this->in_flight) or this->config.
check_queue(this->in_flight->messageType)) {
788 this->database_handler->remove_message_queue_message(this->in_flight->initial_unique_id,
791 EVLOG_warning <<
"Could not delete message from message queue: " << e.what();
792 }
catch (
const std::exception& e) {
793 EVLOG_warning <<
"Could not delete message from message queue: " << e.what();
796 this->reset_in_flight();
801 if (std::find(this->external_notify.begin(), this->external_notify.end(), enhanced_message.messageType) ==
802 this->external_notify.end()) {
803 this->cv.notify_one();
810 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
812 bool timeout = !enhanced_message_opt.has_value();
814 EVLOG_warning <<
"Message timeout for: " << this->in_flight->messageType <<
" ("
815 << this->in_flight->uniqueId() <<
")";
817 EVLOG_warning <<
"CALLERROR for: " << this->in_flight->messageType <<
" (" << this->in_flight->uniqueId()
821 const auto queue_type = is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal;
822 if (is_transaction_message(*this->in_flight) or this->config.
check_queue(this->in_flight->messageType)) {
823 if (this->in_flight->message_attempts < this->config.transaction_message_attempts) {
824 EVLOG_warning <<
"Message shall be persisted and will therefore be sent again";
826 const auto old_message_id = this->in_flight->message[MESSAGE_ID];
827 this->in_flight->message[MESSAGE_ID] = ocpp::create_message_id();
828 if (this->config.transaction_message_retry_interval > 0) {
830 this->in_flight->timestamp =
831 DateTime(this->in_flight->timestamp.to_time_point() +
832 std::chrono::seconds(this->config.transaction_message_retry_interval) *
833 this->in_flight->message_attempts);
834 EVLOG_debug <<
"Retry interval > 0: " << this->config.transaction_message_retry_interval
835 <<
" attempting to retry message at: " << this->in_flight->timestamp;
838 this->in_flight->timestamp =
DateTime();
839 EVLOG_debug <<
"Retry interval of 0 means immediate retry";
842 EVLOG_warning <<
"Attempt: " << this->in_flight->message_attempts + 1 <<
"/"
843 << this->config.transaction_message_attempts <<
" will be sent at "
844 << this->in_flight->timestamp;
846 if (queue_type == QueueType::Transaction) {
847 this->transaction_message_queue.push_front(this->in_flight);
848 }
else if (queue_type == QueueType::Normal) {
849 this->normal_message_queue.push_front(this->in_flight);
851 if (is_start_transaction_message(*this->in_flight)) {
852 this->start_transaction_message_retry_callback(this->in_flight->message[MESSAGE_ID],
855 this->notify_queue_timer.at(
857 this->new_message =
true;
858 this->cv.notify_all();
860 this->in_flight->timestamp.to_time_point());
862 EVLOG_error <<
"Could not deliver message within the configured amount of attempts, "
864 if (enhanced_message_opt) {
865 this->in_flight->promise.set_value(enhanced_message_opt.value());
868 enhanced_message.
offline =
true;
869 this->in_flight->promise.set_value(enhanced_message);
873 this->database_handler->remove_message_queue_message(this->in_flight->initial_unique_id,
876 EVLOG_warning <<
"Could not delete message from transaction queue: " << e.what();
877 }
catch (
const std::exception& e) {
878 EVLOG_warning <<
"Could not delete message from transaction queue: " << e.what();
881 }
else if (is_boot_notification_message(this->in_flight->messageType)) {
882 EVLOG_warning <<
"Message is BootNotification.req and will therefore be sent again";
884 this->in_flight->message[MESSAGE_ID] = ocpp::create_message_id();
887 this->in_flight->timestamp =
888 DateTime(this->in_flight->timestamp.to_time_point() +
889 std::chrono::seconds(this->config.boot_notification_retry_interval_seconds));
890 this->normal_message_queue.push_front(this->in_flight);
891 this->notify_queue_timer.at(
893 this->new_message =
true;
894 this->cv.notify_all();
896 this->in_flight->timestamp.to_time_point());
898 EVLOG_warning <<
"Message is not transaction related, dropping it";
899 if (enhanced_message_opt) {
900 this->in_flight->promise.set_value(enhanced_message_opt.value());
903 enhanced_message.
offline =
true;
904 this->in_flight->promise.set_value(enhanced_message);
907 this->reset_in_flight();
908 this->cv.notify_all();
913 EVLOG_debug <<
"stop()";
915 this->running =
false;
916 this->cv.notify_one();
917 this->worker_thread.join();
918 EVLOG_debug <<
"stop() notified message queue";
923 EVLOG_debug <<
"pause()";
924 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
925 this->pause_resume_ctr++;
926 this->resume_timer.stop();
928 this->resuming =
false;
929 this->cv.notify_one();
930 EVLOG_debug <<
"pause() notified message queue";
934 void resume(std::chrono::seconds delay_on_reconnect) {
935 EVLOG_debug <<
"resume() called";
936 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
940 this->pause_resume_ctr++;
942 if (this->pause_resume_ctr > 1 && delay_on_reconnect > std::chrono::seconds(0)) {
943 this->resuming =
true;
944 EVLOG_debug <<
"Delaying message queue resume by " << delay_on_reconnect.count() <<
" seconds";
945 u_int64_t expected_pause_resume_ctr = this->pause_resume_ctr;
946 this->resume_timer.timeout(
947 [
this, expected_pause_resume_ctr] { this->resume_now(expected_pause_resume_ctr); }, delay_on_reconnect);
949 this->resume_now(this->pause_resume_ctr);
953 void set_registration_status_accepted() {
955 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
956 this->is_registration_status_accepted =
true;
958 this->cv.notify_all();
961 bool is_transaction_message_queue_empty() {
962 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
963 return this->transaction_message_queue.empty();
966 bool contains_transaction_messages(
const CiString<36> transaction_id) {
967 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
968 for (
const auto control_message : this->transaction_message_queue) {
969 if (control_message->messageType == v2::MessageType::TransactionEvent) {
970 v2::TransactionEventRequest req = control_message->message.at(CALL_PAYLOAD);
971 if (req.transactionInfo.transactionId == transaction_id) {
979 bool contains_stop_transaction_message(
const int32_t transaction_id) {
980 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
981 for (
const auto control_message : this->transaction_message_queue) {
982 if (control_message->messageType == v16::MessageType::StopTransaction) {
983 v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
984 if (req.transactionId == transaction_id) {
994 this->config.transaction_message_attempts = transaction_message_attempts;
999 this->config.transaction_message_retry_interval = transaction_message_retry_interval;
1004 this->config.message_timeout_seconds = timeout;
1010 EVLOG_debug <<
"adding " << stop_transaction_message_id <<
" for transaction " << transaction_id;
1011 this->message_id_transaction_id_map[stop_transaction_message_id] = transaction_id;
1014 void add_meter_value_message_id(
const std::string& start_transaction_message_id,
1015 const std::string& meter_value_message_id) {
1016 if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
1017 this->start_transaction_mid_meter_values_mid_map.at(start_transaction_message_id)
1018 .push_back(meter_value_message_id);
1020 std::vector<std::string> meter_value_message_ids;
1021 meter_value_message_ids.push_back(meter_value_message_id);
1022 this->start_transaction_mid_meter_values_mid_map[start_transaction_message_id] = meter_value_message_ids;
1026 void notify_start_transaction_handled(
const std::string& start_transaction_message_id,
1027 const int32_t transaction_id) {
1028 this->cv.notify_one();
1032 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
1033 if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
1034 for (
auto it = this->transaction_message_queue.begin(); it != transaction_message_queue.end(); ++it) {
1035 for (
const auto& meter_value_message_id :
1036 this->start_transaction_mid_meter_values_mid_map.at(start_transaction_message_id)) {
1038 if (meter_value_message_id == (*it)->message.at(1)) {
1039 EVLOG_debug <<
"Adding transactionId " << transaction_id <<
" to MeterValue.req";
1040 (*it)->message.at(3)[
"transactionId"] = transaction_id;
1045 this->start_transaction_mid_meter_values_mid_map.erase(start_transaction_message_id);
1048 M string_to_messagetype(
const std::string& s);
1049 std::string messagetype_to_string(M m);
Contains a DateTime implementation that can parse and create RFC 3339 compatible strings.
Definition: types.hpp:109
Contains a MessageId implementation based on a case insensitive string with a maximum length of 36 pr...
Definition: call_types.hpp:34
contains a message queue that makes sure that OCPPs synchronicity requirements are met
Definition: message_queue.hpp:164
MessageQueue(const std::function< bool(json message)> &send_callback, const MessageQueueConfig< M > &config, const std::vector< M > &external_notify, std::shared_ptr< common::DatabaseHandlerCommon > database_handler, const std::function< void(const std::string &new_message_id, const std::string &old_message_id)> start_transaction_message_retry_callback=[](const std::string &new_message_id, const std::string &old_message_id) {})
Creates a new MessageQueue object with the provided configuration and send_callback.
Definition: message_queue.hpp:389
void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id)
Adds the given transaction_id to the message_id_transaction_id_map using the key stop_transaction_mes...
Definition: message_queue.hpp:1009
std::future< EnhancedMessage< M > > push_call_async(const json &call)
pushes a new call message onto the message queue
Definition: message_queue.hpp:687
void pause()
Pauses the message queue.
Definition: message_queue.hpp:922
void update_transaction_message_retry_interval(const int transaction_message_retry_interval)
Set transaction_message_retry_interval to given transaction_message_retry_interval in seconds.
Definition: message_queue.hpp:998
void handle_timeout_or_callerror(const std::optional< EnhancedMessage< M > > &enhanced_message_opt)
Handles a message timeout or a CALLERROR. enhanced_message_opt is set only in case of CALLERROR.
Definition: message_queue.hpp:809
void resume(std::chrono::seconds delay_on_reconnect)
Resumes the message queue.
Definition: message_queue.hpp:934
void stop()
Stops the message queue.
Definition: message_queue.hpp:912
void reset_next_message_to_send()
Resets next message to send. Can be used in situation when we dont want to reply to a CALL message.
Definition: message_queue.hpp:574
void push_call_result(const json &call_result)
Sends a new call_result message over the websocket.
Definition: message_queue.hpp:649
void get_persisted_messages_from_db(bool ignore_security_event_notifications=false)
Gets all persisted messages of normal message queue and persisted message queue from the database.
Definition: message_queue.hpp:580
void update_message_timeout(const int timeout)
Set message_timeout to given timeout (in seconds)
Definition: message_queue.hpp:1003
EnhancedMessage< M > receive(std::string_view message)
Enhances a received json_message with additional meta information, checks if it is a valid CallResult...
Definition: message_queue.hpp:717
void push_call_error(CallError call_error)
Sends a new call_error message over the websocket.
Definition: message_queue.hpp:667
void update_transaction_message_attempts(const int transaction_message_attempts)
Set transaction_message_attempts to given transaction_message_attempts.
Definition: message_queue.hpp:993
Exception for errors during query execution.
Definition: database_exceptions.hpp:49
Contains a OCPP CallError message.
Definition: call_types.hpp:143
This contains an internal control message.
Definition: message_queue.hpp:65
std::promise< EnhancedMessage< M > > promise
A promise used by the async send interface.
Definition: message_queue.hpp:69
bool is_transaction_update_message() const
True for transactional messages containing updates (measurements) for a transaction.
MessageId uniqueId() const
Provides the unique message ID stored in the message.
Definition: message_queue.hpp:79
int32_t message_attempts
The number of times this message has been rejected by the central system.
Definition: message_queue.hpp:68
ControlMessage(const json &message, const bool stall_until_accepted=false)
Creates a new ControlMessage object from the provided message.
json::array_t message
The OCPP message as a json array.
Definition: message_queue.hpp:66
M messageType
The OCPP message type.
Definition: message_queue.hpp:67
DateTime timestamp
A timestamp that shows when this message can be sent.
Definition: message_queue.hpp:70
Contains a OCPP message in json form with additional information.
Definition: message_queue.hpp:54
json call_message
If the message is a CALLRESULT or CALLERROR this can contain the original CALL message.
Definition: message_queue.hpp:60
json message
The OCPP message as json.
Definition: message_queue.hpp:55
M messageType
The OCPP message type.
Definition: message_queue.hpp:58
MessageTypeId messageTypeId
The OCPP message type ID (CALL/CALLRESULT/CALLERROR)
Definition: message_queue.hpp:59
MessageId uniqueId
The unique ID of the json message.
Definition: message_queue.hpp:57
size_t message_size
size of the json message in bytes
Definition: message_queue.hpp:56
bool offline
A flag indicating if the connection to the central system is offline.
Definition: message_queue.hpp:61
Definition: message_queue.hpp:29
bool check_queue(const M &message_type)
Returns true if the given message_type shall be queued based on the configuration of queue_all_messag...
Definition: message_queue.hpp:48
Definition: database_handler_common.hpp:16