Files
kiri-http-server/Abstracts/CoroutineServer.php
T

243 lines
6.0 KiB
PHP
Raw Normal View History

2022-06-20 17:25:01 +08:00
<?php
namespace Kiri\Server\Abstracts;
use Kiri\Abstracts\Config;
use Kiri\Di\ContainerInterface;
use Kiri\Events\EventDispatch;
2022-06-22 10:53:58 +08:00
use Kiri\Events\EventProvider;
2022-06-20 18:00:00 +08:00
use Kiri\Exception\ConfigException;
2022-06-20 17:25:01 +08:00
use Kiri\Server\Constant;
2022-06-20 18:14:25 +08:00
use Kiri\Server\Events\OnShutdown;
2022-06-20 17:25:01 +08:00
use Kiri\Server\Events\OnWorkerStart;
use Kiri\Server\Events\OnWorkerStop;
use Kiri\Server\ServerInterface;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use Swoole\Coroutine;
use Swoole\Http\Request;
use Swoole\Http\Response;
2022-06-22 10:21:56 +08:00
use Swoole\Process;
2022-06-20 17:25:01 +08:00
use Swoole\Server;
use function Swoole\Coroutine\run;
class CoroutineServer implements ServerInterface
{
/**
* @var array<Coroutine\Server|Coroutine\Http\Server>
*/
private array $servers = [];
use TraitServer;
2022-06-20 18:14:25 +08:00
private bool $isShutdown = false;
2022-06-20 17:25:01 +08:00
/**
* @param Config $config
* @param ContainerInterface $container
* @param EventDispatch $dispatch
2022-06-22 10:53:58 +08:00
* @param EventProvider $provider
2022-06-20 17:25:01 +08:00
* @param LoggerInterface $logger
* @param ProcessManager $processManager
* @param array $params
*/
public function __construct(public Config $config,
public ContainerInterface $container,
public EventDispatch $dispatch,
2022-06-22 10:53:58 +08:00
public EventProvider $provider,
2022-06-20 17:25:01 +08:00
public LoggerInterface $logger,
public ProcessManager $processManager,
public array $params = []
)
{
}
/**
* @param string $name
2022-06-20 17:31:04 +08:00
* @return Server|Coroutine\Server|Coroutine\Http\Server|null
2022-06-20 17:25:01 +08:00
*/
2022-06-20 17:31:04 +08:00
public function getServer(string $name = ''): Server|Coroutine\Server|Coroutine\Http\Server|null
2022-06-20 17:25:01 +08:00
{
2022-06-20 17:31:04 +08:00
if (empty($this->servers)) {
return null;
}
if (empty($name)) {
return current($this->servers);
}
2022-06-20 17:25:01 +08:00
return $this->servers[$name] ?? null;
}
/**
* @param array $service
* @param int $daemon
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function initCoreServers(array $service, int $daemon = 0): void
{
2022-06-22 10:53:58 +08:00
$this->provider->on(OnShutdown::class, function () {
$process = $this->container->get(ProcessManager::class);
$process->shutdown();
});
2022-06-20 17:25:01 +08:00
// TODO: Implement initCoreServers() method.
2022-06-20 18:20:51 +08:00
$service = $this->genConfigService($service);
2022-06-20 18:00:00 +08:00
foreach ($service as $value) {
$this->addListener($value);
}
2022-06-20 17:25:01 +08:00
}
/**
* @param \Kiri\Server\Config $config
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function addListener(\Kiri\Server\Config $config): void
{
// TODO: Implement addListener() method.
$class = $this->getCoroutineServerClass($config->type);
/** @var Coroutine\Server|Coroutine\Http\Server $server */
$server = new $class($config->host, $config->port);
$server->set($config->settings);
2022-06-22 10:13:44 +08:00
if (!($server instanceof Coroutine\Server)) {
2022-06-20 17:25:01 +08:00
$this->onRequestCallback($server, $config);
2022-06-22 10:13:44 +08:00
} else {
$this->onTcpConnection($server, $config);
2022-06-20 17:25:01 +08:00
}
$this->servers[$config->name] = $server;
}
/**
* @param Coroutine\Http\Server $server
* @param \Kiri\Server\Config $config
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function onRequestCallback(Coroutine\Http\Server $server, \Kiri\Server\Config $config): void
{
$requestCallback = $config->events[Constant::REQUEST] ?? null;
if (empty($requestCallback)) {
return;
}
if (is_array($requestCallback) && is_string($requestCallback[0])) {
$requestCallback[0] = $this->container->get($requestCallback[0]);
}
2022-06-22 10:13:44 +08:00
$server->handle('/', $requestCallback);
2022-06-20 17:25:01 +08:00
}
/**
* @param Coroutine\Server $server
* @param \Kiri\Server\Config $config
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
public function onTcpConnection(Coroutine\Server $server, \Kiri\Server\Config $config): void
{
$requestCallback = $config->events[Constant::RECEIVE] ?? null;
if (is_null($requestCallback)) {
return;
}
if (is_array($requestCallback) && is_string($requestCallback[0])) {
$requestCallback[0] = $this->container->get($requestCallback[0]);
}
$closeCallback = $config->events[Constant::CLOSE] ?? null;
$server->handle(function (Coroutine\Server\Connection $connection) use ($requestCallback, $closeCallback) {
defer(function () use ($connection, $closeCallback) {
call_user_func($closeCallback, $connection->exportSocket()->fd);
});
while (true) {
$read = $connection->recv();
if ($read === null || $read === false) {
break;
}
$requestCallback($read);
}
});
}
2022-06-20 18:14:25 +08:00
/**
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
*/
public function shutdown(): void
{
$this->isShutdown = true;
$this->processManager->shutdown();
foreach ($this->servers as $server) {
$server->shutdown();
}
$this->dispatch->dispatch(new OnShutdown());
}
2022-06-20 17:25:01 +08:00
/**
* @return void
2022-06-20 18:00:00 +08:00
* @throws ConfigException
2022-06-20 17:25:01 +08:00
*/
public function start(): void
{
run(function () {
2022-06-22 10:53:58 +08:00
$provider = $this->container->get(EventProvider::class);
$merge = array_merge(Config::get('processes', []), $this->getProcess());
$this->processManager->batch($merge);
2022-06-20 17:25:01 +08:00
foreach ($this->servers as $server) {
Coroutine::create(function () use ($server) {
2022-06-22 10:53:58 +08:00
2022-06-20 18:00:00 +08:00
$this->runServer($server);
2022-06-20 17:25:01 +08:00
});
}
});
2022-06-22 10:21:56 +08:00
Process::wait();
2022-06-20 17:25:01 +08:00
}
/**
* @param Coroutine\Http\Server|Coroutine\Server $server
* @return void
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
* @throws \ReflectionException
*/
public function runServer(Coroutine\Http\Server|Coroutine\Server $server): void
{
$this->dispatch->dispatch(new OnWorkerStart($server, 0));
2022-06-22 10:23:09 +08:00
try {
$server->start();
} catch (\Throwable $throwable) {
$this->logger->error($throwable->getMessage(), [$throwable]);
} finally {
$this->dispatch->dispatch(new OnWorkerStop($server, 0));
if ($this->isShutdown) {
return;
}
$this->runServer($server);
2022-06-20 18:14:25 +08:00
}
2022-06-20 17:25:01 +08:00
}
}