diff --git a/Kiri.php b/Kiri.php index 63b70222..1ea63a95 100644 --- a/Kiri.php +++ b/Kiri.php @@ -14,7 +14,7 @@ use Kiri\Application; use Kiri\Core\Json; use Kiri\Di\Container; use Kiri\Environmental; -use Kiri\Server\Tasker\AsyncTaskExecute; +use Kiri\Task\AsyncTaskExecute; use Psr\Container\ContainerInterface; use Swoole\Coroutine; use Swoole\Process; diff --git a/composer.json b/composer.json index e2705b4a..4f014c0d 100644 --- a/composer.json +++ b/composer.json @@ -36,6 +36,7 @@ "Kiri\\Websocket\\": "kiri-websocket-server/", "Gii\\": "kiri-gii/", "Kiri\\Annotation\\": "kiri-annotation/", + "Kiri\\Server\\": "kiri-server/", "Kiri\\Task\\": "kiri-task/" }, "files": [ diff --git a/kiri-engine/Abstracts/BaseApplication.php b/kiri-engine/Abstracts/BaseApplication.php index c1b89471..07a99c52 100644 --- a/kiri-engine/Abstracts/BaseApplication.php +++ b/kiri-engine/Abstracts/BaseApplication.php @@ -22,7 +22,9 @@ use Kiri\Di\LocalService; use Kiri\Error\{ErrorHandler, Logger}; use Kiri\Exception\{InitException, NotFindClassException}; use ReflectionException; -use Kiri\Server\{Contract\OnTaskInterface, Server, ServerManager, Tasker\AsyncTaskExecute}; +use Kiri\Server\{Server, ServerManager}; +use Kiri\Task\AsyncTaskExecute; +use Kiri\Task\OnTaskInterface; use Swoole\Table; /** @@ -200,7 +202,7 @@ abstract class BaseApplication extends Component /** * @param OnTaskInterface $execute - * @throws ReflectionException + * @throws ReflectionException|Exception */ public function task(OnTaskInterface $execute): void { diff --git a/kiri-engine/Async.php b/kiri-engine/Async.php index a67cc46e..55adc13f 100644 --- a/kiri-engine/Async.php +++ b/kiri-engine/Async.php @@ -7,7 +7,7 @@ namespace Kiri; use Exception; use Kiri\Abstracts\Component; use Kiri\Server\ServerManager; -use Kiri\Server\Tasker\AsyncTaskExecute; +use Kiri\Task\AsyncTaskExecute; use Kiri; /** * Class Async diff --git a/kiri-gii/GiiTask.php b/kiri-gii/GiiTask.php index 5a2b6479..bf6bd87d 100644 --- a/kiri-gii/GiiTask.php +++ b/kiri-gii/GiiTask.php @@ -31,7 +31,7 @@ class GiiTask extends GiiBase namespace App\Async; -use Kiri\Server\Contract\OnTaskInterface; +use Kiri\Task\OnTaskInterface; '; diff --git a/kiri-server/CoroutineServer.php b/kiri-server/CoroutineServer.php new file mode 100644 index 00000000..12e1c909 --- /dev/null +++ b/kiri-server/CoroutineServer.php @@ -0,0 +1,115 @@ +get($customProcess); + } + $this->manager->add(function (Pool $pool, int $workerId) use ($customProcess, $system) { + $process = $pool->getProcess($workerId); + + if (Kiri::getPlatform()->isLinux()) { + $process->name($system . '(' . $customProcess->getName() . ')'); + } + + Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnProcessStart()); + + set_env('environmental', Kiri::PROCESS); + $channel = Coroutine::create(function () use ($process, $customProcess) { + while (!$customProcess->isStop()) { + $message = $process->read(); + if (!empty($message)) { + $message = unserialize($message); + } + if (is_null($message)) { + continue; + } + $customProcess->onBroadcast($message); + } + }); + Context::setContext('waite:process:message', $channel); + + $customProcess->onSigterm()->process($process); + + }, $customProcess->isEnableCoroutine()); + } + + + /** + * @param array $settings + * @return void + */ + public function httpServer(array $settings = []): void + { + $this->manager->add(function (Pool $pool, int $workerId) use ($settings) { + $host = $settings['host']; + $port = $settings['port']; + + $server = new Server($host, $port, false, true); + $server->set($settings); + + $callback = $settings['events'][Constant::REQUEST] ?? null; + if (is_null($callback)) { + $pool->shutdown(); + return; + } + if (is_string($callback[0])) { + $callback[0] = Kiri::getDi()->get($callback[0]); + } + $server->handle('/', $callback); + $server->start(); + }, true); + } + + + /** + * @param array $settings + * @return void + */ + public function websocketServer(array $settings) + { + $this->manager->add(function () use ($settings) { + $host = $settings['host']; + $port = $settings['port']; + + $server = new Server($host, $port, false, true); + $server->set($settings); + $hServer = \Kiri::getDi()->get(\Kiri\Message\Server::class); + $server->handle('/', function (Request $request, Response $response) use ($hServer) { + call_user_func([$hServer, 'onRequest'], $request, $response); + }); + $server->start(); + }, true); + + } + + +} diff --git a/kiri-task/Annotation/AsynchronousTask.php b/kiri-task/Annotation/AsynchronousTask.php new file mode 100644 index 00000000..34b7719e --- /dev/null +++ b/kiri-task/Annotation/AsynchronousTask.php @@ -0,0 +1,33 @@ +get(TaskManager::class); + $AsyncTaskExecute->add($this->name, $class::class); + return parent::execute($class, $method); // TODO: Change the autogenerated stub + } + +} diff --git a/kiri-task/AsyncTaskExecute.php b/kiri-task/AsyncTaskExecute.php new file mode 100644 index 00000000..26b0ce52 --- /dev/null +++ b/kiri-task/AsyncTaskExecute.php @@ -0,0 +1,41 @@ +get(SwooleServerInterface::class); + if ($workerId < 0 || $workerId > $server->setting['task_worker_num']) { + $workerId = random_int(0, $server->setting['task_worker_num'] - 1); + } + if (is_string($handler)) { + $handler = $this->handle($handler, $params); + } + $server->task(serialize($handler), $workerId); + } + + +} diff --git a/kiri-task/CoroutineTaskExecute.php b/kiri-task/CoroutineTaskExecute.php new file mode 100644 index 00000000..42f8d554 --- /dev/null +++ b/kiri-task/CoroutineTaskExecute.php @@ -0,0 +1,85 @@ +hashMap = new HashMap(); + + $this->channel = new Coroutine\Channel($this->total); + + $this->taskServer = \Kiri::getDi()->get(OnServerTask::class); + + Coroutine::create(function () { + $barrier = Coroutine\Barrier::make(); + for ($i = 0; $i < 50; $i++) { + Coroutine::create(function () { + $this->handler(); + }); + } + Coroutine\Barrier::wait($barrier); + }); + } + + + /** + * @return void + * @throws ConfigException + */ + protected function handler() + { + $data = $this->channel->pop(-1); + + $task = new Task(); + $task->data = $data; + + $this->taskServer->onCoroutineTask(null, $task); + + $this->handler(); + } + + + /** + * @param OnTaskInterface|string $handler + * @param array $params + * @param int $workerId + * @return void + * @throws ReflectionException + */ + public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1) + { + if (is_string($handler)) { + $handler = $this->handle($handler, $params); + } + $this->channel->push(serialize($handler)); + } + +} diff --git a/kiri-task/OnServerTask.php b/kiri-task/OnServerTask.php new file mode 100644 index 00000000..513c6876 --- /dev/null +++ b/kiri-task/OnServerTask.php @@ -0,0 +1,94 @@ +resolve($data); + } catch (\Throwable $exception) { + $data = jTraceEx($exception); + + $this->logger->error('task', [error_trigger_format($exception)]); + } finally { + $server->finish($data); + } + } + + + /** + * @param Server|null $server + * @param Server\Task $task + * @throws ConfigException + */ + public function onCoroutineTask(?Server $server, Server\Task $task) + { + try { + $data = $this->resolve($task->data); + } catch (\Throwable $exception) { + $data = jTraceEx($exception); + + $this->logger->error('task', [error_trigger_format($exception)]); + } finally { + $task->finish($data); + } + } + + + /** + * @param $data + * @return null + */ + private function resolve($data) + { + $execute = unserialize($data); + if ($execute instanceof OnTaskInterface) { + return $execute->execute(); + } + return null; + } + + + /** + * @param Server $server + * @param int $task_id + * @param mixed $data + */ + public function onFinish(Server $server, int $task_id, mixed $data) + { + if (!($data instanceof OnTaskInterface)) { + return; + } + $data->finish($server, $task_id); + } + + +} diff --git a/kiri-task/OnTaskInterface.php b/kiri-task/OnTaskInterface.php new file mode 100644 index 00000000..7bcc389c --- /dev/null +++ b/kiri-task/OnTaskInterface.php @@ -0,0 +1,17 @@ +hashMap = new HashMap(); + } + + + /** + * @param Server $swollen + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function taskListener(Server $swollen) + { + if (!isset($swollen->setting['task_worker_num']) || $swollen->setting['task_worker_num'] < 1) { + return; + } + + $task_use_object = $swollen->setting['task_object'] ?? $swollen->setting['task_use_object'] ?? false; + $reflect = $this->container->get(OnServerTask::class); + + $swollen->on('finish', [$reflect, 'onFinish']); + if ($task_use_object || $swollen->setting['task_enable_coroutine']) { + $swollen->on('task', [$reflect, 'onCoroutineTask']); + } else { + $swollen->on('task', [$reflect, 'onTask']); + } + } + + + /** + * @param string $key + * @param $handler + */ + public function add(string $key, $handler) + { + $this->hashMap->put($key, $handler); + } + + + /** + * @param string $key + * @return OnTaskInterface + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface + */ + public function get(string $key): OnTaskInterface + { + $task = $this->hashMap->get($key); + if (is_string($task)) { + $task = $this->getContainer()->get($task); + } + return $task; + } + + +} diff --git a/kiri-task/TaskResolve.php b/kiri-task/TaskResolve.php new file mode 100644 index 00000000..a624dc00 --- /dev/null +++ b/kiri-task/TaskResolve.php @@ -0,0 +1,31 @@ +hashMap->has($handler)) { + $handler = $this->hashMap->get($handler); + } + $implements = $this->container->getReflect($handler); + if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) { + throw new Exception('Task must instance ' . OnTaskInterface::class); + } + return $implements->newInstanceArgs($params); + } + +}