From fec0715c402b17c12677bce8a2be381a2d473c73 Mon Sep 17 00:00:00 2001 From: whwyy Date: Sun, 28 Jun 2026 17:51:05 +0800 Subject: [PATCH] eee --- src/Annotate/DeferRegistry.php | 135 +++++++++++++++++++++++++++++---- src/Handler.php | 112 +++++++++++++++++++++++---- 2 files changed, 217 insertions(+), 30 deletions(-) diff --git a/src/Annotate/DeferRegistry.php b/src/Annotate/DeferRegistry.php index fb846f7..ef65bfa 100644 --- a/src/Annotate/DeferRegistry.php +++ b/src/Annotate/DeferRegistry.php @@ -3,6 +3,10 @@ declare(strict_types=1); namespace Kiri\Router\Annotate; +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; +use Swoole\Coroutine; + class DeferRegistry { @@ -71,9 +75,11 @@ class DeferRegistry /** - * @param string $class - * @param string $method - * @return void + * 异步执行 Defer 回调 — 不阻塞父方法返回值 + * 在子协程中执行所有已注册的回调,并注入 request/response 上下文 + * + * @param string $class 原始类名 + * @param string $method 原始方法名 */ public static function execute(string $class, string $method): void { @@ -85,22 +91,119 @@ class DeferRegistry $defers = self::$registry[$key]; unset(self::$registry[$key]); - foreach ($defers as $defer) { - try { - $callback = $defer->callback; - $params = $defer->params; + // 在创建子协程前捕获父协程的 request/response 上下文 + // 子协程的 Coroutine Context 是空的,不继承父协程 + $request = self::captureRequest(); + $response = self::captureResponse(); - if (is_array($callback)) { - [$cbClass, $cbMethod] = $callback; - $instance = \Kiri::getDi()->get($cbClass); - call_user_func([$instance, $cbMethod], ...$params); - } else { - $instance = \Kiri::getDi()->get($callback); - call_user_func([$instance, '__invoke'], ...$params); + // 非协程环境直接同步执行,避免协程永不调度导致回调丢失 + if (Coroutine::getCid() <= 0) { + foreach ($defers as $defer) { + try { + self::invokeSingleDefer($defer, $request, $response); + } catch (\Throwable $throwable) { + \Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage()); } - } catch (\Throwable $throwable) { - \Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage()); } + return; + } + + Coroutine::create(function () use ($defers, $request, $response) { + foreach ($defers as $defer) { + try { + self::invokeSingleDefer($defer, $request, $response); + } catch (\Throwable $throwable) { + \Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage()); + } + } + }); + } + + + /** + * 调用单个 Defer 回调 + * + * @param Defer $defer 延迟回调配置 + * @param ServerRequestInterface|null $request 请求上下文 + * @param ResponseInterface|null $response 响应上下文 + */ + private static function invokeSingleDefer(Defer $defer, ?ServerRequestInterface $request, ?ResponseInterface $response): void + { + $callback = $defer->callback; + $params = $defer->params; + + if (is_array($callback)) { + [$class, $method] = $callback; + $instance = \Kiri::getDi()->get($class); + self::injectContext($instance, $request, $response); + call_user_func([$instance, $method], ...$params); + } else { + $instance = \Kiri::getDi()->get($callback); + self::injectContext($instance, $request, $response); + call_user_func([$instance, '__invoke'], ...$params); + } + } + + + /** + * 捕获当前协程的 request,不抛出异常 + */ + private static function captureRequest(): ?ServerRequestInterface + { + try { + return \request(); + } catch (\Throwable) { + return null; + } + } + + + /** + * 捕获当前协程的 response,不抛出异常 + */ + private static function captureResponse(): ?ResponseInterface + { + try { + return \response(); + } catch (\Throwable) { + return null; + } + } + + + /** + * 将 request/response 注入到回调实例 + * 已有 #[Container] 注入的属性会被覆盖(子协程中 DI 注入的是错误上下文) + * + * @param object $instance 回调类实例 + * @param ServerRequestInterface|null $request 请求上下文 + * @param ResponseInterface|null $response 响应上下文 + */ + private static function injectContext( + object $instance, + ?ServerRequestInterface $request, + ?ResponseInterface $response + ): void { + try { + $reflect = new \ReflectionClass($instance); + + if ($request !== null && $reflect->hasProperty('request')) { + $prop = $reflect->getProperty('request'); + if ($prop->isPublic() || $prop->isProtected()) { +// $prop->setAccessible(true); + $prop->setValue($instance, $request); + } + } + + if ($response !== null && $reflect->hasProperty('response')) { + $prop = $reflect->getProperty('response'); + if ($prop->isPublic() || $prop->isProtected()) { +// $prop->setAccessible(true); + $prop->setValue($instance, $response); + } + } + } catch (\Throwable) { + // 注入失败不影响主流程 } } diff --git a/src/Handler.php b/src/Handler.php index 3cb12f7..ad2d9d7 100644 --- a/src/Handler.php +++ b/src/Handler.php @@ -12,6 +12,7 @@ use Kiri\Router\Format\NoBody; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Server\RequestHandlerInterface; +use Swoole\Coroutine; class Handler implements RequestHandlerInterface { @@ -196,26 +197,109 @@ class Handler implements RequestHandlerInterface /** - * @return void + * 异步执行 Defer 回调 — 不阻塞客户端响应 + * 提前捕获当前协程的 request/response,新建协程后注入到回调实例 */ private function executeDeferred(): void { - foreach ($this->deferred as $defer) { - try { - $callback = $defer->callback; - $params = $defer->params; + if (empty($this->deferred)) { + return; + } - if (is_array($callback)) { - [$class, $method] = $callback; - $instance = Kiri::getDi()->get($class); - call_user_func([$instance, $method], ...$params); - } else { - $instance = Kiri::getDi()->get($callback); - call_user_func([$instance, '__invoke'], ...$params); + $defers = $this->deferred; + $this->deferred = []; + + // 在创建子协程前捕获父协程的 request/response 上下文 + // 子协程的 Coroutine Context 是空的,不继承父协程 + $request = \request(); + $response = \response(); + + // 非协程环境直接同步执行,避免协程永不调度导致回调丢失 + if (Coroutine::getCid() <= 0) { + foreach ($defers as $defer) { + try { + self::invokeDeferCallback($defer, $request, $response); + } catch (\Throwable $throwable) { + \Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage()); } - } catch (\Throwable $throwable) { - \Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage()); } + return; + } + + Coroutine::create(function () use ($defers, $request, $response) { + foreach ($defers as $defer) { + try { + self::invokeDeferCallback($defer, $request, $response); + } catch (\Throwable $throwable) { + \Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage()); + } + } + }); + } + + + /** + * 调用单个 Defer 回调,自动注入 request/response 到回调实例 + * + * @param Defer $defer 延迟回调配置 + * @param ServerRequestInterface $request 当前请求 + * @param ResponseInterface $response 当前响应 + */ + private static function invokeDeferCallback( + Defer $defer, + ServerRequestInterface $request, + ResponseInterface $response + ): void { + $callback = $defer->callback; + $params = $defer->params; + + if (is_array($callback)) { + [$class, $method] = $callback; + $instance = Kiri::getDi()->get($class); + self::injectContextToInstance($instance, $request, $response); + call_user_func([$instance, $method], ...$params); + } else { + $instance = Kiri::getDi()->get($callback); + self::injectContextToInstance($instance, $request, $response); + call_user_func([$instance, '__invoke'], ...$params); + } + } + + + /** + * 将 request/response 注入到回调实例 + * 通过反射为实例设置 request 和 response 属性 + * 已有 #[Container] 注入的属性会被覆盖(子协程中 DI 注入的是错误上下文) + * + * @param object $instance 回调类实例 + * @param ServerRequestInterface $request 当前请求 + * @param ResponseInterface $response 当前响应 + */ + private static function injectContextToInstance( + object $instance, + ServerRequestInterface $request, + ResponseInterface $response + ): void { + try { + $reflect = new \ReflectionClass($instance); + + if ($reflect->hasProperty('request')) { + $prop = $reflect->getProperty('request'); + if ($prop->isPublic() || $prop->isProtected()) { + $prop->setAccessible(true); + $prop->setValue($instance, $request); + } + } + + if ($reflect->hasProperty('response')) { + $prop = $reflect->getProperty('response'); + if ($prop->isPublic() || $prop->isProtected()) { + $prop->setAccessible(true); + $prop->setValue($instance, $response); + } + } + } catch (\Throwable) { + // 注入失败不影响主流程 } }