ocpp 0.24.1
A C++ implementation of the Open Charge Point Protocol
message_queue.hpp
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
3#ifndef OCPP_COMMON_MESSAGE_QUEUE_HPP
4#define OCPP_COMMON_MESSAGE_QUEUE_HPP
5
6#include <chrono>
7#include <condition_variable>
8#include <deque>
9#include <future>
10#include <mutex>
11#include <queue>
12#include <set>
13#include <thread>
14
15#include <everest/timer.hpp>
16
17#include <ocpp/common/call_types.hpp>
18#include <ocpp/common/database/database_handler_common.hpp>
19#include <ocpp/common/types.hpp>
20#include <ocpp/v16/messages/StopTransaction.hpp>
21#include <ocpp/v16/types.hpp>
22#include <ocpp/v2/messages/TransactionEvent.hpp>
23#include <ocpp/v2/types.hpp>
24
25namespace ocpp {
26
27using QueryExecutionException = common::QueryExecutionException;
28
29template <typename M> struct MessageQueueConfig {
30 int transaction_message_attempts;
31 int transaction_message_retry_interval; // seconds
32
33 // threshold for the accumulated sizes of the queues; if the queues exceed this limit,
34 // messages are potentially dropped in accordance with OCPP 2.0.1. Specification (cf. QueueAllMessages parameter)
35 int queues_total_size_threshold;
36
37 bool queue_all_messages{false}; // cf. OCPP 2.0.1. "QueueAllMessages" in OCPPCommCtrlr
38 std::set<M> message_types_discard_for_queueing; // allows to discard certain message types for offline queuing (e.g.
39 // Heartbeat)
40
41 int message_timeout_seconds = 30;
42 int boot_notification_retry_interval_seconds =
43 60; // interval for BootNotification.req in case response by CSMS is CALLERROR or CSMS does not respond at all
44 // (within specified MessageTimeout)
45
48 bool check_queue(const M& message_type) {
49 return queue_all_messages and !message_types_discard_for_queueing.count(message_type);
50 };
51};
52
54template <typename M> struct EnhancedMessage {
55 json message;
56 size_t message_size;
58 M messageType = M::InternalError;
59 MessageTypeId messageTypeId;
61 bool offline = false;
62};
63
65template <typename M> struct ControlMessage {
66 json::array_t message;
69 std::promise<EnhancedMessage<M>> promise;
71 MessageId initial_unique_id;
72 bool stall_until_accepted; // if true, message shall be sent only if registration status is accepted
73
75 explicit ControlMessage(const json& message, const bool stall_until_accepted = false);
76
79 [[nodiscard]] MessageId uniqueId() const {
80 return this->message[MESSAGE_ID];
81 }
82
85};
86
88enum class MessageTransmissionPriority {
89 SendImmediately, // message can be queued and can be send immediately
90 SendAfterRegistrationStatusAccepted, // message can be queued and shall be send only if registration status is
91 // accepted
92 Discard // message shall be discarded and not be sent
93};
94
96inline MessageTransmissionPriority get_message_transmission_priority(bool is_boot_notification_message, bool triggered,
97 bool registration_already_accepted,
98 bool is_transaction_related,
99 bool queue_all_message) {
100 if (registration_already_accepted || is_boot_notification_message || triggered) {
101 return MessageTransmissionPriority::SendImmediately;
102 }
103
104 if (is_transaction_related || queue_all_message) {
105 return MessageTransmissionPriority::SendAfterRegistrationStatusAccepted;
106 }
107
108 return MessageTransmissionPriority::Discard;
109};
110
114bool is_transaction_message(const ocpp::v16::MessageType message_type);
115
119bool is_transaction_message(const ocpp::v2::MessageType message_type);
120
124bool is_start_transaction_message(const ocpp::v16::MessageType message_type);
125
129bool is_start_transaction_message(const ocpp::v2::MessageType message_type);
130
132template <typename M> auto is_start_transaction_message(const ControlMessage<M>& control_message) {
133 return is_start_transaction_message(control_message.messageType);
134}
135
137template <typename M> auto is_transaction_message(const ControlMessage<M>& control_message) {
138 return is_transaction_message(control_message.messageType);
139}
143bool is_boot_notification_message(const ocpp::v16::MessageType message_type);
144
148bool is_boot_notification_message(const ocpp::v2::MessageType message_type);
149
150template <typename M>
151bool allowed_to_send_message(const ControlMessage<M>& message, const DateTime& time,
152 const bool is_registration_status_accepted) {
153 if (message.stall_until_accepted and !is_registration_status_accepted) {
154 return false;
155 }
156
157 if (message.timestamp > time) {
158 return false;
159 }
160 return true;
161}
162
164template <typename M> class MessageQueue {
165private:
167 std::shared_ptr<ocpp::common::DatabaseHandlerCommon> database_handler;
168
169 std::thread worker_thread;
171 std::deque<std::shared_ptr<ControlMessage<M>>> transaction_message_queue;
173 std::deque<std::shared_ptr<ControlMessage<M>>> normal_message_queue;
174 std::shared_ptr<ControlMessage<M>> in_flight;
175 std::recursive_mutex message_mutex;
176 std::condition_variable_any cv;
177 std::function<bool(json message)> send_callback;
178 std::vector<M> external_notify;
179 bool paused;
180 // Transiently true while the queue is paused, but is waiting to unpause
181 bool resuming;
182 bool running;
183 bool new_message;
184 bool is_registration_status_accepted;
185 std::recursive_mutex next_message_mutex;
186 std::optional<MessageId> next_message_to_send;
187
188 Everest::SteadyTimer in_flight_timeout_timer;
189 Everest::SteadyTimer notify_queue_timer;
190
191 // This timer schedules the resumption of the message queue
192 Everest::SteadyTimer resume_timer;
193 // Counts the number of pause()/resume() calls.
194 // Used by the resume timer callback to abort itself in case the timer triggered before it could be cancelled.
195 u_int64_t pause_resume_ctr = 0;
196
197 // key is the message id of the stop transaction and the value is the transaction id
198 // this map is used for StopTransaction.req that have been put on the message queue without having received a
199 // transactionId from the backend (e.g. when offline) it is used to replace the transactionId in the
200 // StopTransaction.req
201 std::map<std::string, int32_t> message_id_transaction_id_map;
202
203 // key is the message id of a StartTransaction.req and value is a list of MeterValue.req message ids. It is used to
204 // replace the transactionId within the MeterValue.req in case the transactionId was unknown at the time the message
205 // was queued. This can happen when the CP has not received a StartTransaction.conf from the CSMS.
206 std::map<std::string, std::vector<std::string>> start_transaction_mid_meter_values_mid_map;
207
208 // This callback is called when a StartTransaction.req message could not be delivered due to a timeout or CALL_ERROR
209 std::function<void(const std::string& new_message_id, const std::string& old_message_id)>
210 start_transaction_message_retry_callback;
211
212 MessageId getMessageId(const json::array_t& json_message) {
213 return MessageId(json_message.at(MESSAGE_ID).get<std::string>());
214 }
215 MessageTypeId getMessageTypeId(const json::array_t& json_message) {
216 if (json_message.size() > 0) {
217 auto messageTypeId = json_message.at(MESSAGE_TYPE_ID);
218 if (messageTypeId == MessageTypeId::CALL) {
219 return MessageTypeId::CALL;
220 }
221 if (messageTypeId == MessageTypeId::CALLRESULT) {
222 return MessageTypeId::CALLRESULT;
223 }
224 if (messageTypeId == MessageTypeId::CALLERROR) {
225 return MessageTypeId::CALLERROR;
226 }
227 }
228
229 return MessageTypeId::UNKNOWN;
230 }
231 bool isValidMessageType(const json::array_t& json_message) {
232 if (this->getMessageTypeId(json_message) != MessageTypeId::UNKNOWN) {
233 return true;
234 }
235 return false;
236 }
237
238 void add_to_normal_message_queue(std::shared_ptr<ControlMessage<M>> message) {
239 EVLOG_debug << "Adding message to normal message queue";
240 {
241 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
242 // A BootNotification message should always jump the queue
243 if (message->messageType == M::BootNotification) {
244 this->normal_message_queue.push_front(message);
245 } else {
246 this->normal_message_queue.push_back(message);
247 }
248 if (this->config.check_queue(message->messageType)) {
250 message->message, messagetype_to_string(message->messageType), message->message_attempts,
251 message->timestamp, message->uniqueId()};
252 try {
253 this->database_handler->insert_message_queue_message(db_message, QueueType::Normal);
254 } catch (const QueryExecutionException& e) {
255 EVLOG_warning << "Could not insert message into transaction queue: " << e.what();
256 }
257 }
258 this->new_message = true;
259 this->check_queue_sizes();
260 }
261 this->cv.notify_all();
262 EVLOG_debug << "Notified message queue worker";
263 }
264 void add_to_transaction_message_queue(std::shared_ptr<ControlMessage<M>> message) {
265 EVLOG_debug << "Adding message to transaction message queue";
266 {
267 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
268 this->transaction_message_queue.push_back(message);
269 ocpp::common::DBTransactionMessage db_message{message->message, messagetype_to_string(message->messageType),
270 message->message_attempts, message->timestamp,
271 message->uniqueId()};
272 try {
273 this->database_handler->insert_message_queue_message(db_message);
274 } catch (const QueryExecutionException& e) {
275 EVLOG_warning << "Could not insert message into transaction queue: " << e.what();
276 }
277 this->new_message = true;
278 this->check_queue_sizes();
279 }
280 this->cv.notify_all();
281 EVLOG_debug << "Notified message queue worker";
282 }
283
284 void check_queue_sizes() {
285 if (this->transaction_message_queue.size() + this->normal_message_queue.size() <=
286 this->config.queues_total_size_threshold) {
287 return;
288 }
289 EVLOG_warning << "Queue sizes exceed threshold (" << this->config.queues_total_size_threshold << ") with "
290 << this->transaction_message_queue.size() << " transaction and "
291 << this->normal_message_queue.size() << " normal messages in queue";
292
293 while (this->transaction_message_queue.size() + this->normal_message_queue.size() >
294 this->config.queues_total_size_threshold &&
295 !this->normal_message_queue.empty()) {
296 this->drop_messages_from_normal_message_queue();
297 }
298
299 while (this->transaction_message_queue.size() + this->normal_message_queue.size() >
300 this->config.queues_total_size_threshold &&
301 this->drop_update_messages_from_transactional_message_queue()) {
302 }
303 }
304
305 void drop_messages_from_normal_message_queue() {
306 // try to drop approx 10% of the allowed size (at least 1)
307 int number_of_dropped_messages = std::min((int)this->normal_message_queue.size(),
308 std::max(this->config.queues_total_size_threshold / 10, 1));
309
310 EVLOG_warning << "Dropping " << number_of_dropped_messages << " messages from normal message queue.";
311
312 for (int i = 0; i < number_of_dropped_messages; i++) {
313 if (this->config.queue_all_messages) {
314 try {
315 database_handler->remove_message_queue_message(
316 this->normal_message_queue.front()->initial_unique_id, QueueType::Normal);
317 } catch (const QueryExecutionException& e) {
318 EVLOG_warning << "Could not delete message from transaction queue: " << e.what();
319 } catch (const std::exception& e) {
320 EVLOG_warning << "Could not delete message from transaction queue: " << e.what();
321 }
322 }
323 this->normal_message_queue.pop_front();
324 }
325 }
326
333 bool drop_update_messages_from_transactional_message_queue() {
334 int drop_count = 0;
335 std::deque<std::shared_ptr<ControlMessage<M>>> temporary_swap_queue;
336 bool remove_next_update_message = true;
337 while (!transaction_message_queue.empty()) {
338 auto element = transaction_message_queue.front();
339 transaction_message_queue.pop_front();
340 // drop every second update message (except last one)
341 if (remove_next_update_message && element->is_transaction_update_message() &&
342 transaction_message_queue.size() > 1) {
343 EVLOG_debug << "Drop transactional message " << element->initial_unique_id;
344 try {
345 database_handler->remove_message_queue_message(element->initial_unique_id);
346 } catch (const QueryExecutionException& e) {
347 EVLOG_warning << "Could not delete message from transaction queue: " << e.what();
348 } catch (const std::exception& e) {
349 EVLOG_warning << "Could not delete message from transaction queue: " << e.what();
350 }
351 drop_count++;
352 remove_next_update_message = false;
353 } else {
354 remove_next_update_message = true;
355 temporary_swap_queue.push_back(element);
356 }
357 }
358
359 std::swap(transaction_message_queue, temporary_swap_queue);
360
361 if (drop_count > 0) {
362 EVLOG_warning << "Dropped " << drop_count << " transactional update messages to reduce queue size.";
363 return true;
364 } else {
365 EVLOG_warning << "There are no further transaction update messages to drop!";
366 return false;
367 }
368 }
369
370 // The public resume() delegates the actual resumption to this method
371 void resume_now(u_int64_t expected_pause_resume_ctr) {
372 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
373 if (this->pause_resume_ctr == expected_pause_resume_ctr) {
374 this->paused = false;
375 this->resuming = false;
376 this->cv.notify_one();
377 EVLOG_debug << "resume() notified message queue";
378 }
379 }
380
381 // Computes the current message timeout = interval * attempt + message timeout
382 std::chrono::seconds current_message_timeout(unsigned int attempt) {
383 return std::chrono::seconds(this->config.message_timeout_seconds +
384 (this->config.transaction_message_retry_interval * attempt));
385 }
386
387public:
390 const std::function<bool(json message)>& send_callback, const MessageQueueConfig<M>& config,
391 const std::vector<M>& external_notify, std::shared_ptr<common::DatabaseHandlerCommon> database_handler,
392 const std::function<void(const std::string& new_message_id, const std::string& old_message_id)>
393 start_transaction_message_retry_callback =
394 [](const std::string& new_message_id, const std::string& old_message_id) {}) :
395 database_handler(std::move(database_handler)),
396 config(config),
397 external_notify(external_notify),
398 paused(true),
399 resuming(false),
400 running(true),
401 new_message(false),
402 is_registration_status_accepted(false),
403 start_transaction_message_retry_callback(start_transaction_message_retry_callback) {
404
405 this->send_callback = send_callback;
406 this->in_flight = nullptr;
407 }
408
409 MessageQueue(const std::function<bool(json message)>& send_callback, const MessageQueueConfig<M>& config,
410 std::shared_ptr<common::DatabaseHandlerCommon> databaseHandler) :
411 MessageQueue(send_callback, config, {}, databaseHandler) {
412 }
413
414 void start() {
415 this->worker_thread = std::thread([this]() {
416 // TODO(kai): implement message timeout
417 while (this->running) {
418 EVLOG_debug << "Waiting for a message from the message queue";
419
420 std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
421 using namespace std::chrono_literals;
422 // It's safe to wait on the cv here because we're guaranteed to only lock this->message_mutex once
423 this->cv.wait(lk, [this]() {
424 return !this->running || (!this->paused && this->new_message && this->in_flight == nullptr);
425 });
426 if (this->transaction_message_queue.empty() && this->normal_message_queue.empty()) {
427 // There is nothing in the message queue, not progressing further
428 continue;
429 }
430 EVLOG_debug << "There are " << this->normal_message_queue.size()
431 << " messages in the normal message queue.";
432 EVLOG_debug << "There are " << this->transaction_message_queue.size()
433 << " messages in the transaction message queue.";
434
435 if (this->paused) {
436 // Message queue is paused, not progressing further
437 continue;
438 }
439
440 if (this->in_flight != nullptr) {
441 // There already is a message in flight, not progressing further
442 continue;
443 } else {
444 EVLOG_debug << "There is no message in flight, checking message queue for a new message.";
445 }
446
447 // prioritize the message with the oldest timestamp
448 std::shared_ptr<ControlMessage<M>> message = nullptr;
449 QueueType queue_type = QueueType::None;
450 const auto now = DateTime();
451
452 // Find the first allowed normal message
453 auto selected_normal_message_it =
454 std::find_if(normal_message_queue.begin(), normal_message_queue.end(),
455 [&](const std::shared_ptr<ControlMessage<M>>& msg) {
456 return allowed_to_send_message(*msg, now, this->is_registration_status_accepted);
457 });
458
459 if (selected_normal_message_it != normal_message_queue.end()) {
460 message = *selected_normal_message_it;
461 queue_type = QueueType::Normal;
462 }
463
464 auto is_transaction_message_available = [&](const std::shared_ptr<ControlMessage<M>>& msg) {
465 if (!allowed_to_send_message(*msg, now, this->is_registration_status_accepted)) {
466 return false;
467 }
468 // no message selected from normal message queue, so select transaction message
469 if (message == nullptr) {
470 return true;
471 }
472 // message from normal message queue is BootNotification, this is prioritized
473 if (message->messageType == M::BootNotification) {
474 return false;
475 }
476 // transaction messages is older than normal message, so select transaction message
477 if (msg->timestamp <= message->timestamp) {
478 return true;
479 }
480 return false;
481 };
482
483 // Transaction messages must persist the order, so only check the first in the queue
484 auto selected_transaction_message_it =
485 (!transaction_message_queue.empty() and
486 is_transaction_message_available(transaction_message_queue.front()))
487 ? transaction_message_queue.begin()
488 : transaction_message_queue.end();
489
490 if (selected_transaction_message_it != transaction_message_queue.end()) {
491 message = *selected_transaction_message_it;
492 queue_type = QueueType::Transaction;
493 }
494
495 if (message == nullptr) {
496 EVLOG_debug << "No message in queue ready to be sent yet";
497 this->new_message = false;
498 continue;
499 }
500
501 {
502 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
503 if (next_message_to_send.has_value()) {
504 if (next_message_to_send.value() != message->uniqueId()) {
505 EVLOG_debug << "Message with id " << message->uniqueId()
506 << " held back because message with id " << next_message_to_send.value()
507 << " should be sent first";
508 continue;
509 }
510 }
511 }
512
513 EVLOG_debug << "Attempting to send message to central system. UID: " << message->uniqueId()
514 << " attempt#: " << message->message_attempts;
515 this->in_flight = message;
516 this->in_flight->message_attempts += 1;
517
518 if (this->message_id_transaction_id_map.count(this->in_flight->message.at(1))) {
519 EVLOG_debug << "Replacing transaction id";
520 this->in_flight->message.at(3)["transactionId"] =
521 this->message_id_transaction_id_map.at(this->in_flight->message.at(1));
522 this->message_id_transaction_id_map.erase(this->in_flight->message.at(1));
523 }
524
525 if (!this->send_callback(this->in_flight->message)) {
526 this->paused = true;
527 EVLOG_error << "Could not send message, this is most likely because the charge point is offline.";
528 if (this->in_flight && is_transaction_message(*this->in_flight)) {
529 EVLOG_info << "The message in flight is transaction related and will be sent again once the "
530 "connection can be established again.";
531 if (this->in_flight->message.at(CALL_ACTION) == "TransactionEvent") {
532 this->in_flight->message.at(CALL_PAYLOAD)["offline"] = true;
533 }
534 } else if (this->config.check_queue(this->in_flight->messageType)) {
535 EVLOG_info << "The message in flight will be sent again once the connection can be "
536 "established again since QueueAllMessages is set to 'true'.";
537 } else {
538 EVLOG_info << "The message in flight is not transaction related and will be dropped";
539 if (queue_type == QueueType::Normal) {
540 EnhancedMessage<M> enhanced_message;
541 enhanced_message.offline = true;
542 this->in_flight->promise.set_value(enhanced_message);
543 this->normal_message_queue.pop_front();
544 }
545 }
546 this->reset_in_flight();
547 } else {
548 EVLOG_debug << "Successfully sent message. UID: " << this->in_flight->uniqueId();
549 this->in_flight_timeout_timer.timeout([this]() { this->handle_timeout_or_callerror(std::nullopt); },
550 this->current_message_timeout(message->message_attempts));
551 switch (queue_type) {
552 case QueueType::Normal:
553 this->normal_message_queue.erase(selected_normal_message_it);
554 break;
555 case QueueType::Transaction:
556 this->transaction_message_queue.erase(selected_transaction_message_it);
557 break;
558 case QueueType::None:
559 // do nothing
560 break;
561 }
562 }
563 if (this->transaction_message_queue.empty() && this->normal_message_queue.empty()) {
564 this->new_message = false;
565 }
566 lk.unlock();
567 cv.notify_one();
568 }
569 EVLOG_info << "Message queue stopped processing messages";
570 });
571 }
572
575 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
576 this->next_message_to_send.reset();
577 }
578
580 void get_persisted_messages_from_db(bool ignore_security_event_notifications = false) {
581 std::vector<QueueType> queue_types = {QueueType::Normal, QueueType::Transaction};
582 // do for Normal and Transaction queue
583 for (const auto queue_type : queue_types) {
584 const auto persisted_messages = database_handler->get_message_queue_messages(queue_type);
585 if (!persisted_messages.empty()) {
586 for (auto& persisted_message : persisted_messages) {
587
588 if (ignore_security_event_notifications &&
589 persisted_message.message_type == "SecurityEventNotification") {
590 try {
591 // remove from database in case SecurityEventNotification.req should not be sent
592 this->database_handler->remove_message_queue_message(persisted_message.unique_id,
593 queue_type);
594 } catch (const QueryExecutionException& e) {
595 EVLOG_warning << "Could not delete message from message queue: " << e.what();
596 } catch (const std::exception& e) {
597 EVLOG_warning << "Could not delete message from message queue: " << e.what();
598 }
599 } else {
600 std::shared_ptr<ControlMessage<M>> message =
601 std::make_shared<ControlMessage<M>>(persisted_message.json_message, true);
602 message->messageType = string_to_messagetype(persisted_message.message_type);
603 message->timestamp = persisted_message.timestamp;
604 message->message_attempts = persisted_message.message_attempts;
605
606 if (queue_type == QueueType::Normal) {
607 normal_message_queue.push_back(message);
608 } else if (queue_type == QueueType::Transaction) {
609 transaction_message_queue.push_back(message);
610 }
611 }
612 }
613 this->new_message = true;
614 }
615 }
616
617 if (!this->config.queue_all_messages) {
618 // make sure to clear normal message queue table in case queue_all_messages is false, since without clearing
619 // it here messages would not be removed in handle_call_result or handle_call_timeout_or_error
620 this->database_handler->clear_message_queue(QueueType::Normal);
621 }
622 }
623
624 void push_call(const json& message, const bool stall_until_accepted = false) {
625 if (!running) {
626 return;
627 }
628
629 auto control_message = std::make_shared<ControlMessage<M>>(message, stall_until_accepted);
630 if (is_transaction_message(*control_message)) {
631 // according to the spec the "transaction related messages" StartTransaction, StopTransaction and
632 // MeterValues have to be delivered in chronological order
633
634 // intentionally break this message for testing...
635 // message->message[CALL_PAYLOAD]["broken"] = ocpp::create_message_id();
636 this->add_to_transaction_message_queue(control_message);
637 } else {
638 // all other messages are allowed to "jump the queue" to improve user experience
639 // TODO: decide if we only want to allow this for a subset of messages
640 if (!this->paused || this->resuming || this->config.check_queue(control_message->messageType) ||
641 control_message->messageType == M::BootNotification) {
642 this->add_to_normal_message_queue(control_message);
643 }
644 }
645 this->cv.notify_all();
646 }
647
649 void push_call_result(const json& call_result) {
650 if (!running) {
651 return;
652 }
653 this->send_callback(call_result);
654 {
655 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
656 if (next_message_to_send.has_value()) {
657 if (next_message_to_send.value() == call_result.at(MESSAGE_ID)) {
658 next_message_to_send.reset();
659 }
660 }
661 }
662
663 this->cv.notify_all();
664 }
665
667 void push_call_error(CallError call_error) {
668 if (!running) {
669 return;
670 }
671
672 this->send_callback(call_error);
673 {
674 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
675 if (next_message_to_send.has_value()) {
676 if (next_message_to_send.value() == call_error.uniqueId) {
677 next_message_to_send.reset();
678 }
679 }
680 }
681
682 this->cv.notify_all();
683 }
684
687 std::future<EnhancedMessage<M>> push_call_async(const json& call) {
688 auto message = std::make_shared<ControlMessage<M>>(call);
689
690 if (!running) {
691 auto enhanced_message = EnhancedMessage<M>();
692 enhanced_message.offline = true;
693 message->promise.set_value(enhanced_message);
694 } else if (is_transaction_message(message->messageType)) {
695 // according to the spec the "transaction related messages" StartTransaction, StopTransaction and
696 // MeterValues have to be delivered in chronological order
697 this->add_to_transaction_message_queue(message);
698 } else {
699 // all other messages are allowed to "jump the queue" to improve user experience
700 // TODO: decide if we only want to allow this for a subset of messages
701 if (this->paused && !this->config.check_queue(message->messageType) && !this->resuming &&
702 message->messageType != M::BootNotification) {
703 // do not add a normal message to the queue if the queue is paused/offline
704 auto enhanced_message = EnhancedMessage<M>();
705 enhanced_message.offline = true;
706 message->promise.set_value(enhanced_message);
707 } else {
708 this->add_to_normal_message_queue(message);
709 }
710 }
711 return message->promise.get_future();
712 }
713
717 EnhancedMessage<M> receive(std::string_view message) {
718 EnhancedMessage<M> enhanced_message;
719
720 enhanced_message.message = json::parse(message);
721 enhanced_message.uniqueId = this->getMessageId(enhanced_message.message);
722 enhanced_message.messageTypeId = this->getMessageTypeId(enhanced_message.message);
723
724 if (enhanced_message.messageTypeId == MessageTypeId::CALL) {
725 enhanced_message.messageType = this->string_to_messagetype(enhanced_message.message.at(CALL_ACTION));
726 enhanced_message.call_message = enhanced_message.message;
727
728 {
729 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
730 // save the uid of the message we just received to ensure the next message we send is a response to
731 // this message
732 next_message_to_send.emplace(enhanced_message.uniqueId);
733 }
734 }
735
736 // TODO(kai): what happens if we receive a CallResult or CallError out of order?
737 if (enhanced_message.messageTypeId == MessageTypeId::CALLRESULT ||
738 enhanced_message.messageTypeId == MessageTypeId::CALLERROR) {
739 {
740 std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
741 next_message_to_send.reset();
742 }
743 // we need to remove Call messages from in_flight if we receive a CallResult OR a CallError
744
745 // TODO(kai): we need to do some error handling in the CallError case
746 std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
747 if (this->in_flight == nullptr) {
748 EVLOG_error << "Received a CALLRESULT OR CALLERROR without a message in flight, this should not happen";
749 return enhanced_message;
750 }
751 if (this->in_flight->uniqueId() != enhanced_message.uniqueId) {
752 EVLOG_error << "Received a CALLRESULT OR CALLERROR with mismatching uid: "
753 << this->in_flight->uniqueId() << " != " << enhanced_message.uniqueId;
754 return enhanced_message;
755 }
756 if (enhanced_message.messageTypeId == MessageTypeId::CALLERROR) {
757 EVLOG_error << "Received a CALLERROR for message with UID: " << enhanced_message.uniqueId;
758 // make sure the original call message is attached to the callerror
759 enhanced_message.call_message = this->in_flight->message;
760 lk.unlock();
761 this->handle_timeout_or_callerror(enhanced_message);
762 } else {
763 this->handle_call_result(enhanced_message);
764 }
765 }
766
767 return enhanced_message;
768 }
769
770 void reset_in_flight() {
771 this->in_flight = nullptr;
772 this->in_flight_timeout_timer.stop();
773 }
774
775 void handle_call_result(EnhancedMessage<M>& enhanced_message) {
776 if (this->in_flight->uniqueId() == enhanced_message.uniqueId) {
777 enhanced_message.call_message = this->in_flight->message;
778 enhanced_message.messageType = this->string_to_messagetype(
779 this->in_flight->message.at(CALL_ACTION).template get<std::string>() + std::string("Response"));
780 this->in_flight->promise.set_value(enhanced_message);
781
782 const auto queue_type =
783 is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal;
784 if (is_transaction_message(*this->in_flight) or this->config.check_queue(this->in_flight->messageType)) {
785 try {
786 // We only remove the message as soon as a response is received. Otherwise we might miss a message
787 // if the charging station just boots after sending, but before receiving the result.
788 this->database_handler->remove_message_queue_message(this->in_flight->initial_unique_id,
789 queue_type);
790 } catch (const QueryExecutionException& e) {
791 EVLOG_warning << "Could not delete message from message queue: " << e.what();
792 } catch (const std::exception& e) {
793 EVLOG_warning << "Could not delete message from message queue: " << e.what();
794 }
795 }
796 this->reset_in_flight();
797
798 // we want the start transaction response handler to be executed before the next message will be
799 // send in order to be able to replace the transaction id if necessary
800 // start transaction response handler will notify
801 if (std::find(this->external_notify.begin(), this->external_notify.end(), enhanced_message.messageType) ==
802 this->external_notify.end()) {
803 this->cv.notify_one();
804 }
805 }
806 }
807
809 void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) {
810 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
811 // We got a timeout iff enhanced_message_opt is empty. Otherwise, enhanced_message_opt contains the CallError.
812 bool timeout = !enhanced_message_opt.has_value();
813 if (timeout) {
814 EVLOG_warning << "Message timeout for: " << this->in_flight->messageType << " ("
815 << this->in_flight->uniqueId() << ")";
816 } else {
817 EVLOG_warning << "CALLERROR for: " << this->in_flight->messageType << " (" << this->in_flight->uniqueId()
818 << ")";
819 }
820
821 const auto queue_type = is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal;
822 if (is_transaction_message(*this->in_flight) or this->config.check_queue(this->in_flight->messageType)) {
823 if (this->in_flight->message_attempts < this->config.transaction_message_attempts) {
824 EVLOG_warning << "Message shall be persisted and will therefore be sent again";
825 // Generate a new message ID for the retry
826 const auto old_message_id = this->in_flight->message[MESSAGE_ID];
827 this->in_flight->message[MESSAGE_ID] = ocpp::create_message_id();
828 if (this->config.transaction_message_retry_interval > 0) {
829 // exponential backoff
830 this->in_flight->timestamp =
831 DateTime(this->in_flight->timestamp.to_time_point() +
832 std::chrono::seconds(this->config.transaction_message_retry_interval) *
833 this->in_flight->message_attempts);
834 EVLOG_debug << "Retry interval > 0: " << this->config.transaction_message_retry_interval
835 << " attempting to retry message at: " << this->in_flight->timestamp;
836 } else {
837 // immediate retry
838 this->in_flight->timestamp = DateTime();
839 EVLOG_debug << "Retry interval of 0 means immediate retry";
840 }
841
842 EVLOG_warning << "Attempt: " << this->in_flight->message_attempts + 1 << "/"
843 << this->config.transaction_message_attempts << " will be sent at "
844 << this->in_flight->timestamp;
845
846 if (queue_type == QueueType::Transaction) {
847 this->transaction_message_queue.push_front(this->in_flight);
848 } else if (queue_type == QueueType::Normal) {
849 this->normal_message_queue.push_front(this->in_flight);
850 }
851 if (is_start_transaction_message(*this->in_flight)) {
852 this->start_transaction_message_retry_callback(this->in_flight->message[MESSAGE_ID],
853 old_message_id);
854 }
855 this->notify_queue_timer.at(
856 [this]() {
857 this->new_message = true;
858 this->cv.notify_all();
859 },
860 this->in_flight->timestamp.to_time_point());
861 } else {
862 EVLOG_error << "Could not deliver message within the configured amount of attempts, "
863 "dropping message";
864 if (enhanced_message_opt) {
865 this->in_flight->promise.set_value(enhanced_message_opt.value());
866 } else {
867 EnhancedMessage<M> enhanced_message;
868 enhanced_message.offline = true;
869 this->in_flight->promise.set_value(enhanced_message);
870 }
871 try {
872 // also drop the message from the database
873 this->database_handler->remove_message_queue_message(this->in_flight->initial_unique_id,
874 queue_type);
875 } catch (const QueryExecutionException& e) {
876 EVLOG_warning << "Could not delete message from transaction queue: " << e.what();
877 } catch (const std::exception& e) {
878 EVLOG_warning << "Could not delete message from transaction queue: " << e.what();
879 }
880 }
881 } else if (is_boot_notification_message(this->in_flight->messageType)) {
882 EVLOG_warning << "Message is BootNotification.req and will therefore be sent again";
883 // Generate a new message ID for the retry
884 this->in_flight->message[MESSAGE_ID] = ocpp::create_message_id();
885 // Spec does not define how to handle retries for BootNotification.req: We use the
886 // the boot_notification_retry_interval_seconds
887 this->in_flight->timestamp =
888 DateTime(this->in_flight->timestamp.to_time_point() +
889 std::chrono::seconds(this->config.boot_notification_retry_interval_seconds));
890 this->normal_message_queue.push_front(this->in_flight);
891 this->notify_queue_timer.at(
892 [this]() {
893 this->new_message = true;
894 this->cv.notify_all();
895 },
896 this->in_flight->timestamp.to_time_point());
897 } else {
898 EVLOG_warning << "Message is not transaction related, dropping it";
899 if (enhanced_message_opt) {
900 this->in_flight->promise.set_value(enhanced_message_opt.value());
901 } else {
902 EnhancedMessage<M> enhanced_message;
903 enhanced_message.offline = true;
904 this->in_flight->promise.set_value(enhanced_message);
905 }
906 }
907 this->reset_in_flight();
908 this->cv.notify_all();
909 }
910
912 void stop() {
913 EVLOG_debug << "stop()";
914 // stop the running thread
915 this->running = false;
916 this->cv.notify_one();
917 this->worker_thread.join();
918 EVLOG_debug << "stop() notified message queue";
919 }
920
922 void pause() {
923 EVLOG_debug << "pause()";
924 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
925 this->pause_resume_ctr++;
926 this->resume_timer.stop();
927 this->paused = true;
928 this->resuming = false;
929 this->cv.notify_one();
930 EVLOG_debug << "pause() notified message queue";
931 }
932
934 void resume(std::chrono::seconds delay_on_reconnect) {
935 EVLOG_debug << "resume() called";
936 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
937 if (!this->paused) {
938 return;
939 }
940 this->pause_resume_ctr++;
941 // Do not delay if this is the first call to resume(), i.e. this is the initial connection
942 if (this->pause_resume_ctr > 1 && delay_on_reconnect > std::chrono::seconds(0)) {
943 this->resuming = true;
944 EVLOG_debug << "Delaying message queue resume by " << delay_on_reconnect.count() << " seconds";
945 u_int64_t expected_pause_resume_ctr = this->pause_resume_ctr;
946 this->resume_timer.timeout(
947 [this, expected_pause_resume_ctr] { this->resume_now(expected_pause_resume_ctr); }, delay_on_reconnect);
948 } else {
949 this->resume_now(this->pause_resume_ctr);
950 }
951 }
952
953 void set_registration_status_accepted() {
954 {
955 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
956 this->is_registration_status_accepted = true;
957 }
958 this->cv.notify_all();
959 }
960
961 bool is_transaction_message_queue_empty() {
962 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
963 return this->transaction_message_queue.empty();
964 }
965
966 bool contains_transaction_messages(const CiString<36> transaction_id) {
967 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
968 for (const auto control_message : this->transaction_message_queue) {
969 if (control_message->messageType == v2::MessageType::TransactionEvent) {
970 v2::TransactionEventRequest req = control_message->message.at(CALL_PAYLOAD);
971 if (req.transactionInfo.transactionId == transaction_id) {
972 return true;
973 }
974 }
975 }
976 return false;
977 }
978
979 bool contains_stop_transaction_message(const int32_t transaction_id) {
980 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
981 for (const auto control_message : this->transaction_message_queue) {
982 if (control_message->messageType == v16::MessageType::StopTransaction) {
983 v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
984 if (req.transactionId == transaction_id) {
985 return true;
986 }
987 }
988 }
989 return false;
990 }
991
993 void update_transaction_message_attempts(const int transaction_message_attempts) {
994 this->config.transaction_message_attempts = transaction_message_attempts;
995 }
996
998 void update_transaction_message_retry_interval(const int transaction_message_retry_interval) {
999 this->config.transaction_message_retry_interval = transaction_message_retry_interval;
1000 }
1001
1003 void update_message_timeout(const int timeout) {
1004 this->config.message_timeout_seconds = timeout;
1005 }
1006
1009 void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id) {
1010 EVLOG_debug << "adding " << stop_transaction_message_id << " for transaction " << transaction_id;
1011 this->message_id_transaction_id_map[stop_transaction_message_id] = transaction_id;
1012 }
1013
1014 void add_meter_value_message_id(const std::string& start_transaction_message_id,
1015 const std::string& meter_value_message_id) {
1016 if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
1017 this->start_transaction_mid_meter_values_mid_map.at(start_transaction_message_id)
1018 .push_back(meter_value_message_id);
1019 } else {
1020 std::vector<std::string> meter_value_message_ids;
1021 meter_value_message_ids.push_back(meter_value_message_id);
1022 this->start_transaction_mid_meter_values_mid_map[start_transaction_message_id] = meter_value_message_ids;
1023 }
1024 }
1025
1026 void notify_start_transaction_handled(const std::string& start_transaction_message_id,
1027 const int32_t transaction_id) {
1028 this->cv.notify_one();
1029
1030 // replace transaction id in meter values if start_transaction_message_id is present in map
1031 // this is necessary when the chargepoint queued MeterValue.req for a transaction with unknown transaction_id
1032 std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
1033 if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
1034 for (auto it = this->transaction_message_queue.begin(); it != transaction_message_queue.end(); ++it) {
1035 for (const auto& meter_value_message_id :
1036 this->start_transaction_mid_meter_values_mid_map.at(start_transaction_message_id)) {
1037
1038 if (meter_value_message_id == (*it)->message.at(1)) {
1039 EVLOG_debug << "Adding transactionId " << transaction_id << " to MeterValue.req";
1040 (*it)->message.at(3)["transactionId"] = transaction_id;
1041 }
1042 }
1043 }
1044 }
1045 this->start_transaction_mid_meter_values_mid_map.erase(start_transaction_message_id);
1046 }
1047
1048 M string_to_messagetype(const std::string& s);
1049 std::string messagetype_to_string(M m);
1050};
1051
1052} // namespace ocpp
1053#endif // OCPP_COMMON_MESSAGE_QUEUE_HPP
Contains a DateTime implementation that can parse and create RFC 3339 compatible strings.
Definition: types.hpp:109
Contains a MessageId implementation based on a case insensitive string with a maximum length of 36 pr...
Definition: call_types.hpp:34
contains a message queue that makes sure that OCPPs synchronicity requirements are met
Definition: message_queue.hpp:164
MessageQueue(const std::function< bool(json message)> &send_callback, const MessageQueueConfig< M > &config, const std::vector< M > &external_notify, std::shared_ptr< common::DatabaseHandlerCommon > database_handler, const std::function< void(const std::string &new_message_id, const std::string &old_message_id)> start_transaction_message_retry_callback=[](const std::string &new_message_id, const std::string &old_message_id) {})
Creates a new MessageQueue object with the provided configuration and send_callback.
Definition: message_queue.hpp:389
void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id)
Adds the given transaction_id to the message_id_transaction_id_map using the key stop_transaction_mes...
Definition: message_queue.hpp:1009
std::future< EnhancedMessage< M > > push_call_async(const json &call)
pushes a new call message onto the message queue
Definition: message_queue.hpp:687
void pause()
Pauses the message queue.
Definition: message_queue.hpp:922
void update_transaction_message_retry_interval(const int transaction_message_retry_interval)
Set transaction_message_retry_interval to given transaction_message_retry_interval in seconds.
Definition: message_queue.hpp:998
void handle_timeout_or_callerror(const std::optional< EnhancedMessage< M > > &enhanced_message_opt)
Handles a message timeout or a CALLERROR. enhanced_message_opt is set only in case of CALLERROR.
Definition: message_queue.hpp:809
void resume(std::chrono::seconds delay_on_reconnect)
Resumes the message queue.
Definition: message_queue.hpp:934
void stop()
Stops the message queue.
Definition: message_queue.hpp:912
void reset_next_message_to_send()
Resets next message to send. Can be used in situation when we dont want to reply to a CALL message.
Definition: message_queue.hpp:574
void push_call_result(const json &call_result)
Sends a new call_result message over the websocket.
Definition: message_queue.hpp:649
void get_persisted_messages_from_db(bool ignore_security_event_notifications=false)
Gets all persisted messages of normal message queue and persisted message queue from the database.
Definition: message_queue.hpp:580
void update_message_timeout(const int timeout)
Set message_timeout to given timeout (in seconds)
Definition: message_queue.hpp:1003
EnhancedMessage< M > receive(std::string_view message)
Enhances a received json_message with additional meta information, checks if it is a valid CallResult...
Definition: message_queue.hpp:717
void push_call_error(CallError call_error)
Sends a new call_error message over the websocket.
Definition: message_queue.hpp:667
void update_transaction_message_attempts(const int transaction_message_attempts)
Set transaction_message_attempts to given transaction_message_attempts.
Definition: message_queue.hpp:993
Exception for errors during query execution.
Definition: database_exceptions.hpp:49
Contains a OCPP CallError message.
Definition: call_types.hpp:143
This contains an internal control message.
Definition: message_queue.hpp:65
std::promise< EnhancedMessage< M > > promise
A promise used by the async send interface.
Definition: message_queue.hpp:69
bool is_transaction_update_message() const
True for transactional messages containing updates (measurements) for a transaction.
MessageId uniqueId() const
Provides the unique message ID stored in the message.
Definition: message_queue.hpp:79
int32_t message_attempts
The number of times this message has been rejected by the central system.
Definition: message_queue.hpp:68
ControlMessage(const json &message, const bool stall_until_accepted=false)
Creates a new ControlMessage object from the provided message.
json::array_t message
The OCPP message as a json array.
Definition: message_queue.hpp:66
M messageType
The OCPP message type.
Definition: message_queue.hpp:67
DateTime timestamp
A timestamp that shows when this message can be sent.
Definition: message_queue.hpp:70
Contains a OCPP message in json form with additional information.
Definition: message_queue.hpp:54
json call_message
If the message is a CALLRESULT or CALLERROR this can contain the original CALL message.
Definition: message_queue.hpp:60
json message
The OCPP message as json.
Definition: message_queue.hpp:55
M messageType
The OCPP message type.
Definition: message_queue.hpp:58
MessageTypeId messageTypeId
The OCPP message type ID (CALL/CALLRESULT/CALLERROR)
Definition: message_queue.hpp:59
MessageId uniqueId
The unique ID of the json message.
Definition: message_queue.hpp:57
size_t message_size
size of the json message in bytes
Definition: message_queue.hpp:56
bool offline
A flag indicating if the connection to the central system is offline.
Definition: message_queue.hpp:61
Definition: message_queue.hpp:29
bool check_queue(const M &message_type)
Returns true if the given message_type shall be queued based on the configuration of queue_all_messag...
Definition: message_queue.hpp:48
Definition: database_handler_common.hpp:16