ocpp 0.24.1
A C++ implementation of the Open Charge Point Protocol
safe_queue.hpp
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2020 - 2024 Pionix GmbH and Contributors to EVerest
3
4#pragma once
5
6#include <condition_variable>
7#include <mutex>
8#include <queue>
9
10namespace ocpp {
11
/// \brief Thread-safe FIFO queue. Every access to the underlying std::queue is serialized by an internal
/// mutex, and an internal condition variable lets threads block until an element is available or until a
/// caller-supplied predicate becomes true. Every operation that changes the queue content (push, pop, clear)
/// wakes up one waiting thread so that it can re-evaluate its wait condition.
template <typename T> class SafeQueue {
public:
    /// \brief Checks whether the queue currently holds no elements
    /// \returns true if the queue is empty at the time of the call
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.empty();
    }

    /// \brief Returns a copy of the oldest element without removing it. A copy (not a reference) is returned
    /// because a reference into the queue could be invalidated by another thread calling pop() or clear().
    /// \warning The queue must not be empty; calling std::queue::front() on an empty queue is undefined behavior
    T front() {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.front();
    }

    /// \brief Removes the oldest element from the queue and returns it, then notifies one waiting thread
    /// \warning The queue must not be empty; calling std::queue::front() on an empty queue is undefined behavior
    T pop() {
        std::unique_lock<std::mutex> lock(mutex);

        // Move the element out of the queue instead of copying it, then drop the moved-from slot
        T element = std::move(queue.front());
        queue.pop();

        // Unlock before notifying, so that the woken thread does not immediately block on the mutex again
        lock.unlock();
        notify_waiting_thread();

        return element;
    }

    /// \brief Queues an element by moving it into the queue and notifies one thread waiting on the internal
    /// condition variable
    /// \param value element to enqueue; it is moved from
    void push(T&& value) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            // 'value' is an lvalue in this scope: without std::move the element would be copied, and
            // move-only element types would not compile
            queue.push(std::move(value));
        }

        // Notify outside of the lock scope
        notify_waiting_thread();
    }

    /// \brief Queues a copy of the element and notifies one thread waiting on the internal condition variable
    /// \param value element to enqueue; it is copied
    void push(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(value);
        }

        // Notify outside of the lock scope
        notify_waiting_thread();
    }

    /// \brief Clears the queue and notifies one waiting thread
    void clear() {
        {
            std::lock_guard<std::mutex> lock(mutex);

            // Swapping with an empty queue removes all elements; they are destroyed when 'discarded' goes out
            // of scope
            std::queue<T> discarded;
            discarded.swap(queue);
        }

        // Notify outside of the lock scope
        notify_waiting_thread();
    }

    /// \brief Blocks until the queue holds at least one element
    /// \param timeout maximum time to wait; a value <= 0 (the default) waits without a time limit
    void wait_on_queue_element(std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        // The predicate never becomes true, so only the "queue is not empty" condition can end the wait
        wait_on_queue_element_or_predicate([]() { return false; }, timeout);
    }

    /// \brief Same as 'wait_on_queue_element', but additionally stops waiting as soon as \p pred returns true
    /// \param pred callable returning a value convertible to bool; it is invoked while the internal mutex is
    /// held, so it must not call back into this queue
    /// \param timeout maximum time to wait; a value <= 0 (the default) waits without a time limit
    template <class Predicate>
    void wait_on_queue_element_or_predicate(Predicate pred,
                                            std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        std::unique_lock<std::mutex> lock(mutex);

        const auto should_wake = [&]() { return !queue.empty() || pred(); };

        if (timeout.count() > 0) {
            cv.wait_for(lock, timeout, should_wake);
        } else {
            cv.wait(lock, should_wake);
        }
    }

    /// \brief Blocks until \p pred returns true, independent of the queue content
    /// \param pred callable returning a value convertible to bool; it is invoked while the internal mutex is
    /// held, so it must not call back into this queue
    /// \param timeout maximum time to wait; a value <= 0 (the default) waits without a time limit
    template <class Predicate>
    void wait_on_custom_event(Predicate pred, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        std::unique_lock<std::mutex> lock(mutex);

        // Wrap the predicate by reference so that it is neither copied nor required to be copyable
        const auto should_wake = [&]() { return pred(); };

        if (timeout.count() > 0) {
            cv.wait_for(lock, timeout, should_wake);
        } else {
            cv.wait(lock, should_wake);
        }
    }

    /// \brief Notifies a single waiting thread to wake up and re-evaluate its wait condition
    void notify_waiting_thread() {
        cv.notify_one();
    }

private:
    std::queue<T> queue;

    // mutable so that const members such as empty() can lock it
    mutable std::mutex mutex;
    std::condition_variable cv;
};
120
121} // namespace ocpp
Thread-safe message queue. Holds a condition variable that can be waited upon. Will take up the wai...
Definition: safe_queue.hpp:15
void wait_on_custom_event(Predicate pred, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))
Waits on the queue for a custom event.
Definition: safe_queue.hpp:99
void notify_waiting_thread()
Notifies a single waiting thread to wake up.
Definition: safe_queue.hpp:110
void clear()
Clears the queue.
Definition: safe_queue.hpp:66
void push(T &&value)
Queues an element and notifies any threads waiting on the internal condition variable.
Definition: safe_queue.hpp:46
T front()
We return a copy here, since one thread might be accessing the reference while another thread uses pop and...
Definition: safe_queue.hpp:25
T pop()
Definition: safe_queue.hpp:31
void wait_on_queue_element_or_predicate(Predicate pred, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))
Same as 'wait_on_queue_element' but receives an additional predicate to wait upon.
Definition: safe_queue.hpp:85
bool empty() const
Definition: safe_queue.hpp:18
void wait_on_queue_element(std::chrono::milliseconds timeout=std::chrono::milliseconds(0))
Waits for the queue to receive an element.
Definition: safe_queue.hpp:79
void push(const T &value)
Queues an element and notifies any threads waiting on the internal condition variable.
Definition: safe_queue.hpp:56