Revert "改名"
This reverts commit fdf58326
This commit is contained in:
@@ -14,7 +14,7 @@ use Kiri\Application;
|
||||
use Kiri\Core\Json;
|
||||
use Kiri\Di\Container;
|
||||
use Kiri\Environmental;
|
||||
use Kiri\Server\Tasker\AsyncTaskExecute;
|
||||
use Kiri\Task\AsyncTaskExecute;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Process;
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
"Kiri\\Websocket\\": "kiri-websocket-server/",
|
||||
"Gii\\": "kiri-gii/",
|
||||
"Kiri\\Annotation\\": "kiri-annotation/",
|
||||
"Kiri\\Server\\": "kiri-server/",
|
||||
"Kiri\\Task\\": "kiri-task/"
|
||||
},
|
||||
"files": [
|
||||
|
||||
@@ -22,7 +22,9 @@ use Kiri\Di\LocalService;
|
||||
use Kiri\Error\{ErrorHandler, Logger};
|
||||
use Kiri\Exception\{InitException, NotFindClassException};
|
||||
use ReflectionException;
|
||||
use Kiri\Server\{Contract\OnTaskInterface, Server, ServerManager, Tasker\AsyncTaskExecute};
|
||||
use Kiri\Server\{Server, ServerManager};
|
||||
use Kiri\Task\AsyncTaskExecute;
|
||||
use Kiri\Task\OnTaskInterface;
|
||||
use Swoole\Table;
|
||||
|
||||
/**
|
||||
@@ -200,7 +202,7 @@ abstract class BaseApplication extends Component
|
||||
|
||||
/**
|
||||
* @param OnTaskInterface $execute
|
||||
* @throws ReflectionException
|
||||
* @throws ReflectionException|Exception
|
||||
*/
|
||||
public function task(OnTaskInterface $execute): void
|
||||
{
|
||||
|
||||
@@ -7,7 +7,7 @@ namespace Kiri;
|
||||
use Exception;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Server\ServerManager;
|
||||
use Kiri\Server\Tasker\AsyncTaskExecute;
|
||||
use Kiri\Task\AsyncTaskExecute;
|
||||
use Kiri;
|
||||
/**
|
||||
* Class Async
|
||||
|
||||
@@ -31,7 +31,7 @@ class GiiTask extends GiiBase
|
||||
|
||||
namespace App\Async;
|
||||
|
||||
use Kiri\Server\Contract\OnTaskInterface;
|
||||
use Kiri\Task\OnTaskInterface;
|
||||
|
||||
';
|
||||
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Server;
|
||||
|
||||
use Kiri;
|
||||
use Kiri\Context;
|
||||
use Kiri\Events\EventDispatch;
|
||||
use Kiri\Server\Abstracts\BaseProcess;
|
||||
use Kiri\Server\Contract\OnProcessInterface;
|
||||
use Kiri\Server\Events\OnProcessStart;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Coroutine\Http\Server;
|
||||
use Swoole\Http\Request;
|
||||
use Swoole\Http\Response;
|
||||
use Swoole\Process\Manager;
|
||||
use Swoole\Process\Pool;
|
||||
|
||||
|
||||
class CoroutineServer
|
||||
{
|
||||
|
||||
|
||||
private Manager $manager;
|
||||
|
||||
|
||||
/**
|
||||
* @param string|OnProcessInterface|BaseProcess $customProcess
|
||||
* @param string $system
|
||||
* @return void
|
||||
*/
|
||||
public function addProcess(string|OnProcessInterface|BaseProcess $customProcess, string $system)
|
||||
{
|
||||
if (is_string($customProcess)) {
|
||||
$customProcess = Kiri::getDi()->get($customProcess);
|
||||
}
|
||||
$this->manager->add(function (Pool $pool, int $workerId) use ($customProcess, $system) {
|
||||
$process = $pool->getProcess($workerId);
|
||||
|
||||
if (Kiri::getPlatform()->isLinux()) {
|
||||
$process->name($system . '(' . $customProcess->getName() . ')');
|
||||
}
|
||||
|
||||
Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnProcessStart());
|
||||
|
||||
set_env('environmental', Kiri::PROCESS);
|
||||
$channel = Coroutine::create(function () use ($process, $customProcess) {
|
||||
while (!$customProcess->isStop()) {
|
||||
$message = $process->read();
|
||||
if (!empty($message)) {
|
||||
$message = unserialize($message);
|
||||
}
|
||||
if (is_null($message)) {
|
||||
continue;
|
||||
}
|
||||
$customProcess->onBroadcast($message);
|
||||
}
|
||||
});
|
||||
Context::setContext('waite:process:message', $channel);
|
||||
|
||||
$customProcess->onSigterm()->process($process);
|
||||
|
||||
}, $customProcess->isEnableCoroutine());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param array $settings
|
||||
* @return void
|
||||
*/
|
||||
public function httpServer(array $settings = []): void
|
||||
{
|
||||
$this->manager->add(function (Pool $pool, int $workerId) use ($settings) {
|
||||
$host = $settings['host'];
|
||||
$port = $settings['port'];
|
||||
|
||||
$server = new Server($host, $port, false, true);
|
||||
$server->set($settings);
|
||||
|
||||
$callback = $settings['events'][Constant::REQUEST] ?? null;
|
||||
if (is_null($callback)) {
|
||||
$pool->shutdown();
|
||||
return;
|
||||
}
|
||||
if (is_string($callback[0])) {
|
||||
$callback[0] = Kiri::getDi()->get($callback[0]);
|
||||
}
|
||||
$server->handle('/', $callback);
|
||||
$server->start();
|
||||
}, true);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param array $settings
|
||||
* @return void
|
||||
*/
|
||||
public function websocketServer(array $settings)
|
||||
{
|
||||
$this->manager->add(function () use ($settings) {
|
||||
$host = $settings['host'];
|
||||
$port = $settings['port'];
|
||||
|
||||
$server = new Server($host, $port, false, true);
|
||||
$server->set($settings);
|
||||
$hServer = \Kiri::getDi()->get(\Kiri\Message\Server::class);
|
||||
$server->handle('/', function (Request $request, Response $response) use ($hServer) {
|
||||
call_user_func([$hServer, 'onRequest'], $request, $response);
|
||||
});
|
||||
$server->start();
|
||||
}, true);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Task\Annotation;
|
||||
|
||||
|
||||
use Kiri\Annotation\Attribute;
|
||||
use Kiri\Task\TaskManager;
|
||||
|
||||
#[\Attribute(\Attribute::TARGET_CLASS)] class AsynchronousTask extends Attribute
|
||||
{
|
||||
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
*/
|
||||
public function __construct(public string $name)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param mixed $class
|
||||
* @param mixed $method
|
||||
* @return mixed
|
||||
*/
|
||||
public function execute(mixed $class, mixed $method = ''): mixed
|
||||
{
|
||||
$AsyncTaskExecute = \Kiri::getDi()->get(TaskManager::class);
|
||||
$AsyncTaskExecute->add($this->name, $class::class);
|
||||
return parent::execute($class, $method); // TODO: Change the autogenerated stub
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Task;
|
||||
|
||||
use Exception;
|
||||
use Kiri;
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Server\SwooleServerInterface;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
class AsyncTaskExecute extends Component
|
||||
{
|
||||
|
||||
|
||||
use TaskResolve;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @param OnTaskInterface|string $handler
|
||||
* @param array $params
|
||||
* @param int $workerId
|
||||
* @throws Exception
|
||||
*/
|
||||
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
|
||||
{
|
||||
$server = Kiri::getDi()->get(SwooleServerInterface::class);
|
||||
if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) {
|
||||
$workerId = random_int(0, $server->setting['task_worker_num'] - 1);
|
||||
}
|
||||
if (is_string($handler)) {
|
||||
$handler = $this->handle($handler, $params);
|
||||
}
|
||||
$server->task(serialize($handler), $workerId);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Task;
|
||||
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Core\HashMap;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use ReflectionException;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Server\Task;
|
||||
|
||||
class CoroutineTaskExecute extends Component
|
||||
{
|
||||
|
||||
use TaskResolve;
|
||||
|
||||
|
||||
private HashMap $hashMap;
|
||||
|
||||
|
||||
private Coroutine\Channel $channel;
|
||||
|
||||
|
||||
private OnServerTask $taskServer;
|
||||
|
||||
|
||||
private int $total = 50;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function init()
|
||||
{
|
||||
$this->hashMap = new HashMap();
|
||||
|
||||
$this->channel = new Coroutine\Channel($this->total);
|
||||
|
||||
$this->taskServer = \Kiri::getDi()->get(OnServerTask::class);
|
||||
|
||||
Coroutine::create(function () {
|
||||
$barrier = Coroutine\Barrier::make();
|
||||
for ($i = 0; $i < 50; $i++) {
|
||||
Coroutine::create(function () {
|
||||
$this->handler();
|
||||
});
|
||||
}
|
||||
Coroutine\Barrier::wait($barrier);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return void
|
||||
* @throws ConfigException
|
||||
*/
|
||||
protected function handler()
|
||||
{
|
||||
$data = $this->channel->pop(-1);
|
||||
|
||||
$task = new Task();
|
||||
$task->data = $data;
|
||||
|
||||
$this->taskServer->onCoroutineTask(null, $task);
|
||||
|
||||
$this->handler();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param OnTaskInterface|string $handler
|
||||
* @param array $params
|
||||
* @param int $workerId
|
||||
* @return void
|
||||
* @throws ReflectionException
|
||||
*/
|
||||
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
|
||||
{
|
||||
if (is_string($handler)) {
|
||||
$handler = $this->handle($handler, $params);
|
||||
}
|
||||
$this->channel->push(serialize($handler));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
<?php
|
||||
|
||||
|
||||
namespace Kiri\Task;
|
||||
|
||||
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Abstracts\Logger;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Task\OnTaskInterface;
|
||||
use Swoole\Server;
|
||||
|
||||
|
||||
/**
|
||||
* Class OnServerTask
|
||||
* @package Server\Task
|
||||
*/
|
||||
class OnServerTask
|
||||
{
|
||||
|
||||
|
||||
#[Inject(Logger::class)]
|
||||
public Logger $logger;
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param int $src_worker_id
|
||||
* @param mixed $data
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data)
|
||||
{
|
||||
try {
|
||||
$data = $this->resolve($data);
|
||||
} catch (\Throwable $exception) {
|
||||
$data = jTraceEx($exception);
|
||||
|
||||
$this->logger->error('task', [error_trigger_format($exception)]);
|
||||
} finally {
|
||||
$server->finish($data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Server|null $server
|
||||
* @param Server\Task $task
|
||||
* @throws ConfigException
|
||||
*/
|
||||
public function onCoroutineTask(?Server $server, Server\Task $task)
|
||||
{
|
||||
try {
|
||||
$data = $this->resolve($task->data);
|
||||
} catch (\Throwable $exception) {
|
||||
$data = jTraceEx($exception);
|
||||
|
||||
$this->logger->error('task', [error_trigger_format($exception)]);
|
||||
} finally {
|
||||
$task->finish($data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $data
|
||||
* @return null
|
||||
*/
|
||||
private function resolve($data)
|
||||
{
|
||||
$execute = unserialize($data);
|
||||
if ($execute instanceof OnTaskInterface) {
|
||||
return $execute->execute();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $server
|
||||
* @param int $task_id
|
||||
* @param mixed $data
|
||||
*/
|
||||
public function onFinish(Server $server, int $task_id, mixed $data)
|
||||
{
|
||||
if (!($data instanceof OnTaskInterface)) {
|
||||
return;
|
||||
}
|
||||
$data->finish($server, $task_id);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
<?php
|
||||
|
||||
|
||||
namespace Kiri\Task;
|
||||
|
||||
|
||||
use Swoole\Server;
|
||||
|
||||
interface OnTaskInterface
|
||||
{
|
||||
|
||||
public function execute();
|
||||
|
||||
|
||||
public function finish(Server $server, int $task_id);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Task;
|
||||
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Core\HashMap;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use Swoole\Server;
|
||||
|
||||
class TaskManager extends Component
|
||||
{
|
||||
|
||||
|
||||
private HashMap $hashMap;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function init()
|
||||
{
|
||||
$this->hashMap = new HashMap();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Server $swollen
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function taskListener(Server $swollen)
|
||||
{
|
||||
if (!isset($swollen->setting['task_worker_num']) || $swollen->setting['task_worker_num'] < 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
$task_use_object = $swollen->setting['task_object'] ?? $swollen->setting['task_use_object'] ?? false;
|
||||
$reflect = $this->container->get(OnServerTask::class);
|
||||
|
||||
$swollen->on('finish', [$reflect, 'onFinish']);
|
||||
if ($task_use_object || $swollen->setting['task_enable_coroutine']) {
|
||||
$swollen->on('task', [$reflect, 'onCoroutineTask']);
|
||||
} else {
|
||||
$swollen->on('task', [$reflect, 'onTask']);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param $handler
|
||||
*/
|
||||
public function add(string $key, $handler)
|
||||
{
|
||||
$this->hashMap->put($key, $handler);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @return OnTaskInterface
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function get(string $key): OnTaskInterface
|
||||
{
|
||||
$task = $this->hashMap->get($key);
|
||||
if (is_string($task)) {
|
||||
$task = $this->getContainer()->get($task);
|
||||
}
|
||||
return $task;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
namespace Kiri\Task;
|
||||
|
||||
use Exception;
|
||||
use ReflectionException;
|
||||
|
||||
trait TaskResolve
|
||||
{
|
||||
|
||||
|
||||
/**
|
||||
* @param $handler
|
||||
* @param $params
|
||||
* @return object
|
||||
* @throws ReflectionException
|
||||
* @throws Exception
|
||||
*/
|
||||
private function handle($handler, $params): object
|
||||
{
|
||||
if (!class_exists($handler) && $this->hashMap->has($handler)) {
|
||||
$handler = $this->hashMap->get($handler);
|
||||
}
|
||||
$implements = $this->container->getReflect($handler);
|
||||
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
|
||||
throw new Exception('Task must instance ' . OnTaskInterface::class);
|
||||
}
|
||||
return $implements->newInstanceArgs($params);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user