modify plugin name

This commit is contained in:
2022-02-14 10:42:27 +08:00
parent f1a9b74122
commit 567ceb69c0
2 changed files with 89 additions and 130 deletions
+87 -128
View File
@@ -3,7 +3,6 @@
namespace Kiri\Server\Coroutine;
use Kiri\Abstracts\Component;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException;
use Kiri\Server\Constant;
use Kiri\Server\ProcessManager;
@@ -79,146 +78,106 @@ class Http extends Component
}
public function start(array $config)
{
$this->configs = $this->sortService($config['ports']);
run(function () use ($config) {
$event = \Kiri::getDi()->get(EventDispatch::class);
$this->startTaskWorker($config);
foreach ($config as $value) {
$value = $this->resolveCallback($value);
if ($value['type'] == Constant::SERVER_TYPE_HTTP) {
$onRequest = $value['events'][Constant::REQUEST] ?? null;
if (is_null($onRequest)) {
throw new \Exception('Server callback con\'t null.');
}
Coroutine::create(function () use ($value, $event, $onRequest) {
$this->bindHttpService($value, $onRequest);
});
} else if ($value['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
$handshake = $value['events'][Constant::HANDSHAKE] ?? null;
$open = $value['events'][Constant::OPEN] ?? null;
$close = $value['events'][Constant::CLOSE] ?? null;
$message = $value['events'][Constant::MESSAGE] ?? null;
if (is_null($message)) {
throw new \Exception('Server callback con\'t null.');
}
Coroutine::create(function () use ($value, $handshake, $open, $close, $message) {
$this->WebSocketService($value, $handshake, $open, $close, $message);
});
} else {
$message = $value['events'][Constant::RECEIVE] ?? null;
if (is_null($message)) {
throw new \Exception('Server callback con\'t null.');
}
$conn = $value['events'][Constant::CONNECT] ?? null;
$close = $value['events'][Constant::CLOSE] ?? null;
Coroutine::create(function () use ($value, $message, $conn, $close) {
$this->bindTcpService($value, $message, $conn, $close);
});
}
}
});
}
/**
* @param $value
* @param $onRequest
* @return void
* @throws \Exception
*/
protected function bindHttpService($value, $onRequest)
{
$server = new Coroutine\Http\Server($value['host'], $value['port'], null, true);
$this->servers[$value['port']] = $server;
$server->handle('/', function (Request $request, Response $response) use ($onRequest) {
call_user_func($onRequest, $request, $response);
});
$server->start();
$this->bindHttpService($value, $onRequest);
}
/**
* @param $value
* @param $message
* @param $conn
* @param $close
* @param array $config
* @param $daemon
* @return void
* @throws ContainerExceptionInterface
* @throws Exception
* @throws NotFoundExceptionInterface
*/
protected function bindTcpService($value, $message, $conn, $close)
public function initBaseServer(array $config, $daemon)
{
$server = new Coroutine\Server($value['host'], $value['port'], null, true);
$this->servers[$value['port']] = $server;
$server->handle(function (Coroutine\Server\Connection $connection) use ($message, $conn, $close) {
if (!is_null($conn)) {
call_user_func($conn, $connection);
}
while (true) {
$data = $connection->recv(1024);
if ($data === '' || $data === false) {
defer(function () use ($close, $connection) {
call_user_func($close, $connection);
});
$connection->close();
break;
$this->configs = $config;
foreach ($config as $value) {
$value = $this->resolveCallback($value);
if ($value['type'] == Constant::SERVER_TYPE_HTTP) {
$onRequest = $value['events'][Constant::REQUEST] ?? null;
if (is_null($onRequest)) {
throw new \Exception('Server callback con\'t null.');
}
call_user_func($message, $data);
}
});
$server->start();
$server = new Coroutine\Http\Server($value['host'], $value['port'], null, true);
$this->bindTcpService($value, $message, $conn, $close);
$this->servers[$value['port']] = $server;
$server->handle('/', function (Request $request, Response $response) use ($onRequest) {
call_user_func($onRequest, $request, $response);
});
} else if ($value['type'] == Constant::SERVER_TYPE_WEBSOCKET) {
$handshake = $value['events'][Constant::HANDSHAKE] ?? null;
$open = $value['events'][Constant::OPEN] ?? null;
$close = $value['events'][Constant::CLOSE] ?? null;
$message = $value['events'][Constant::MESSAGE] ?? null;
if (is_null($message)) {
throw new \Exception('Server callback con\'t null.');
}
$server = new Coroutine\Http\Server($value['host'], $value['port'], null, true);
$sender = $this->getContainer()->get(Sender::class);
$sender->setServer($server);
$this->servers[$value['port']] = $server;
$server->handle('/', function (Request $request, Response $response) use ($handshake, $open, $close, $message) {
if (is_null($handshake)) {
$response->upgrade();
} else {
call_user_func($handshake, $request, $response);
}
if ($response->isWritable() && is_callable($open)) {
call_user_func($open, $response);
}
while (($data = $response->recv()) instanceof Frame) {
call_user_func($message, $data);
}
call_user_func($close, $response->fd);
});
} else {
$message = $value['events'][Constant::RECEIVE] ?? null;
if (is_null($message)) {
throw new \Exception('Server callback con\'t null.');
}
$conn = $value['events'][Constant::CONNECT] ?? null;
$close = $value['events'][Constant::CLOSE] ?? null;
$server = new Coroutine\Server($value['host'], $value['port'], null, true);
$this->servers[$value['port']] = $server;
$server->handle(function (Coroutine\Server\Connection $connection) use ($message, $conn, $close) {
if (!is_null($conn)) {
call_user_func($conn, $connection);
}
while (true) {
$data = $connection->recv(1024);
if ($data === '' || $data === false) {
defer(function () use ($close, $connection) {
call_user_func($close, $connection);
});
$connection->close();
break;
}
call_user_func($message, $data);
}
});
}
}
}
/**
* @param $value
* @param $handshake
* @param $open
* @param $close
* @param $message
* @return void
*/
private function WebSocketService($value, $handshake, $open, $close, $message)
public function start()
{
$server = new Coroutine\Http\Server($value['host'], $value['port'], null, true);
$sender = $this->getContainer()->get(Sender::class);
$sender->setServer($server);
$this->servers[$value['port']] = $server;
$server->handle('/', function (Request $request, Response $response) use ($handshake, $open, $close, $message) {
if (is_null($handshake)) {
$response->upgrade();
} else {
call_user_func($handshake, $request, $response);
run(function () {
$this->startTaskWorker($this->configs);
foreach ($this->servers as $value) {
Coroutine::create(function () use ($value) {
$value->start();
});
}
if ($response->isWritable() && is_callable($open)) {
call_user_func($open, $response);
}
while (($data = $response->recv()) instanceof Frame) {
call_user_func($message, $data);
}
call_user_func($close, $response->fd);
});
$server->start();
$this->WebSocketService($value, $handshake, $open, $close, $message);
}
+2 -2
View File
@@ -42,7 +42,7 @@ class Server extends HttpService
public State $state;
public ServerManager $manager;
public Kiri\Server\Coroutine\Http $manager;
/**
@@ -50,7 +50,7 @@ class Server extends HttpService
*/
public function init()
{
$this->manager = Kiri::getContainer()->get(ServerManager::class);
$this->manager = Kiri::getContainer()->get(Kiri\Server\Coroutine\Http::class);
$enable_coroutine = Config::get('servers.settings.enable_coroutine', false);
Config::set('servers.settings.enable_coroutine', true);
if ($enable_coroutine != true) {