-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Retain strong refs to update-processing tasks in AsyncTeleBot #2588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -193,6 +193,10 @@ def __init__(self, token: str, parse_mode: Optional[str]=None, offset: Optional[ | |
|
|
||
| self._user = None # set during polling | ||
| self._polling = None | ||
| # Strong references to background tasks created via asyncio.create_task(). | ||
| # asyncio only keeps weak references, so unreferenced tasks can be GC'd | ||
| # mid-execution; see https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task | ||
| self._pending_tasks: set = set() | ||
| self.webhook_listener = None | ||
|
|
||
| if validate_token: | ||
|
|
@@ -456,8 +460,10 @@ async def _process_polling(self, non_stop: bool=False, interval: int=0, timeout: | |
| updates = await self.get_updates(offset=self.offset, allowed_updates=allowed_updates, timeout=timeout, request_timeout=request_timeout) | ||
| if updates: | ||
| self.offset = updates[-1].update_id + 1 | ||
| # noinspection PyAsyncCall | ||
| asyncio.create_task(self.process_new_updates(updates)) # Seperate task for processing updates | ||
| # Retain a strong reference so the task isn't GC'd mid-execution. | ||
| task = asyncio.create_task(self.process_new_updates(updates)) | ||
| self._pending_tasks.add(task) | ||
| task.add_done_callback(self._pending_tasks.discard) | ||
|
Comment on lines
+463
to
+466
|
||
| if interval: await asyncio.sleep(interval) | ||
| error_interval = 0.25 # drop error_interval if no errors | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,75 @@ | ||||||
| # -*- coding: utf-8 -*- | ||||||
| """Unit tests for `telebot.async_telebot.AsyncTeleBot`. | ||||||
|
|
||||||
| These tests are self-contained (no TOKEN/CHAT_ID required) and stub out all | ||||||
| network I/O. | ||||||
| """ | ||||||
| import asyncio | ||||||
|
|
||||||
| from telebot import types | ||||||
| from telebot.async_telebot import AsyncTeleBot | ||||||
|
|
||||||
|
|
||||||
| def _make_fake_me() -> types.User: | ||||||
| return types.User.de_json({ | ||||||
| "id": 1, | ||||||
| "is_bot": True, | ||||||
| "first_name": "Test", | ||||||
| "username": "test_bot", | ||||||
| }) | ||||||
|
|
||||||
|
|
||||||
| def test_process_polling_retains_update_processing_tasks(): | ||||||
| """Regression test for issue #2572. | ||||||
|
|
||||||
| Tasks fired by `_process_polling` for `process_new_updates` must be held | ||||||
| in `self._pending_tasks` while running and discarded on completion, so | ||||||
| they cannot be garbage-collected mid-execution. | ||||||
| """ | ||||||
| bot = AsyncTeleBot("1:fake", validate_token=False) | ||||||
|
|
||||||
| task_was_tracked_during_run: list[bool] = [] | ||||||
| process_completed = asyncio.Event() | ||||||
|
|
||||||
| async def fake_process_new_updates(updates): | ||||||
| current = asyncio.current_task() | ||||||
| task_was_tracked_during_run.append(current in bot._pending_tasks) | ||||||
| process_completed.set() | ||||||
|
|
||||||
| async def fake_get_me(): | ||||||
| return _make_fake_me() | ||||||
|
|
||||||
| # Deliver a single update batch, then stop polling on the next tick. | ||||||
| fake_update = types.Update.de_json({"update_id": 1}) | ||||||
| call_count = {"n": 0} | ||||||
|
|
||||||
| async def fake_get_updates(*args, **kwargs): | ||||||
| call_count["n"] += 1 | ||||||
| if call_count["n"] == 1: | ||||||
| return [fake_update] | ||||||
| bot._polling = False | ||||||
| return [] | ||||||
|
|
||||||
| async def noop(): | ||||||
| return None | ||||||
|
|
||||||
| bot.get_me = fake_get_me | ||||||
| bot.get_updates = fake_get_updates | ||||||
| bot.process_new_updates = fake_process_new_updates | ||||||
| bot.close_session = noop # stub: no real aiohttp session in tests | ||||||
|
|
||||||
| async def driver(): | ||||||
| await bot._process_polling(non_stop=True, interval=0, timeout=0) | ||||||
| # Allow the fire-and-forget task to finish plus one yield for the | ||||||
| # add_done_callback discard to run. | ||||||
| await process_completed.wait() | ||||||
|
||||||
| await process_completed.wait() | |
| await asyncio.wait_for(process_completed.wait(), timeout=1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, applied in the latest push (ea13186). The guard now wraps the wait in asyncio.wait_for(..., timeout=1) so a miswired stub can't hang the test run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._pending_tasksis annotated as a bareset, which loses the element type in this typed package. Consider using a parameterized type likeset[asyncio.Task](orset[asyncio.Task[None]]if appropriate) so static type checkers and IDEs can reason about what’s stored here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied in the latest push as
set[asyncio.Task[Any]]. KeptAnyrather thanNonesinceprocess_new_updatesreturnsNonetoday but the set is general enough that a future caller might hand it a task with a different return.