@@ -27,35 +27,39 @@ thread_timer::~thread_timer() {
2727
2828std::shared_ptr<thread_timer> thread_timer::start (std::function<void (ignite_error&&)> error_handler) {
2929 std::shared_ptr<thread_timer> res{new thread_timer ()};
30- res->m_thread = std::thread ([&self = * res, error_handler = std::move (error_handler)]() {
31- std::unique_lock<std::mutex> lock (self. m_mutex );
30+ res->m_thread = std::thread ([state = res-> m_state , error_handler = std::move (error_handler)]() {
31+ std::unique_lock<std::mutex> lock (state-> m_mutex );
3232 while (true ) {
33- if (self. m_stopping ) {
34- self. m_condition .notify_one ();
33+ if (state-> m_stopping ) {
34+ state-> m_condition .notify_one ();
3535 return ;
3636 }
3737
38- if (self. m_events .empty ()) {
39- self. m_condition .wait (lock);
38+ if (state-> m_events .empty ()) {
39+ state-> m_condition .wait (lock);
4040 continue ;
4141 }
4242
43- auto nearest_event_ts = self. m_events .top ().timestamp ;
43+ auto nearest_event_ts = state-> m_events .top ().timestamp ;
4444 auto now = std::chrono::steady_clock::now ();
4545 if (nearest_event_ts < now) {
46- auto func = self. m_events .top ().callback ;
47- self. m_events .pop ();
46+ auto func = state-> m_events .top ().callback ;
47+ state-> m_events .pop ();
4848
4949 lock.unlock ();
5050
51+ // NOTE: invoking func may destroy the thread_timer object (e.g. when the last
52+ // shared_ptr<node_connection> held by the callback is released, triggering
53+ // ~node_connection -> ~thread_timer -> stop()). The timer_state shared_ptr
54+ // captured by this lambda keeps state alive across that destruction.
5155 auto res = result_of_operation (func);
5256 if (res.has_error ()) {
5357 error_handler (res.error ());
5458 }
5559
5660 lock.lock ();
5761 } else {
58- self. m_condition .wait_until (lock, nearest_event_ts);
62+ state-> m_condition .wait_until (lock, nearest_event_ts);
5963 }
6064 }
6165 });
@@ -64,20 +68,29 @@ std::shared_ptr<thread_timer> thread_timer::start(std::function<void(ignite_erro
6468
6569void thread_timer::stop () {
6670 {
67- std::unique_lock<std::mutex> lock (m_mutex);
68- if (m_stopping)
71+ std::unique_lock<std::mutex> lock (m_state-> m_mutex );
72+ if (m_state-> m_stopping )
6973 return ;
7074
71- m_stopping = true ;
72- m_condition.notify_one ();
75+ m_state->m_stopping = true ;
76+ m_state->m_condition .notify_one ();
77+ }
78+
79+ if (std::this_thread::get_id () == m_thread.get_id ()) {
80+ // Called from within a timer callback. Joining the current thread would deadlock, so
81+ // detach instead. The timer loop will see m_stopping == true on its next iteration and
82+ // exit cleanly. The timer_state shared_ptr held by the thread lambda keeps the state
83+ // (mutex, condition variable, event queue) alive until the thread actually terminates.
84+ m_thread.detach ();
85+ } else {
86+ m_thread.join ();
7387 }
74- m_thread.join ();
7588}
7689
7790void thread_timer::add (std::chrono::milliseconds timeout, std::function<void ()> callback) {
78- std::lock_guard<std::mutex> lock (m_mutex);
79- m_events.emplace (std::chrono::steady_clock::now () + timeout, std::move (callback));
80- m_condition.notify_one ();
91+ std::lock_guard<std::mutex> lock (m_state-> m_mutex );
92+ m_state-> m_events .emplace (std::chrono::steady_clock::now () + timeout, std::move (callback));
93+ m_state-> m_condition .notify_one ();
8194}
8295
8396} // namespace ignite::detail
0 commit comments