Revert "改名"

This reverts commit fdf58326
This commit is contained in:
2022-01-17 18:45:00 +08:00
parent 248f4d7100
commit 224b52db49
5 changed files with 23 additions and 278 deletions
-17
View File
@@ -1,17 +0,0 @@
<?php
namespace Kiri\Server\Contract;
use Swoole\Server;
interface OnTaskInterface
{
public function execute();
public function finish(Server $server, int $task_id);
}
+3 -3
View File
@@ -11,11 +11,11 @@ use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Message\Handler\Abstracts\HttpService;
use Kiri\Message\Handler\Router;
use Kiri\Server\Events\OnShutdown;
use Psr\Container\ContainerExceptionInterface;
use Kiri\Server\Events\OnServerBeforeStart;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Coroutine;
@@ -80,7 +80,7 @@ class Server extends HttpService
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws ReflectionException
* @throws Exception
*/
public function start(): mixed
@@ -108,7 +108,7 @@ class Server extends HttpService
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws ReflectionException
* @throws Exception
*/
public function shutdown()
+20 -73
View File
@@ -20,8 +20,7 @@ use Kiri\Server\Handler\OnServer;
use Kiri\Server\Handler\OnServerManager;
use Kiri\Server\Handler\OnServerReload;
use Kiri\Server\Handler\OnServerWorker;
use Kiri\Server\Tasker\OnServerTask;
use Kiri\Websocket\WebSocketInterface;
use Kiri\Task\TaskManager;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Http\Server as HServer;
@@ -139,7 +138,25 @@ class ServerManager extends Component
foreach ($this->sortService($configs['ports']) as $config) {
$this->startListenerHandler($context, $config, $daemon);
}
$this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]);
$this->bindPipeMessage();
}
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function bindPipeMessage(): void
{
$pipeMessage = $this->getContainer()->get(OnPipeMessage::class);
$this->server->on(Constant::PIPE_MESSAGE, [$pipeMessage, 'onPipeMessage']);
if (!isset($this->server->setting['task_worker_num']) || $this->server->setting['task_worker_num'] < 1) {
return;
}
$this->getContainer()->get(TaskManager::class)->taskListener($this->server);
}
@@ -247,22 +264,6 @@ class ServerManager extends Component
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*
*
*
* $data = new Table($this->container->get(OutputInterface::class));
* $data->setHeaders(['key', 'value']);
*
* $array = [];
* foreach ($this->server->setting as $key => $value) {
* $array[] = [$key, $value];
* $array[] = new TableSeparator();
* }
*
* array_pop($array);
*
* $data->setStyle('box-double');
* $data->setRows($array);
*/
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{
@@ -292,9 +293,6 @@ class ServerManager extends Component
*/
private function addDefaultListener(array $settings): void
{
if (($this->server->setting['task_worker_num'] ?? 0) > 0) {
$this->addTaskListener($settings['events']);
}
$this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server);
if (!empty($settings['events']) && is_array($settings['events'])) {
$this->addServiceEvents($settings['events'], $this->server);
@@ -328,55 +326,4 @@ class ServerManager extends Component
}
/**
* @param mixed $message
* @param int $workerId
* @return mixed
*/
public function sendMessage(mixed $message, int $workerId): mixed
{
return $this->server?->sendMessage($message, $workerId);
}
/**
* @param array $events
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
private function addTaskListener(array $events = []): void
{
$task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false;
$reflect = $this->container->get(OnServerTask::class);
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
if ($task_use_object || $this->server->setting['task_enable_coroutine']) {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']);
} else {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']);
}
}
/**
* @param array|null $settings
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function bindCallback(?array $settings = [])
{
if (count($settings) < 1) {
return;
}
foreach ($settings as $event_type => $callback) {
if ($this->server->getCallback($event_type) !== null) {
continue;
}
if (is_array($callback) && !is_object($callback[0])) {
$callback[0] = $this->container->get($callback[0]);
}
$this->server->on($event_type, $callback);
}
}
}
-91
View File
@@ -1,91 +0,0 @@
<?php
namespace Kiri\Server\Tasker;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Core\HashMap;
use Kiri;
use ReflectionException;
use Kiri\Server\Contract\OnTaskInterface;
use Kiri\Server\SwooleServerInterface;
use Swoole\Server;
/**
*
*/
class AsyncTaskExecute extends Component
{
/**
* @var Server|\Swoole\WebSocket\Server|\Swoole\Http\Server
*/
public mixed $server = null;
private HashMap $hashMap;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
}
/**
* @param string $key
* @param $handler
*/
public function reg(string $key, $handler)
{
$this->hashMap->put($key, $handler);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int $workerId
* @throws Exception
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
{
if (!$this->server) {
$this->server = Kiri::getDi()->get(SwooleServerInterface::class);
}
if ($workerId < 0 || $workerId > $this->server->setting['task_worker_num']) {
$workerId = random_int(0, $this->server->setting['task_worker_num'] - 1);
}
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$this->server->task(serialize($handler), $workerId);
}
/**
* @param $handler
* @param $params
* @return object
* @throws ReflectionException
* @throws Exception
*/
private function handle($handler, $params): object
{
if (!class_exists($handler) && $this->hashMap->has($handler)) {
$handler = $this->hashMap->get($handler);
}
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
return $implements->newInstanceArgs($params);
}
}
-94
View File
@@ -1,94 +0,0 @@
<?php
namespace Kiri\Server\Tasker;
use Kiri\Annotation\Inject;
use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Server\Contract\OnTaskInterface;
use Swoole\Server;
/**
* Class OnServerTask
* @package Server\Task
*/
class OnServerTask
{
#[Inject(Logger::class)]
public Logger $logger;
/**
* @param Server $server
* @param int $task_id
* @param int $src_worker_id
* @param mixed $data
* @throws ConfigException
*/
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data)
{
try {
$data = $this->resolve($data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [error_trigger_format($exception)]);
} finally {
$server->finish($data);
}
}
/**
* @param Server $server
* @param Server\Task $task
* @throws ConfigException
*/
public function onCoroutineTask(Server $server, Server\Task $task)
{
try {
$data = $this->resolve($task->data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [error_trigger_format($exception)]);
} finally {
$task->finish($data);
}
}
/**
* @param $data
* @return null
*/
private function resolve($data)
{
$execute = unserialize($data);
if ($execute instanceof OnTaskInterface) {
return $execute->execute();
}
return null;
}
/**
* @param Server $server
* @param int $task_id
* @param mixed $data
*/
public function onFinish(Server $server, int $task_id, mixed $data)
{
if (!($data instanceof OnTaskInterface)) {
return;
}
$data->finish($server, $task_id);
}
}