refact: add two-phase startup lifecycle #5943
zouyonghe wants to merge 2 commits into AstrBotDevs:master
Conversation
Allow the dashboard to become available before plugin bootstrap completes, and surface runtime readiness and failure states to API callers. Guard plugin-facing endpoints until the runtime is ready, and clean up provider and plugin runtime state safely across bootstrap failures, retries, stop, and restart flows.
Summary of Changes (Gemini Code Assist)

This pull request significantly refactors the application's startup sequence to improve user experience and system stability. By decoupling the initial core setup from the more extensive runtime initialization, the dashboard can launch much faster and provide immediate feedback. The changes also introduce comprehensive state management for the application's lifecycle, so its operational status can be communicated clearly and error handling and recovery are more resilient. This keeps the system's behavior predictable even during complex startup or failure scenarios.
Hey - I've found 1 issue, and left some high level feedback:
- The lifecycle state machine in `AstrBotCoreLifecycle` has grown quite complex (e.g. `_wait_for_runtime_ready`, `_interrupt_runtime_bootstrap_waiters`, `_collect_runtime_bootstrap_task`); consider factoring out a small helper class, or a clearly separated private section within the class, to encapsulate the runtime-bootstrapping logic so the main lifecycle class is easier to understand and less error-prone to extend.
- The runtime readiness guarding is currently split between `guard_runtime_ready`, `guarded_srv_plug_route`, and explicit checks in `StatRoute` (e.g. `get_start_time`, `get_stat`); you might want to centralize this into a single reusable decorator or helper pattern so new dashboard routes can opt in without duplicating readiness checks and status-response wiring.

Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The lifecycle state machine in `AstrBotCoreLifecycle` has grown quite complex (e.g. `_wait_for_runtime_ready`, `_interrupt_runtime_bootstrap_waiters`, `_collect_runtime_bootstrap_task`); consider factoring out a small helper class or clearly separated private section to encapsulate runtime-bootstrapping concerns so the main lifecycle class is easier to reason about and less error-prone to extend.
- The runtime readiness guarding is currently split between `guard_runtime_ready`, `guarded_srv_plug_route`, and explicit checks in `StatRoute` (e.g. `get_start_time`, `get_stat`); you might want to centralize this into a single reusable decorator or helper pattern so new dashboard routes can opt-in without duplicating readiness checks and status response wiring.
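One way to centralize the readiness guarding described above is a single decorator that every plugin-facing route opts into. A minimal sketch, assuming a `LifecycleState` enum like the one this PR introduces — the function name, payload shape, and status codes here are illustrative, not AstrBot's actual API:

```python
import asyncio
import functools
from enum import Enum


class LifecycleState(Enum):
    # Illustrative states mirroring the two-phase lifecycle in this PR
    CREATED = "created"
    CORE_READY = "core_ready"
    RUNTIME_READY = "runtime_ready"
    RUNTIME_FAILED = "runtime_failed"


def guard_runtime_ready(get_state):
    """Short-circuit an async route handler until the runtime is ready.

    `get_state` is any callable returning the current LifecycleState; the
    (payload, status) tuples returned here are placeholders for whatever
    error-response convention the dashboard actually uses.
    """
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(*args, **kwargs):
            state = get_state()
            if state is LifecycleState.RUNTIME_FAILED:
                return {"status": "error", "message": "runtime bootstrap failed"}, 503
            if state is not LifecycleState.RUNTIME_READY:
                return {"status": "error", "message": "runtime is still starting"}, 503
            return await handler(*args, **kwargs)
        return wrapper
    return decorator
```

With a shape like this, `StatRoute` and friends would no longer need inline readiness checks: each route simply applies the decorator, and the readiness/failure response wiring lives in one place.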
## Individual Comments
### Comment 1
<location path="astrbot/core/core_lifecycle.py" line_range="59" />
<code_context>
+ RUNTIME_READY = "runtime_ready"
+
+
class AstrBotCoreLifecycle:
"""AstrBot 核心生命周期管理类, 负责管理 AstrBot 的启动、停止、重启等操作.
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the new lifecycle management by consolidating runtime state tracking, collapsing wait/shutdown logic, and inlining trivial guards so the core → runtime → start/stop flows are easier to follow.
The new lifecycle split is useful, but some complexity is self‑inflicted and can be reduced without losing behavior.
### 1. Consolidate runtime state & events
You currently have:
- `lifecycle_state` + `core_initialized` / `runtime_ready` / `runtime_failed`
- `runtime_bootstrap_task`
- `runtime_bootstrap_error`
- `runtime_ready_event`, `runtime_failed_event`
- `_runtime_wait_interrupted`
Many of these encode the same thing. You can simplify by:
- Treating `lifecycle_state` as the **single source of truth** for status.
- Keeping only one optional error field (`runtime_bootstrap_error`).
- Keeping a **single** event for “runtime ready or failed” that other waiters can use.
Example skeleton:
```python
# state:
self.lifecycle_state = LifecycleState.CREATED
self.runtime_bootstrap_task: asyncio.Task[None] | None = None
self.runtime_bootstrap_error: BaseException | None = None
self.runtime_done_event = asyncio.Event() # set when bootstrap finishes (success/fail)
self._runtime_wait_interrupted = False # keep if you really need explicit interrupt
```
Bootstrap sets state + error + the single event:
```python
async def bootstrap_runtime(self) -> None:
# ... preconditions ...
self.runtime_bootstrap_error = None
self.runtime_done_event.clear()
try:
# ... bootstrap work ...
self._set_lifecycle_state(LifecycleState.RUNTIME_READY)
except asyncio.CancelledError:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.CORE_READY)
self.runtime_bootstrap_error = None
self.runtime_done_event.set()
raise
except BaseException as exc:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.RUNTIME_FAILED)
self.runtime_bootstrap_error = exc
self.runtime_done_event.set()
raise
else:
self.runtime_done_event.set()
```
With that, you no longer need `runtime_ready_event` vs `runtime_failed_event`, nor `runtime_ready/runtime_failed` flags (they’re derivable from `lifecycle_state`).
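Deriving the boolean flags in practice can be as simple as read-only properties over `lifecycle_state` — a sketch under the same assumptions as the skeleton above (class and enum names are illustrative):

```python
from enum import Enum


class LifecycleState(Enum):
    CREATED = "created"
    CORE_READY = "core_ready"
    RUNTIME_READY = "runtime_ready"
    RUNTIME_FAILED = "runtime_failed"


class Lifecycle:
    def __init__(self):
        self.lifecycle_state = LifecycleState.CREATED

    @property
    def runtime_ready(self) -> bool:
        # Derived, not stored: there is no second variable to keep in sync
        return self.lifecycle_state is LifecycleState.RUNTIME_READY

    @property
    def runtime_failed(self) -> bool:
        return self.lifecycle_state is LifecycleState.RUNTIME_FAILED
```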
### 2. Simplify `_wait_for_runtime_ready`
`_wait_for_runtime_ready` is much more complex than needed: it waits on three things (bootstrap task, ready event, failed event) and has a lot of branching.
If the bootstrap task is the only thing that mutates lifecycle state & error, you can just await that task, with a small wrapper to support concurrent waiters:
```python
async def _wait_for_runtime_ready(self) -> bool:
# fast path
if self.lifecycle_state is LifecycleState.RUNTIME_READY:
return True
if self.lifecycle_state is LifecycleState.RUNTIME_FAILED:
return False
if self._runtime_wait_interrupted:
return False
task = self.runtime_bootstrap_task
if task is None:
raise RuntimeError("runtime bootstrap task was not scheduled before start")
# wait for bootstrap completion
try:
await asyncio.shield(task)
except asyncio.CancelledError:
# propagate interruption; let caller decide
raise
# Consume result (avoid re-raising for later awaiters).
await self._consume_runtime_bootstrap_task(task)
if self._runtime_wait_interrupted:
return False
return self.lifecycle_state is LifecycleState.RUNTIME_READY
```
If you still need an event for other code (e.g. dashboard or other subsystems), let them `await self.runtime_done_event.wait()` and then inspect `lifecycle_state` / `runtime_bootstrap_error`. You don’t need to also race against separate ready/failed events inside `_wait_for_runtime_ready`.
This removes:
- The parallel `asyncio.wait` on bootstrap + 2 events.
- Many of the branches around `runtime_failed_event`, clearing it, etc.
- The multiple paths that selectively consume the bootstrap task.
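The single-`Event` pattern this enables — many waiters blocking on one completion signal and then inspecting shared state — can be demonstrated in isolation (all names here are illustrative):

```python
import asyncio


async def main():
    done = asyncio.Event()  # set once bootstrap finishes, success or failure
    state = {"lifecycle": "bootstrapping"}

    async def bootstrap():
        await asyncio.sleep(0.01)  # stand-in for real bootstrap work
        state["lifecycle"] = "runtime_ready"
        done.set()  # wakes every waiter at once

    async def waiter(name):
        await done.wait()  # all waiters share the same event
        return name, state["lifecycle"]

    bootstrap_task = asyncio.ensure_future(bootstrap())
    results = await asyncio.gather(waiter("a"), waiter("b"))
    await bootstrap_task
    return results
```

Because every waiter wakes on the same event and reads `lifecycle_state` afterwards, success and failure need no separate signaling paths.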
### 3. Collapse micro‑helpers into higher level operations
There are many very small helpers (`_clear_runtime_failure_for_retry`, `_reset_runtime_bootstrap_state`, `_interrupt_runtime_bootstrap_waiters`, `_collect_runtime_bootstrap_task`, `_clear_runtime_artifacts`, etc.) used mostly in shutdown/restart paths.
You can make the main flows easier to follow by introducing 1–2 higher‑level operations and inlining some of these tiny helpers.
For example, factor shutdown semantics into a single method:
```python
async def _shutdown_runtime(self, reset_core_state: bool) -> None:
# interrupt waiters and cancel background work
self._runtime_wait_interrupted = True
tasks_to_wait = self._cancel_current_tasks()
if self.runtime_bootstrap_task:
tasks_to_wait.append(self.runtime_bootstrap_task)
self.runtime_bootstrap_task.cancel()
self.runtime_bootstrap_task = None
if self.cron_manager:
await self.cron_manager.shutdown()
if self.plugin_manager and self.plugin_manager.context:
for plugin in self.plugin_manager.context.get_all_stars():
try:
await self.plugin_manager._terminate_plugin(plugin)
except Exception as e:
logger.warning(traceback.format_exc())
logger.warning(
f"插件 {plugin.name} 未被正常终止 {e!s}, 可能会导致资源泄露等问题。",
)
if self.provider_manager:
await self.provider_manager.terminate()
if self.platform_manager:
await self.platform_manager.terminate()
if self.kb_manager:
await self.kb_manager.terminate()
if self.dashboard_shutdown_event:
self.dashboard_shutdown_event.set()
self._clear_runtime_artifacts()
for task in tasks_to_wait:
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"任务 {task.get_name()} 发生错误: {e}")
if reset_core_state:
self._set_lifecycle_state(LifecycleState.CREATED)
self.runtime_bootstrap_error = None
self.runtime_done_event.clear()
else:
self._set_lifecycle_state(LifecycleState.CORE_READY)
self.runtime_bootstrap_error = None
self.runtime_done_event.clear()
```
Then `stop` and `restart` can become:
```python
async def stop(self) -> None:
if self.temp_dir_cleaner:
await self.temp_dir_cleaner.stop()
await self._shutdown_runtime(reset_core_state=True)
async def restart(self) -> None:
await self._shutdown_runtime(reset_core_state=False)
if self.astrbot_updator is None:
return
threading.Thread(
target=self.astrbot_updator._reboot,
name="restart",
daemon=True,
).start()
```
This removes duplicated sequencing and makes the overall lifecycle (`initialize_core` → `bootstrap_runtime` → `start` → `stop`/`restart`) much easier to trace.
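The drain loop at the end of `_shutdown_runtime` relies on the standard asyncio idiom of cancelling a task and then awaiting it while swallowing `CancelledError`. A standalone sketch of just that idiom:

```python
import asyncio


async def main():
    async def background():
        await asyncio.sleep(60)  # long-running work that will never finish

    task = asyncio.ensure_future(background())
    await asyncio.sleep(0)  # yield so the task actually starts
    task.cancel()
    try:
        await task  # drain the cancellation so cleanup completes
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return task.cancelled()
```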
### 4. Reduce `_require_*` indirection where safe
For internal methods like `load_pipeline_scheduler` / `reload_pipeline_scheduler`, the `_require_*` helpers add indirection without much extra safety, because:
- They are only called in lifecycle phases where `initialize_core` is already required.
- A simple explicit check at the top of each public method is enough.
You can inline the checks and use local variables to keep code readable:
```python
async def load_pipeline_scheduler(self) -> dict[str, PipelineScheduler]:
if self.astrbot_config_mgr is None or self.plugin_manager is None:
raise RuntimeError("initialize_core must complete before scheduler setup")
mapping: dict[str, PipelineScheduler] = {}
mgr = self.astrbot_config_mgr
plugin_manager = self.plugin_manager
for conf_id, ab_config in mgr.confs.items():
scheduler = PipelineScheduler(
PipelineContext(ab_config, plugin_manager, conf_id),
)
await scheduler.initialize()
mapping[conf_id] = scheduler
self.pipeline_scheduler_mapping = mapping
return mapping
```
Likewise for `reload_pipeline_scheduler` and `load_platform`. This reduces the “micro‑API” around lifecycle and keeps the main flows more direct.
---
All of these suggestions keep the new behavior (split core/runtime init, early dashboard, cancellation support) but:
- Reduce the number of mutable runtime state variables.
- Make `_wait_for_runtime_ready` easier to reason about.
- Centralize shutdown semantics.
- Flatten helper indirection where it doesn’t add much safety.
</issue_to_address>
| RUNTIME_READY = "runtime_ready" | ||
|
|
||
|
|
||
| class AstrBotCoreLifecycle: |
There was a problem hiding this comment.
issue (complexity): 建议通过合并 runtime 状态跟踪、收敛等待/关闭逻辑,并内联一些简单的 guard,来简化新的生命周期管理逻辑,从而让 core → runtime → start/stop 的流程更容易理解。
新的生命周期拆分本身是有价值的,但目前有一部分复杂度是“自造”的,在不改变行为的前提下是可以降低的。
1. 合并 runtime 状态和事件
当前你有:
lifecycle_state+core_initialized/runtime_ready/runtime_failedruntime_bootstrap_taskruntime_bootstrap_errorruntime_ready_event、runtime_failed_event_runtime_wait_interrupted
很多字段其实在表达同一类信息。可以考虑:
- 把
lifecycle_state作为状态的单一事实来源(single source of truth)。 - 只保留一个可选错误字段(
runtime_bootstrap_error)。 - 只保留单个事件,用来表示「runtime 已就绪或失败」,让其他等待方使用。
示例骨架:
# state:
self.lifecycle_state = LifecycleState.CREATED
self.runtime_bootstrap_task: asyncio.Task[None] | None = None
self.runtime_bootstrap_error: BaseException | None = None
self.runtime_done_event = asyncio.Event() # set when bootstrap finishes (success/fail)
self._runtime_wait_interrupted = False # keep if you really need explicit interruptBootstrap 设置 state + error + 单个事件:
async def bootstrap_runtime(self) -> None:
# ... preconditions ...
self.runtime_bootstrap_error = None
self.runtime_done_event.clear()
try:
# ... bootstrap work ...
self._set_lifecycle_state(LifecycleState.RUNTIME_READY)
except asyncio.CancelledError:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.CORE_READY)
self.runtime_bootstrap_error = None
self.runtime_done_event.set()
raise
except BaseException as exc:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.RUNTIME_FAILED)
self.runtime_bootstrap_error = exc
self.runtime_done_event.set()
raise
else:
self.runtime_done_event.set()这样就不再需要 runtime_ready_event 和 runtime_failed_event 的区分,也不再需要 runtime_ready/runtime_failed 这些布尔标记(可以从 lifecycle_state 推导)。
2. 简化 _wait_for_runtime_ready
_wait_for_runtime_ready 现在的实现比实际需求复杂很多:它在等待三个对象(bootstrap 任务、ready 事件、failed 事件),分支也很多。
如果只有 bootstrap 任务会改变生命周期状态和错误,那么你可以只等待这个任务,并加一个小的封装来支持并发等待者:
async def _wait_for_runtime_ready(self) -> bool:
# fast path
if self.lifecycle_state is LifecycleState.RUNTIME_READY:
return True
if self.lifecycle_state is LifecycleState.RUNTIME_FAILED:
return False
if self._runtime_wait_interrupted:
return False
task = self.runtime_bootstrap_task
if task is None:
raise RuntimeError("runtime bootstrap task was not scheduled before start")
# wait for bootstrap completion
try:
await asyncio.shield(task)
except asyncio.CancelledError:
# propagate interruption; let caller decide
raise
# Consume result (avoid re-raising for later awaiters).
await self._consume_runtime_bootstrap_task(task)
if self._runtime_wait_interrupted:
return False
return self.lifecycle_state is LifecycleState.RUNTIME_READY如果你仍然需要为其他代码(例如 dashboard 或其他子系统)提供事件,可以让它们 await self.runtime_done_event.wait(),然后检查 lifecycle_state / runtime_bootstrap_error。在 _wait_for_runtime_ready 内部就不需要再和 ready/failed 两个事件同时竞争了。
这样可以去掉:
- 对 bootstrap 任务和两个事件的并行
asyncio.wait。 - 与
runtime_failed_event相关的多处分支和清理逻辑。 - 多条会选择性消费 bootstrap 任务的路径。
3. 将微型 helper 折叠为更高层的操作
在关停和重启路径上,有很多非常小的 helper(如 _clear_runtime_failure_for_retry、_reset_runtime_bootstrap_state、_interrupt_runtime_bootstrap_waiters、_collect_runtime_bootstrap_task、_clear_runtime_artifacts 等)。
可以通过引入一两个更高层的操作,并在其中内联这些小 helper,让主流程更容易理解。
例如,把关停语义收敛到一个方法里:
async def _shutdown_runtime(self, reset_core_state: bool) -> None:
# interrupt waiters and cancel background work
self._runtime_wait_interrupted = True
tasks_to_wait = self._cancel_current_tasks()
if self.runtime_bootstrap_task:
tasks_to_wait.append(self.runtime_bootstrap_task)
self.runtime_bootstrap_task.cancel()
self.runtime_bootstrap_task = None
if self.cron_manager:
await self.cron_manager.shutdown()
if self.plugin_manager and self.plugin_manager.context:
for plugin in self.plugin_manager.context.get_all_stars():
try:
await self.plugin_manager._terminate_plugin(plugin)
except Exception as e:
logger.warning(traceback.format_exc())
logger.warning(
f"插件 {plugin.name} 未被正常终止 {e!s}, 可能会导致资源泄露等问题。",
)
if self.provider_manager:
await self.provider_manager.terminate()
if self.platform_manager:
await self.platform_manager.terminate()
if self.kb_manager:
await self.kb_manager.terminate()
if self.dashboard_shutdown_event:
self.dashboard_shutdown_event.set()
self._clear_runtime_artifacts()
for task in tasks_to_wait:
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"任务 {task.get_name()} 发生错误: {e}")
if reset_core_state:
self._set_lifecycle_state(LifecycleState.CREATED)
self.runtime_bootstrap_error = None
self.runtime_done_event.clear()
else:
self._set_lifecycle_state(LifecycleState.CORE_READY)
self.runtime_bootstrap_error = None
self.runtime_done_event.clear()这样 stop 和 restart 可以简化为:
async def stop(self) -> None:
if self.temp_dir_cleaner:
await self.temp_dir_cleaner.stop()
await self._shutdown_runtime(reset_core_state=True)
async def restart(self) -> None:
await self._shutdown_runtime(reset_core_state=False)
if self.astrbot_updator is None:
return
threading.Thread(
target=self.astrbot_updator._reboot,
name="restart",
daemon=True,
).start()这可以消除重复的顺序流程,使整体生命周期(initialize_core → bootstrap_runtime → start → stop/restart)更容易追踪。
4. 在安全的地方减少 _require_* 间接层
对于 load_pipeline_scheduler / reload_pipeline_scheduler 这类内部方法,_require_* helper 带来的间接层对安全性的提升有限,因为:
- 它们只在已经要求完成
initialize_core的生命周期阶段被调用。 - 在每个公开方法开头做一个简单的显式检查就足够了。
你可以把这些检查内联,并通过局部变量保持可读性:
async def load_pipeline_scheduler(self) -> dict[str, PipelineScheduler]:
if self.astrbot_config_mgr is None or self.plugin_manager is None:
raise RuntimeError("initialize_core must complete before scheduler setup")
mapping: dict[str, PipelineScheduler] = {}
mgr = self.astrbot_config_mgr
plugin_manager = self.plugin_manager
for conf_id, ab_config in mgr.confs.items():
scheduler = PipelineScheduler(
PipelineContext(ab_config, plugin_manager, conf_id),
)
await scheduler.initialize()
mapping[conf_id] = scheduler
self.pipeline_scheduler_mapping = mapping
return mappingreload_pipeline_scheduler 和 load_platform 同理。这样可以减少生命周期周边的「微型 API」,让主流程更加直接。
以上建议在保留现有行为(core/runtime 初始化拆分、dashboard 提前可用、支持取消)的前提下:
- 减少可变 runtime 状态变量的数量。
- 让
_wait_for_runtime_ready更容易推理。 - 将关停语义集中管理。
- 在安全的地方压平 helper 间接层。
Original comment in English
issue (complexity): Consider simplifying the new lifecycle management by consolidating runtime state tracking, collapsing wait/shutdown logic, and inlining trivial guards so the core → runtime → start/stop flows are easier to follow.
The new lifecycle split is useful, but some complexity is self‑inflicted and can be reduced without losing behavior.
1. Consolidate runtime state & events
You currently have:
lifecycle_state+core_initialized/runtime_ready/runtime_failedruntime_bootstrap_taskruntime_bootstrap_errorruntime_ready_event,runtime_failed_event_runtime_wait_interrupted
Many of these encode the same thing. You can simplify by:
- Treating
lifecycle_stateas the single source of truth for status. - Keeping only one optional error field (
runtime_bootstrap_error). - Keeping a single event for “runtime ready or failed” that other waiters can use.
Example skeleton:
# state:
self.lifecycle_state = LifecycleState.CREATED
self.runtime_bootstrap_task: asyncio.Task[None] | None = None
self.runtime_bootstrap_error: BaseException | None = None
self.runtime_done_event = asyncio.Event() # set when bootstrap finishes (success/fail)
self._runtime_wait_interrupted = False # keep if you really need explicit interruptBootstrap sets state + error + the single event:
async def bootstrap_runtime(self) -> None:
# ... preconditions ...
self.runtime_bootstrap_error = None
self.runtime_done_event.clear()
try:
# ... bootstrap work ...
self._set_lifecycle_state(LifecycleState.RUNTIME_READY)
except asyncio.CancelledError:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.CORE_READY)
self.runtime_bootstrap_error = None
self.runtime_done_event.set()
raise
except BaseException as exc:
await self._cleanup_partial_runtime_bootstrap()
self._set_lifecycle_state(LifecycleState.RUNTIME_FAILED)
self.runtime_bootstrap_error = exc
self.runtime_done_event.set()
raise
else:
self.runtime_done_event.set()With that, you no longer need runtime_ready_event vs runtime_failed_event, nor runtime_ready/runtime_failed flags (they’re derivable from lifecycle_state).
2. Simplify _wait_for_runtime_ready
_wait_for_runtime_ready is much more complex than needed: it waits on three things (bootstrap task, ready event, failed event) and has a lot of branching.
If the bootstrap task is the only thing that mutates lifecycle state & error, you can just await that task, with a small wrapper to support concurrent waiters:
async def _wait_for_runtime_ready(self) -> bool:
# fast path
if self.lifecycle_state is LifecycleState.RUNTIME_READY:
return True
if self.lifecycle_state is LifecycleState.RUNTIME_FAILED:
return False
if self._runtime_wait_interrupted:
return False
task = self.runtime_bootstrap_task
if task is None:
raise RuntimeError("runtime bootstrap task was not scheduled before start")
# wait for bootstrap completion
try:
await asyncio.shield(task)
except asyncio.CancelledError:
# propagate interruption; let caller decide
raise
# Consume result (avoid re-raising for later awaiters).
await self._consume_runtime_bootstrap_task(task)
if self._runtime_wait_interrupted:
return False
return self.lifecycle_state is LifecycleState.RUNTIME_READYIf you still need an event for other code (e.g. dashboard or other subsystems), let them await self.runtime_done_event.wait() and then inspect lifecycle_state / runtime_bootstrap_error. You don’t need to also race against separate ready/failed events inside _wait_for_runtime_ready.
This removes:
- The parallel
asyncio.waiton bootstrap + 2 events. - Many of the branches around
runtime_failed_event, clearing it, etc. - The multiple paths that selectively consume the bootstrap task.
3. Collapse micro‑helpers into higher level operations
There are many very small helpers (_clear_runtime_failure_for_retry, _reset_runtime_bootstrap_state, _interrupt_runtime_bootstrap_waiters, _collect_runtime_bootstrap_task, _clear_runtime_artifacts, etc.) used mostly in shutdown/restart paths.
You can make the main flows easier to follow by introducing 1–2 higher‑level operations and inlining some of these tiny helpers.
For example, factor shutdown semantics into a single method:
```python
async def _shutdown_runtime(self, reset_core_state: bool) -> None:
    # interrupt waiters and cancel background work
    self._runtime_wait_interrupted = True
    tasks_to_wait = self._cancel_current_tasks()
    if self.runtime_bootstrap_task:
        tasks_to_wait.append(self.runtime_bootstrap_task)
        self.runtime_bootstrap_task.cancel()
        self.runtime_bootstrap_task = None

    if self.cron_manager:
        await self.cron_manager.shutdown()
    if self.plugin_manager and self.plugin_manager.context:
        for plugin in self.plugin_manager.context.get_all_stars():
            try:
                await self.plugin_manager._terminate_plugin(plugin)
            except Exception as e:
                logger.warning(traceback.format_exc())
                logger.warning(
                    f"插件 {plugin.name} 未被正常终止 {e!s}, 可能会导致资源泄露等问题。",
                )
    if self.provider_manager:
        await self.provider_manager.terminate()
    if self.platform_manager:
        await self.platform_manager.terminate()
    if self.kb_manager:
        await self.kb_manager.terminate()
    if self.dashboard_shutdown_event:
        self.dashboard_shutdown_event.set()
    self._clear_runtime_artifacts()

    for task in tasks_to_wait:
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"任务 {task.get_name()} 发生错误: {e}")

    if reset_core_state:
        self._set_lifecycle_state(LifecycleState.CREATED)
    else:
        self._set_lifecycle_state(LifecycleState.CORE_READY)
    self.runtime_bootstrap_error = None
    self.runtime_done_event.clear()
```

Then `stop` and `restart` can become:
```python
async def stop(self) -> None:
    if self.temp_dir_cleaner:
        await self.temp_dir_cleaner.stop()
    await self._shutdown_runtime(reset_core_state=True)

async def restart(self) -> None:
    await self._shutdown_runtime(reset_core_state=False)
    if self.astrbot_updator is None:
        return
    threading.Thread(
        target=self.astrbot_updator._reboot,
        name="restart",
        daemon=True,
    ).start()
```

This removes duplicated sequencing and makes the overall lifecycle (`initialize_core` → `bootstrap_runtime` → `start` → `stop`/`restart`) much easier to trace.
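The whole two-phase flow can also be seen end to end in a toy model. This is a deliberately simplified sketch (the real `AstrBotCoreLifecycle` tracks more state and many managers): core init finishes quickly, the dashboard could serve immediately afterward, runtime bootstrap runs as a background task, and `start()` blocks on its completion.

```python
import asyncio
from enum import Enum, auto

class LifecycleState(Enum):
    CREATED = auto()
    CORE_READY = auto()
    RUNTIME_READY = auto()
    RUNTIME_FAILED = auto()

class ToyLifecycle:
    def __init__(self) -> None:
        self.state = LifecycleState.CREATED
        self.runtime_done = asyncio.Event()
        self.bootstrap_task: asyncio.Task | None = None

    async def initialize_core(self) -> None:
        # fast phase: config, DB handles, dashboard server startup
        self.state = LifecycleState.CORE_READY

    def schedule_bootstrap(self) -> None:
        # heavy plugin/provider init runs in the background
        self.bootstrap_task = asyncio.create_task(self._bootstrap_runtime())

    async def _bootstrap_runtime(self) -> None:
        try:
            await asyncio.sleep(0.01)  # stand-in for plugin loading
            self.state = LifecycleState.RUNTIME_READY
        except Exception:
            self.state = LifecycleState.RUNTIME_FAILED
        finally:
            self.runtime_done.set()

    async def start(self) -> bool:
        # start() blocks until bootstrap finished, then reports readiness
        await self.runtime_done.wait()
        return self.state is LifecycleState.RUNTIME_READY

async def main() -> tuple[LifecycleState, bool]:
    lc = ToyLifecycle()
    await lc.initialize_core()
    after_core = lc.state  # dashboard could already serve at this point
    lc.schedule_bootstrap()
    ok = await lc.start()
    return after_core, ok
```

Because all state mutation happens inside the single bootstrap task, `start()` only needs one event and one final state check, which mirrors the simplification argued for above.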
4. Reduce _require_* indirection where safe
For internal methods like `load_pipeline_scheduler` / `reload_pipeline_scheduler`, the `_require_*` helpers add indirection without much extra safety, because:

- They are only called in lifecycle phases where `initialize_core` is already required.
- A simple explicit check at the top of each public method is enough.
You can inline the checks and use local variables to keep the code readable:

```python
async def load_pipeline_scheduler(self) -> dict[str, PipelineScheduler]:
    if self.astrbot_config_mgr is None or self.plugin_manager is None:
        raise RuntimeError("initialize_core must complete before scheduler setup")

    mapping: dict[str, PipelineScheduler] = {}
    mgr = self.astrbot_config_mgr
    plugin_manager = self.plugin_manager
    for conf_id, ab_config in mgr.confs.items():
        scheduler = PipelineScheduler(
            PipelineContext(ab_config, plugin_manager, conf_id),
        )
        await scheduler.initialize()
        mapping[conf_id] = scheduler

    self.pipeline_scheduler_mapping = mapping
    return mapping
```

Likewise for `reload_pipeline_scheduler` and `load_platform`. This reduces the "micro‑API" around lifecycle and keeps the main flows more direct.
All of these suggestions keep the new behavior (split core/runtime init, early dashboard, cancellation support) but:
- Reduce the number of mutable runtime state variables.
- Make `_wait_for_runtime_ready` easier to reason about.
- Centralize shutdown semantics.
- Flatten helper indirection where it doesn’t add much safety.
Code Review
This pull request significantly refactors the AstrBot core lifecycle management to introduce a more granular and robust startup process. The main change involves splitting the `initialize` method into `initialize_core` (a fast core phase) and `bootstrap_runtime` (a deferred, asynchronous runtime initialization).

A new `LifecycleState` enum is introduced to track the bot's state (`CREATED`, `CORE_READY`, `RUNTIME_FAILED`, `RUNTIME_READY`), along with numerous new instance variables and helper methods in `AstrBotCoreLifecycle` to manage these states, handle runtime bootstrap tasks, and clean up resources during failures or shutdowns. Dashboard endpoints are now guarded to check the runtime's readiness, returning a 503 error with status details if the runtime is still loading or has failed. The `InitialLoader` is updated to schedule `bootstrap_runtime` as a background task.

Provider management and plugin context registration are also enhanced with explicit cleanup and reset mechanisms to prevent stale state, which is especially important for retry scenarios after bootstrap failures. Extensive unit and integration tests have been added or modified to cover these new lifecycle states, asynchronous operations, and error handling, ensuring the system behaves correctly during various startup, shutdown, and restart scenarios.
Continue terminating remaining providers and disable MCP servers even if one provider terminate hook fails. Also add InitialLoader failure-path coverage and extract guarded plugin routes into a shared constant for easier review and maintenance.
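The "continue terminating remaining providers" behavior boils down to a per-provider try/except inside the terminate loop. A hypothetical sketch with fake provider objects (not the real `ProviderManager` API):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def terminate_all(providers: list) -> list[Exception]:
    """Shut down every provider; one failing terminate hook must not abort the rest."""
    errors: list[Exception] = []
    for provider in providers:
        try:
            await provider.terminate()
        except Exception as e:  # deliberately broad: we want to keep going
            logger.warning("provider terminate failed: %s", e)
            errors.append(e)
    providers.clear()  # drop references so a bootstrap retry starts clean
    return errors
```

Collecting the errors instead of raising on the first one is what lets a bootstrap retry proceed without leaking provider instances from a previous attempt.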
Summary
Test Plan
- `uv run ruff format .`
- `uv run ruff check .`
- `uv run pytest tests/unit/test_core_lifecycle.py tests/unit/test_initial_loader.py tests/test_dashboard.py -v`

Details
This change reduces perceived startup latency by letting AstrBot finish a minimal core setup before the full plugin/runtime bootstrap completes. The dashboard can start earlier, while runtime initialization continues in the background and `AstrBotCoreLifecycle.start()` waits until bootstrap is ready before loading runtime tasks and startup hooks.

To make that safe, the lifecycle now tracks explicit runtime readiness and failure signals. Dashboard routes can inspect that state through `/api/stat/runtime-status`, and plugin-facing routes now return a clear 503 payload instead of misleading partial data or missing-route behavior while the runtime is still loading.

Failure handling was tightened as well. A failed runtime bootstrap now transitions into an explicit failed state, cleans up partially initialized runtime artifacts, and supports retry without leaking provider instances, plugin web APIs, or plugin runtime tasks across attempts. Public unauthenticated start-time polling still works, but it no longer exposes raw internal bootstrap error details.
Summary by Sourcery

Introduce a two-phase startup lifecycle consisting of a core initialization phase and a runtime bootstrap phase, with explicit runtime ready/failed state tracking, and guard the dashboard/plugin APIs and cleanup flows accordingly.

New Features:

- Add the `/api/stat/runtime-status` endpoint and a standardized error payload so the dashboard and callers can observe runtime readiness and failure information.
- `start()` now waits for runtime readiness before loading tasks and hooks.

Bug Fixes:

Enhancements:

- Rework `ProviderManager` termination to deterministically collect and shut down all provider instances and clear the current-instance state.

Tests:

- Add tests for `InitialLoader` covering background runtime bootstrap scheduling and dashboard startup ordering.