Compare commits

...

8 Commits

Author SHA1 Message Date
as2252258 c2fde2e6d7 Revert "改名"
This reverts commit fdf58326
2022-01-24 11:46:11 +08:00
as2252258 5ddd0bc6d7 Revert "改名"
This reverts commit fdf58326
2022-01-21 15:22:32 +08:00
as2252258 d22db391f3 Revert "改名"
This reverts commit fdf58326
2022-01-20 19:04:15 +08:00
as2252258 bcd78aba56 Revert "改名"
This reverts commit fdf58326
2022-01-18 17:50:32 +08:00
as2252258 c899f51f35 Revert "改名"
This reverts commit fdf58326
2022-01-18 16:07:05 +08:00
as2252258 1419e96c89 Revert "改名"
This reverts commit fdf58326
2022-01-17 19:04:26 +08:00
as2252258 224b52db49 Revert "改名"
This reverts commit fdf58326
2022-01-17 18:45:00 +08:00
as2252258 248f4d7100 Revert "改名"
This reverts commit fdf58326
2022-01-14 16:11:23 +08:00
13 changed files with 60 additions and 351 deletions
-17
View File
@@ -1,17 +0,0 @@
<?php
namespace Kiri\Server\Contract;
use Swoole\Server;
interface OnTaskInterface
{
public function execute();
public function finish(Server $server, int $task_id);
}
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Kiri\Server\Events;
class OnProcessStart
{
}
-4
View File
@@ -15,10 +15,6 @@ class OnPipeMessage extends Server
{
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @param \Swoole\Server $server
+4 -10
View File
@@ -18,13 +18,7 @@ use Kiri\Server\Events\OnStart;
*/
class OnServer extends Server
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @param \Swoole\Server $server
@@ -35,7 +29,7 @@ class OnServer extends Server
{
$this->setProcessName(sprintf('start[%d].server', $server->master_pid));
$this->eventDispatch->dispatch(new OnStart($server));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnStart($server));
}
@@ -45,7 +39,7 @@ class OnServer extends Server
*/
public function onBeforeShutdown(\Swoole\Server $server)
{
$this->eventDispatch->dispatch(new OnBeforeShutdown($server));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnBeforeShutdown($server));
}
@@ -55,7 +49,7 @@ class OnServer extends Server
*/
public function onShutdown(\Swoole\Server $server)
{
$this->eventDispatch->dispatch(new OnShutdown($server));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnShutdown($server));
}
+2 -8
View File
@@ -18,12 +18,6 @@ use Kiri\Server\Events\OnManagerStop;
class OnServerManager extends Server
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @param \Swoole\Server $server
@@ -33,7 +27,7 @@ class OnServerManager extends Server
{
$this->setProcessName(sprintf('manger[%d].0', $server->manager_pid));
$this->eventDispatch->dispatch(new OnManagerStart($server));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnManagerStart($server));
}
@@ -43,7 +37,7 @@ class OnServerManager extends Server
*/
public function onManagerStop(\Swoole\Server $server)
{
$this->eventDispatch->dispatch(new OnManagerStop($server));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnManagerStop($server));
}
+2 -9
View File
@@ -16,20 +16,13 @@ class OnServerReload
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @param Server $server
* @throws \ReflectionException
*/
public function onBeforeReload(Server $server)
{
$this->eventDispatch->dispatch(new OnBeforeReload($server));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnBeforeReload($server));
}
@@ -39,7 +32,7 @@ class OnServerReload
*/
public function onAfterReload(Server $server)
{
$this->eventDispatch->dispatch(new OnAfterReload($server));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnAfterReload($server));
}
}
+8 -14
View File
@@ -28,13 +28,6 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
public Router $collector;
@@ -54,19 +47,20 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
*/
public function onWorkerStart(Server $server, int $workerId)
{
$this->eventDispatch->dispatch(new OnBeforeWorkerStart($workerId));
$dispatch = \Kiri::getDi()->get(EventDispatch::class);
$dispatch->dispatch(new OnBeforeWorkerStart($workerId));
set_env('environmental_workerId', $workerId);
if ($workerId < $server->setting['worker_num']) {
$this->setProcessName(sprintf('Worker[%d].%d', $server->worker_pid, $workerId));
$this->collector->scan_build_route();
$this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId));
$dispatch->dispatch(new OnWorkerStart($server, $workerId));
set_env('environmental', Kiri::WORKER);
} else {
$this->eventDispatch->dispatch(new OnTaskStart($server, $workerId));
$dispatch->dispatch(new OnTaskStart($server, $workerId));
$this->setProcessName(sprintf('Tasker[%d].%d', $server->worker_pid, $workerId));
set_env('environmental', Kiri::TASK);
}
$this->eventDispatch->dispatch(new OnAfterWorkerStart());
$dispatch->dispatch(new OnAfterWorkerStart());
}
@@ -78,7 +72,7 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
public function onWorkerStop(Server $server, int $workerId)
{
Timer::clearAll();
$this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerStop($server, $workerId));
}
@@ -89,7 +83,7 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
*/
public function onWorkerExit(Server $server, int $workerId)
{
$this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerExit($server, $workerId));
}
@@ -103,7 +97,7 @@ class OnServerWorker extends \Kiri\Server\Abstracts\Server
*/
public function onWorkerError(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal)
{
$this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
\Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
$message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s',
$worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9)
+14 -3
View File
@@ -2,14 +2,16 @@
namespace Kiri\Server;
use Kiri;
use Kiri\Abstracts\Config;
use Kiri\Annotation\Inject;
use Kiri\Context;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri;
use Kiri\Server\Abstracts\BaseProcess;
use Kiri\Server\Broadcast\Message;
use Kiri\Server\Contract\OnProcessInterface;
use Kiri\Server\Events\OnProcessStart;
use Psr\Log\LoggerInterface;
use Swoole\Coroutine;
use Swoole\Process;
@@ -40,8 +42,14 @@ class ProcessManager
$system = sprintf('[%s].process', Config::get('id', 'system-service'));
$this->logger->debug($system . ' ' . $customProcess->getName() . ' start.');
$server->addProcess($process = $this->parse($customProcess, $system));
$process = $this->parse($customProcess, $system);
if (Context::inCoroutine()) {
Coroutine::create(function () use ($process) {
$process->start();
});
} else {
$server->addProcess($process = $this->parse($customProcess, $system));
}
$this->_process[$customProcess->getName()] = $process;
}
@@ -57,6 +65,9 @@ class ProcessManager
if (Kiri::getPlatform()->isLinux()) {
$process->name($system . '(' . $customProcess->getName() . ')');
}
Kiri::getDi()->get(EventDispatch::class)->dispatch(new OnProcessStart());
set_env('environmental', Kiri::PROCESS);
$channel = Coroutine::create(function () use ($process, $customProcess) {
while (!$customProcess->isStop()) {
+6 -6
View File
@@ -11,11 +11,11 @@ use Kiri\Annotation\Inject;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Message\Handler\Abstracts\HttpService;
use Kiri\Message\Handler\Router;
use Kiri\Server\Events\OnShutdown;
use Psr\Container\ContainerExceptionInterface;
use Kiri\Server\Events\OnServerBeforeStart;
use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
use Swoole\Coroutine;
@@ -80,7 +80,7 @@ class Server extends HttpService
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws ReflectionException
* @throws Exception
*/
public function start(): mixed
@@ -95,9 +95,9 @@ class Server extends HttpService
$processes = array_merge($this->process, Config::get('processes', []));
$this->container->get(ProcessManager::class)->batch($processes);
$this->getContainer()->get(ProcessManager::class)->batch($processes);
$this->eventDispatch->dispatch(new OnServerBeforeStart());
$this->getEventDispatch()->dispatch(new OnServerBeforeStart());
return $this->manager->getServer()->start();
}
@@ -108,7 +108,7 @@ class Server extends HttpService
* @throws ConfigException
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
* @throws ReflectionException
* @throws Exception
*/
public function shutdown()
@@ -117,7 +117,7 @@ class Server extends HttpService
foreach ($this->manager->sortService($configs['ports'] ?? []) as $config) {
$this->state->exit($config['port']);
}
$this->container->get(EventDispatch::class)->dispatch(new OnShutdown());
$this->getContainer()->get(EventDispatch::class)->dispatch(new OnShutdown());
}
-6
View File
@@ -30,12 +30,6 @@ class ServerCommand extends Command
const ACTIONS = ['start', 'stop', 'restart'];
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $eventProvider;
/**
*
+16 -89
View File
@@ -17,15 +17,11 @@ use Kiri\Server\Contract\OnPacketInterface;
use Kiri\Server\Contract\OnReceiveInterface;
use Kiri\Server\Handler\OnPipeMessage;
use Kiri\Server\Handler\OnServer;
use Kiri\Server\Handler\OnServerManager;
use Kiri\Server\Handler\OnServerReload;
use Kiri\Server\Handler\OnServerWorker;
use Kiri\Server\Tasker\OnServerTask;
use Kiri\Websocket\WebSocketInterface;
use Kiri\Task\TaskManager;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Swoole\Http\Server as HServer;
use Swoole\Process;
use Swoole\Server;
use Swoole\Server\Port;
use Swoole\WebSocket\Server as WServer;
@@ -62,20 +58,12 @@ class ServerManager extends Component
private Server|null $server = null;
protected array $initProcesses = [];
const DEFAULT_EVENT = [
Constant::WORKER_START => [OnServerWorker::class, 'onWorkerStart'],
Constant::WORKER_EXIT => [OnServerWorker::class, 'onWorkerExit'],
Constant::WORKER_STOP => [OnServerWorker::class, 'onWorkerStop'],
Constant::WORKER_ERROR => [OnServerWorker::class, 'onWorkerError'],
Constant::MANAGER_START => [OnServerManager::class, 'onManagerStart'],
Constant::MANAGER_STOP => [OnServerManager::class, 'onManagerStop'],
Constant::BEFORE_RELOAD => [OnServerReload::class, 'onBeforeReload'],
Constant::AFTER_RELOAD => [OnServerReload::class, 'onAfterReload'],
Constant::START => [OnServer::class, 'onStart'],
Constant::BEFORE_SHUTDOWN => [OnServer::class, 'onBeforeShutdown'],
Constant::SHUTDOWN => [OnServer::class, 'onShutdown'],
];
@@ -139,16 +127,25 @@ class ServerManager extends Component
foreach ($this->sortService($configs['ports']) as $config) {
$this->startListenerHandler($context, $config, $daemon);
}
$this->bindCallback([Constant::PIPE_MESSAGE => [OnPipeMessage::class, 'onPipeMessage']]);
$this->bindPipeMessage();
}
/**
* @return array<string,Process>
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function getProcesses(): array
public function bindPipeMessage(): void
{
return $this->initProcesses;
$pipeMessage = $this->getContainer()->get(OnPipeMessage::class);
$this->server->on(Constant::PIPE_MESSAGE, [$pipeMessage, 'onPipeMessage']);
if (!isset($this->server->setting['task_worker_num']) || $this->server->setting['task_worker_num'] < 1) {
return;
}
$this->getContainer()->get(TaskManager::class)->taskListener($this->server);
}
@@ -247,22 +244,6 @@ class ServerManager extends Component
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws Exception
*
*
*
* $data = new Table($this->container->get(OutputInterface::class));
* $data->setHeaders(['key', 'value']);
*
* $array = [];
* foreach ($this->server->setting as $key => $value) {
* $array[] = [$key, $value];
* $array[] = new TableSeparator();
* }
*
* array_pop($array);
*
* $data->setStyle('box-double');
* $data->setRows($array);
*/
private function createBaseServer(string $type, string $host, int $port, int $mode, array $settings = [])
{
@@ -275,7 +256,7 @@ class ServerManager extends Component
$this->server = new $match($host, $port, SWOOLE_PROCESS, $mode);
$this->server->set(array_merge(Config::get('server.settings', []), $settings['settings']));
$this->container->setBindings(SwooleServerInterface::class, $this->server);
$this->getContainer()->setBindings(SwooleServerInterface::class, $this->server);
$id = Config::get('id', 'system-service');
@@ -292,9 +273,6 @@ class ServerManager extends Component
*/
private function addDefaultListener(array $settings): void
{
if (($this->server->setting['task_worker_num'] ?? 0) > 0) {
$this->addTaskListener($settings['events']);
}
$this->addServiceEvents(ServerManager::DEFAULT_EVENT, $this->server);
if (!empty($settings['events']) && is_array($settings['events'])) {
$this->addServiceEvents($settings['events'], $this->server);
@@ -312,7 +290,7 @@ class ServerManager extends Component
{
foreach ($events as $name => $event) {
if (is_array($event) && is_string($event[0])) {
$event[0] = $this->container->get($event[0]);
$event[0] = $this->getContainer()->get($event[0]);
}
$server->on($name, $event);
}
@@ -328,55 +306,4 @@ class ServerManager extends Component
}
/**
* @param mixed $message
* @param int $workerId
* @return mixed
*/
public function sendMessage(mixed $message, int $workerId): mixed
{
return $this->server?->sendMessage($message, $workerId);
}
/**
* @param array $events
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
private function addTaskListener(array $events = []): void
{
$task_use_object = $this->server->setting['task_object'] ?? $this->server->setting['task_use_object'] ?? false;
$reflect = $this->container->get(OnServerTask::class);
$this->server->on('finish', $events[Constant::FINISH] ?? [$reflect, 'onFinish']);
if ($task_use_object || $this->server->setting['task_enable_coroutine']) {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onCoroutineTask']);
} else {
$this->server->on('task', $events[Constant::TASK] ?? [$reflect, 'onTask']);
}
}
/**
* @param array|null $settings
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function bindCallback(?array $settings = [])
{
if (count($settings) < 1) {
return;
}
foreach ($settings as $event_type => $callback) {
if ($this->server->getCallback($event_type) !== null) {
continue;
}
if (is_array($callback) && !is_object($callback[0])) {
$callback[0] = $this->container->get($callback[0]);
}
$this->server->on($event_type, $callback);
}
}
}
-91
View File
@@ -1,91 +0,0 @@
<?php
namespace Kiri\Server\Tasker;
use Exception;
use Kiri\Abstracts\Component;
use Kiri\Core\HashMap;
use Kiri;
use ReflectionException;
use Kiri\Server\Contract\OnTaskInterface;
use Kiri\Server\SwooleServerInterface;
use Swoole\Server;
/**
*
*/
class AsyncTaskExecute extends Component
{
/**
* @var Server|\Swoole\WebSocket\Server|\Swoole\Http\Server
*/
public mixed $server = null;
private HashMap $hashMap;
/**
*
*/
public function init()
{
$this->hashMap = new HashMap();
}
/**
* @param string $key
* @param $handler
*/
public function reg(string $key, $handler)
{
$this->hashMap->put($key, $handler);
}
/**
* @param OnTaskInterface|string $handler
* @param array $params
* @param int $workerId
* @throws Exception
*/
public function execute(OnTaskInterface|string $handler, array $params = [], int $workerId = -1)
{
if (!$this->server) {
$this->server = Kiri::getDi()->get(SwooleServerInterface::class);
}
if ($workerId < 0 || $workerId > $this->server->setting['task_worker_num']) {
$workerId = random_int(0, $this->server->setting['task_worker_num'] - 1);
}
if (is_string($handler)) {
$handler = $this->handle($handler, $params);
}
$this->server->task(serialize($handler), $workerId);
}
/**
* @param $handler
* @param $params
* @return object
* @throws ReflectionException
* @throws Exception
*/
private function handle($handler, $params): object
{
if (!class_exists($handler) && $this->hashMap->has($handler)) {
$handler = $this->hashMap->get($handler);
}
$implements = $this->container->getReflect($handler);
if (!in_array(OnTaskInterface::class, $implements->getInterfaceNames())) {
throw new Exception('Task must instance ' . OnTaskInterface::class);
}
return $implements->newInstanceArgs($params);
}
}
-94
View File
@@ -1,94 +0,0 @@
<?php
namespace Kiri\Server\Tasker;
use Kiri\Annotation\Inject;
use Kiri\Abstracts\Logger;
use Kiri\Exception\ConfigException;
use Kiri\Server\Contract\OnTaskInterface;
use Swoole\Server;
/**
* Class OnServerTask
* @package Server\Task
*/
class OnServerTask
{
#[Inject(Logger::class)]
public Logger $logger;
/**
* @param Server $server
* @param int $task_id
* @param int $src_worker_id
* @param mixed $data
* @throws ConfigException
*/
public function onTask(Server $server, int $task_id, int $src_worker_id, mixed $data)
{
try {
$data = $this->resolve($data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [error_trigger_format($exception)]);
} finally {
$server->finish($data);
}
}
/**
* @param Server $server
* @param Server\Task $task
* @throws ConfigException
*/
public function onCoroutineTask(Server $server, Server\Task $task)
{
try {
$data = $this->resolve($task->data);
} catch (\Throwable $exception) {
$data = jTraceEx($exception);
$this->logger->error('task', [error_trigger_format($exception)]);
} finally {
$task->finish($data);
}
}
/**
* @param $data
* @return null
*/
private function resolve($data)
{
$execute = unserialize($data);
if ($execute instanceof OnTaskInterface) {
return $execute->execute();
}
return null;
}
/**
* @param Server $server
* @param int $task_id
* @param mixed $data
*/
public function onFinish(Server $server, int $task_id, mixed $data)
{
if (!($data instanceof OnTaskInterface)) {
return;
}
$data->finish($server, $task_id);
}
}