This commit is contained in:
2026-06-28 17:51:05 +08:00
parent 2239a34681
commit fec0715c40
2 changed files with 217 additions and 30 deletions
+119 -16
View File
@@ -3,6 +3,10 @@ declare(strict_types=1);
namespace Kiri\Router\Annotate; namespace Kiri\Router\Annotate;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoole\Coroutine;
class DeferRegistry class DeferRegistry
{ {
@@ -71,9 +75,11 @@ class DeferRegistry
/** /**
* @param string $class * 异步执行 Defer 回调 — 不阻塞父方法返回值
* @param string $method * 在子协程中执行所有已注册的回调,并注入 request/response 上下文
* @return void *
* @param string $class 原始类名
* @param string $method 原始方法名
*/ */
public static function execute(string $class, string $method): void public static function execute(string $class, string $method): void
{ {
@@ -85,22 +91,119 @@ class DeferRegistry
$defers = self::$registry[$key]; $defers = self::$registry[$key];
unset(self::$registry[$key]); unset(self::$registry[$key]);
foreach ($defers as $defer) { // 在创建子协程前捕获父协程的 request/response 上下文
try { // 子协程的 Coroutine Context 是空的,不继承父协程
$callback = $defer->callback; $request = self::captureRequest();
$params = $defer->params; $response = self::captureResponse();
if (is_array($callback)) { // 非协程环境直接同步执行,避免协程永不调度导致回调丢失
[$cbClass, $cbMethod] = $callback; if (Coroutine::getCid() <= 0) {
$instance = \Kiri::getDi()->get($cbClass); foreach ($defers as $defer) {
call_user_func([$instance, $cbMethod], ...$params); try {
} else { self::invokeSingleDefer($defer, $request, $response);
$instance = \Kiri::getDi()->get($callback); } catch (\Throwable $throwable) {
call_user_func([$instance, '__invoke'], ...$params); \Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage());
} }
} catch (\Throwable $throwable) {
\Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage());
} }
return;
}
Coroutine::create(function () use ($defers, $request, $response) {
foreach ($defers as $defer) {
try {
self::invokeSingleDefer($defer, $request, $response);
} catch (\Throwable $throwable) {
\Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage());
}
}
});
}
/**
* 调用单个 Defer 回调
*
* @param Defer $defer 延迟回调配置
* @param ServerRequestInterface|null $request 请求上下文
* @param ResponseInterface|null $response 响应上下文
*/
private static function invokeSingleDefer(Defer $defer, ?ServerRequestInterface $request, ?ResponseInterface $response): void
{
$callback = $defer->callback;
$params = $defer->params;
if (is_array($callback)) {
[$class, $method] = $callback;
$instance = \Kiri::getDi()->get($class);
self::injectContext($instance, $request, $response);
call_user_func([$instance, $method], ...$params);
} else {
$instance = \Kiri::getDi()->get($callback);
self::injectContext($instance, $request, $response);
call_user_func([$instance, '__invoke'], ...$params);
}
}
/**
* 捕获当前协程的 request,不抛出异常
*/
private static function captureRequest(): ?ServerRequestInterface
{
try {
return \request();
} catch (\Throwable) {
return null;
}
}
/**
* 捕获当前协程的 response,不抛出异常
*/
private static function captureResponse(): ?ResponseInterface
{
try {
return \response();
} catch (\Throwable) {
return null;
}
}
/**
* 将 request/response 注入到回调实例
* 已有 #[Container] 注入的属性会被覆盖(子协程中 DI 注入的是错误上下文)
*
* @param object $instance 回调类实例
* @param ServerRequestInterface|null $request 请求上下文
* @param ResponseInterface|null $response 响应上下文
*/
private static function injectContext(
object $instance,
?ServerRequestInterface $request,
?ResponseInterface $response
): void {
try {
$reflect = new \ReflectionClass($instance);
if ($request !== null && $reflect->hasProperty('request')) {
$prop = $reflect->getProperty('request');
if ($prop->isPublic() || $prop->isProtected()) {
// $prop->setAccessible(true);
$prop->setValue($instance, $request);
}
}
if ($response !== null && $reflect->hasProperty('response')) {
$prop = $reflect->getProperty('response');
if ($prop->isPublic() || $prop->isProtected()) {
// $prop->setAccessible(true);
$prop->setValue($instance, $response);
}
}
} catch (\Throwable) {
// 注入失败不影响主流程
} }
} }
+98 -14
View File
@@ -12,6 +12,7 @@ use Kiri\Router\Format\NoBody;
use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\RequestHandlerInterface; use Psr\Http\Server\RequestHandlerInterface;
use Swoole\Coroutine;
class Handler implements RequestHandlerInterface class Handler implements RequestHandlerInterface
{ {
@@ -196,26 +197,109 @@ class Handler implements RequestHandlerInterface
/** /**
* @return void * 异步执行 Defer 回调 — 不阻塞客户端响应
* 提前捕获当前协程的 request/response,新建协程后注入到回调实例
*/ */
private function executeDeferred(): void private function executeDeferred(): void
{ {
foreach ($this->deferred as $defer) { if (empty($this->deferred)) {
try { return;
$callback = $defer->callback; }
$params = $defer->params;
if (is_array($callback)) { $defers = $this->deferred;
[$class, $method] = $callback; $this->deferred = [];
$instance = Kiri::getDi()->get($class);
call_user_func([$instance, $method], ...$params); // 在创建子协程前捕获父协程的 request/response 上下文
} else { // 子协程的 Coroutine Context 是空的,不继承父协程
$instance = Kiri::getDi()->get($callback); $request = \request();
call_user_func([$instance, '__invoke'], ...$params); $response = \response();
// 非协程环境直接同步执行,避免协程永不调度导致回调丢失
if (Coroutine::getCid() <= 0) {
foreach ($defers as $defer) {
try {
self::invokeDeferCallback($defer, $request, $response);
} catch (\Throwable $throwable) {
\Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage());
} }
} catch (\Throwable $throwable) {
\Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage());
} }
return;
}
Coroutine::create(function () use ($defers, $request, $response) {
foreach ($defers as $defer) {
try {
self::invokeDeferCallback($defer, $request, $response);
} catch (\Throwable $throwable) {
\Kiri::getLogger()->error('Defer callback failed: ' . $throwable->getMessage());
}
}
});
}
/**
* 调用单个 Defer 回调,自动注入 request/response 到回调实例
*
* @param Defer $defer 延迟回调配置
* @param ServerRequestInterface $request 当前请求
* @param ResponseInterface $response 当前响应
*/
private static function invokeDeferCallback(
Defer $defer,
ServerRequestInterface $request,
ResponseInterface $response
): void {
$callback = $defer->callback;
$params = $defer->params;
if (is_array($callback)) {
[$class, $method] = $callback;
$instance = Kiri::getDi()->get($class);
self::injectContextToInstance($instance, $request, $response);
call_user_func([$instance, $method], ...$params);
} else {
$instance = Kiri::getDi()->get($callback);
self::injectContextToInstance($instance, $request, $response);
call_user_func([$instance, '__invoke'], ...$params);
}
}
/**
* 将 request/response 注入到回调实例
* 通过反射为实例设置 request 和 response 属性
* 已有 #[Container] 注入的属性会被覆盖(子协程中 DI 注入的是错误上下文)
*
* @param object $instance 回调类实例
* @param ServerRequestInterface $request 当前请求
* @param ResponseInterface $response 当前响应
*/
private static function injectContextToInstance(
object $instance,
ServerRequestInterface $request,
ResponseInterface $response
): void {
try {
$reflect = new \ReflectionClass($instance);
if ($reflect->hasProperty('request')) {
$prop = $reflect->getProperty('request');
if ($prop->isPublic() || $prop->isProtected()) {
$prop->setAccessible(true);
$prop->setValue($instance, $request);
}
}
if ($reflect->hasProperty('response')) {
$prop = $reflect->getProperty('response');
if ($prop->isPublic() || $prop->isProtected()) {
$prop->setAccessible(true);
$prop->setValue($instance, $response);
}
}
} catch (\Throwable) {
// 注入失败不影响主流程
} }
} }