modify plugin name
This commit is contained in:
+27
-45
@@ -23,7 +23,11 @@ use Kiri\Abstracts\Config;
|
||||
use Kiri\Events\EventProvider;
|
||||
use Kiri\Exception\NotFindClassException;
|
||||
use Kiri\Server\Events\OnWorkerExit;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
use ReflectionException;
|
||||
use Kiri\Pool\Connection as PoolConnection;
|
||||
use Kiri\Di\ContainerInterface;
|
||||
|
||||
/**
|
||||
* Class Connection
|
||||
@@ -48,6 +52,9 @@ class Connection extends Component
|
||||
|
||||
public array $pool;
|
||||
|
||||
|
||||
private PoolConnection $connection;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
* enable database cache
|
||||
@@ -75,12 +82,17 @@ class Connection extends Component
|
||||
|
||||
/**
|
||||
* @param EventProvider $eventProvider
|
||||
* @param Kiri\Di\ContainerInterface $container
|
||||
* @param array $config
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
* @throws Exception
|
||||
*/
|
||||
public function __construct(public EventProvider $eventProvider, array $config = [])
|
||||
public function __construct(public EventProvider $eventProvider, public ContainerInterface $container, array $config = [])
|
||||
{
|
||||
parent::__construct($config);
|
||||
|
||||
$this->connection = $this->container->get(PoolConnection::class);
|
||||
}
|
||||
|
||||
|
||||
@@ -88,7 +100,7 @@ class Connection extends Component
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function init()
|
||||
public function init(): void
|
||||
{
|
||||
$this->eventProvider->on(OnWorkerExit::class, [$this, 'clear_connection'], 0);
|
||||
$this->eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0);
|
||||
@@ -99,28 +111,16 @@ class Connection extends Component
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $isSearch
|
||||
* @return PDO
|
||||
* @throws Exception
|
||||
*/
|
||||
public function getConnect($isSearch): PDO
|
||||
{
|
||||
return !$isSearch ? $this->getPdo() : $this->getSlaveClient();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public function connectPoolInstance()
|
||||
{
|
||||
$connections = $this->connections();
|
||||
$pool = Config::get('databases.pool.max', 10);
|
||||
if (!empty($this->slaveConfig) && $this->cds != $this->slaveConfig['cds']) {
|
||||
$connections->initConnections('Mysql:' . $this->slaveConfig['cds'], $pool);
|
||||
$this->connection->initConnections('Mysql:' . $this->slaveConfig['cds'], $pool);
|
||||
} else {
|
||||
$connections->initConnections('Mysql:' . $this->cds, $pool);
|
||||
$this->connection->initConnections('Mysql:' . $this->cds, $pool);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,7 +149,7 @@ class Connection extends Component
|
||||
*/
|
||||
public function getMasterClient(): PDO
|
||||
{
|
||||
return $this->connections()->get([
|
||||
return $this->connection->get([
|
||||
'cds' => $this->cds,
|
||||
'username' => $this->username,
|
||||
'password' => $this->password,
|
||||
@@ -170,20 +170,9 @@ class Connection extends Component
|
||||
if (empty($this->slaveConfig) || $this->slaveConfig['cds'] == $this->cds) {
|
||||
return $this->getPdo();
|
||||
}
|
||||
return $this->connections()->get($this->slaveConfig);
|
||||
return $this->connection->get($this->slaveConfig);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return \Kiri\Pool\Connection
|
||||
* @throws Exception
|
||||
*/
|
||||
private function connections(): \Kiri\Pool\Connection
|
||||
{
|
||||
return Kiri::getDi()->get(\Kiri\Pool\Connection::class);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return $this
|
||||
* @throws Exception
|
||||
@@ -264,7 +253,7 @@ class Connection extends Component
|
||||
*/
|
||||
public function release(PDO $pdo)
|
||||
{
|
||||
$connections = $this->connections();
|
||||
$connections = $this->connection;
|
||||
if (!$pdo->inTransaction()) {
|
||||
$cds = $this->cds;
|
||||
if (isset($this->slaveConfig['cds'])) {
|
||||
@@ -282,15 +271,11 @@ class Connection extends Component
|
||||
*/
|
||||
public function clear_connection()
|
||||
{
|
||||
$connections = $this->connections();
|
||||
|
||||
$connections->connection_clear($this->cds);
|
||||
|
||||
if (!isset($this->slaveConfig['cds'])) {
|
||||
$this->slaveConfig['cds'] = $this->cds;
|
||||
$this->connection->connection_clear($this->cds);
|
||||
if (!isset($this->slaveConfig['cds']) || $this->cds == $this->slaveConfig['cds']) {
|
||||
return;
|
||||
}
|
||||
|
||||
$connections->connection_clear($this->slaveConfig['cds']);
|
||||
$this->connection->connection_clear($this->slaveConfig['cds']);
|
||||
}
|
||||
|
||||
|
||||
@@ -299,14 +284,11 @@ class Connection extends Component
|
||||
*/
|
||||
public function disconnect()
|
||||
{
|
||||
$connections = $this->connections();
|
||||
$connections->disconnect($this->cds);
|
||||
|
||||
if (!isset($this->slaveConfig['cds'])) {
|
||||
$this->slaveConfig['cds'] = $this->cds;
|
||||
$this->connection->disconnect($this->cds);
|
||||
if (!isset($this->slaveConfig['cds']) || $this->cds == $this->slaveConfig['cds']) {
|
||||
return;
|
||||
}
|
||||
|
||||
$connections->disconnect($this->slaveConfig['cds']);
|
||||
$this->connection->disconnect($this->slaveConfig['cds']);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
+43
-4
@@ -9,7 +9,12 @@ use Kiri;
|
||||
use Kiri\Abstracts\Config;
|
||||
use Kiri\Abstracts\Providers;
|
||||
use Kiri\Application;
|
||||
use Kiri\Pool\Connection as PoolConnection;
|
||||
use Kiri\Exception\ConfigException;
|
||||
use Kiri\Events\EventProvider;
|
||||
use Kiri\Annotation\Inject;
|
||||
use Kiri\Server\Events\OnWorkerStart;
|
||||
use Kiri\Server\Events\OnTaskerStart;
|
||||
|
||||
/**
|
||||
* Class DatabasesProviders
|
||||
@@ -19,21 +24,29 @@ class DatabasesProviders extends Providers
|
||||
{
|
||||
|
||||
|
||||
/**
|
||||
* @var EventProvider
|
||||
*/
|
||||
#[Inject(EventProvider::class)]
|
||||
public EventProvider $provider;
|
||||
|
||||
|
||||
/**
|
||||
* @param Application $application
|
||||
* @return void
|
||||
* @throws ConfigException
|
||||
* @throws Exception
|
||||
*/
|
||||
public function onImport(Application $application)
|
||||
public function onImport(Application $application): void
|
||||
{
|
||||
$databases = Config::get('databases.connections', []);
|
||||
if (empty($databases)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$app = Kiri::app();
|
||||
$this->provider->on(OnWorkerStart::class, [$this, 'check']);
|
||||
$this->provider->on(OnTaskerStart::class, [$this, 'check']);
|
||||
foreach ($databases as $key => $database) {
|
||||
$app->set($key, $this->_settings($database));
|
||||
$application->set($key, $this->_settings($database));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,6 +61,32 @@ class DatabasesProviders extends Providers
|
||||
return Kiri::app()->get($name);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param OnTaskerStart|OnWorkerStart $start
|
||||
* @return void
|
||||
*/
|
||||
public function check(OnTaskerStart|OnWorkerStart $start): void
|
||||
{
|
||||
$start->server->tick(60000, function () {
|
||||
$databases = Config::get('databases.connections', []);
|
||||
if (empty($databases)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$connection = Kiri::getDi()->get(PoolConnection::class);
|
||||
foreach ($databases as $database) {
|
||||
$connection->check($database['cds']);
|
||||
if (isset($database['slaveConfig']) && isset($database['slaveConfig']['cds'])) {
|
||||
if ($database['slaveConfig']['cds'] != $database['cds']) {
|
||||
$connection->check($database['cds']);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param $database
|
||||
* @return array
|
||||
|
||||
+24
-37
@@ -28,8 +28,6 @@ class PDO implements StopHeartbeatCheck
|
||||
|
||||
private int $_timer = -1;
|
||||
|
||||
private int $_last = 0;
|
||||
|
||||
public string $dbname;
|
||||
public string $cds;
|
||||
public string $username;
|
||||
@@ -60,10 +58,10 @@ class PDO implements StopHeartbeatCheck
|
||||
|
||||
/**
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function init(): void
|
||||
{
|
||||
$this->heartbeat_check();
|
||||
$eventProvider = Kiri::getDi()->get(EventProvider::class);
|
||||
$eventProvider->on(OnWorkerExit::class, [$this, 'onWorkerExit']);
|
||||
}
|
||||
@@ -82,50 +80,18 @@ class PDO implements StopHeartbeatCheck
|
||||
* @param Kiri\Server\Events\OnWorkerExit $exit
|
||||
* @return void
|
||||
*/
|
||||
public function onWorkerExit(OnWorkerExit $exit)
|
||||
public function onWorkerExit(OnWorkerExit $exit): void
|
||||
{
|
||||
$this->stopHeartbeatCheck();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function heartbeat_check(): void
|
||||
{
|
||||
if ($this->_timer === -1) {
|
||||
$this->_timer = Timer::tick(1000, [$this, 'waite']);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
private function waite(): void
|
||||
{
|
||||
try {
|
||||
$tick = (int)Config::get('databases.pool.tick', 60);
|
||||
if ($this->_timer == -1 || time() - $this->_last > $tick) {
|
||||
$this->stopHeartbeatCheck();
|
||||
|
||||
$this->pdo = null;
|
||||
}
|
||||
} catch (\Throwable $throwable) {
|
||||
error($throwable);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function stopHeartbeatCheck(): void
|
||||
{
|
||||
if ($this->_timer > -1) {
|
||||
Timer::clear($this->_timer);
|
||||
}
|
||||
$this->_timer = -1;
|
||||
$this->pdo = null;
|
||||
}
|
||||
|
||||
|
||||
@@ -254,6 +220,27 @@ class PDO implements StopHeartbeatCheck
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function check(): bool
|
||||
{
|
||||
try {
|
||||
if (!($this->pdo instanceof \PDO)) {
|
||||
return $result = false;
|
||||
}
|
||||
$this->pdo->getAttribute(\PDO::ATTR_SERVER_INFO);
|
||||
|
||||
$result = true;
|
||||
} catch (\Throwable $throwable) {
|
||||
$this->pdo = null;
|
||||
$result = false;
|
||||
} finally {
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param PDOStatement $statement
|
||||
* @param array $params
|
||||
|
||||
Reference in New Issue
Block a user