This commit is contained in:
2021-08-03 18:18:09 +08:00
parent 324db2fa3f
commit 5b28c52a10
43 changed files with 755 additions and 653 deletions
+1
View File
@@ -233,6 +233,7 @@ class ActiveRecord extends BaseActiveRecord
* @return bool
* @throws NotFindClassException
* @throws ReflectionException
* @throws Exception
*/
public static function updateAll(mixed $condition, array $attributes = []): bool
{
+10
View File
@@ -0,0 +1,10 @@
<?php
namespace Database\Affair;
class BeginTransaction
{
}
+10
View File
@@ -0,0 +1,10 @@
<?php
namespace Database\Affair;
use Exception;
class Commit
{
}
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Database\Affair;
class Rollback
{
}
+17 -2
View File
@@ -15,6 +15,8 @@ use Annotation\Inject;
use ArrayAccess;
use Database\ActiveQuery;
use Database\ActiveRecord;
use Database\Affair\BeginTransaction;
use Database\Affair\Rollback;
use Database\Connection;
use Database\HasMany;
use Database\HasOne;
@@ -30,6 +32,7 @@ use Snowflake\Abstracts\Component;
use Snowflake\Abstracts\Config;
use Snowflake\Abstracts\TraitApplication;
use Snowflake\Application;
use Snowflake\Events\EventDispatch;
use Snowflake\Exception\ConfigException;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
@@ -90,7 +93,7 @@ abstract class BaseActiveRecord extends Component implements IOrm, ArrayAccess
protected array $actions = [];
/** @var string */
/** @var string */
private static string $connection = 'db';
@@ -115,6 +118,17 @@ abstract class BaseActiveRecord extends Component implements IOrm, ArrayAccess
return Snowflake::app();
}
/**
* @param EventDispatch $eventDispatch
* @param array $config
* @throws Exception
*/
public function __construct(public EventDispatch $eventDispatch, array $config = [])
{
parent::__construct($config);
}
/**
* @param $data
* @return ActiveRecord
@@ -324,6 +338,7 @@ abstract class BaseActiveRecord extends Component implements IOrm, ArrayAccess
return $first[$field];
}
/**
* @return ActiveQuery
*/
@@ -332,6 +347,7 @@ abstract class BaseActiveRecord extends Component implements IOrm, ArrayAccess
return static::query();
}
/**
* @return ActiveQuery
*/
@@ -523,7 +539,6 @@ abstract class BaseActiveRecord extends Component implements IOrm, ArrayAccess
return false;
}
if ($this->beforeSave($this)) {
static::getDb()->enablingTransactions();
[$change, $condition, $fields] = $this->filtration_and_separation();
if (!$this->isNewExample) {
return $this->updateInternal($fields, $condition, $change);
+4 -2
View File
@@ -133,9 +133,11 @@ class Command extends Component
$this->warning('Mysql:' . Json::encode([$this->sql, $this->params]) . (microtime(true) - $time));
}
$this->prepare?->closeCursor();
return $result;
} catch (\Throwable $exception) {
return $this->addError($this->sql . '. error: ' . $exception->getMessage(), 'mysql');
$result = $this->addError($this->sql . '. error: ' . $exception->getMessage(), 'mysql');
} finally {
$this->db->release();
return $result;
}
}
+39 -28
View File
@@ -11,14 +11,21 @@ declare(strict_types=1);
namespace Database;
use Annotation\Inject;
use Database\Affair\BeginTransaction;
use Database\Affair\Commit;
use Database\Affair\Rollback;
use Database\Mysql\Schema;
use Exception;
use JetBrains\PhpStorm\Pure;
use PDO;
use ReflectionException;
use Server\Events\OnWorkerExit;
use Server\Events\OnWorkerStop;
use Snowflake\Abstracts\Component;
use Snowflake\Abstracts\Config;
use Snowflake\Event;
use Snowflake\Events\EventProvider;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
@@ -28,8 +35,6 @@ use Snowflake\Snowflake;
*/
class Connection extends Component
{
const TRANSACTION_COMMIT = 'transaction::commit';
const TRANSACTION_ROLLBACK = 'transaction::rollback';
public string $id = 'db';
public string $cds = '';
@@ -47,45 +52,46 @@ class Connection extends Component
* enable database cache
*/
public bool $enableCache = false;
/**
* @var string
*/
public string $cacheDriver = 'redis';
/**
* @var array
*
* @example [
* 'cds' => 'mysql:dbname=dbname;host=127.0.0.1',
* 'username' => 'root',
* 'password' => 'root'
* ]
*/
public array $slaveConfig = [];
private ?Schema $_schema = null;
/**
* @var Schema
*/
#[Inject(Schema::class)]
public Schema $_schema;
/**
* @throws Exception
* @var EventProvider
*/
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
/**
* execute by __construct
*/
public function init()
{
Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'disconnect']);
Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'clear_connection']);
$this->eventProvider->on(OnWorkerStop::class, [$this, 'clear_connection'], 0);
$this->eventProvider->on(OnWorkerExit::class, [$this, 'clear_connection'], 0);
$this->eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0);
$this->eventProvider->on(Rollback::class, [$this, 'rollback'], 0);
$this->eventProvider->on(Commit::class, [$this, 'commit'], 0);
}
/**
* @throws Exception
*/
public function enablingTransactions()
{
if (!Db::transactionsActive()) {
return;
}
$this->beginTransaction();
Event::on(Connection::TRANSACTION_ROLLBACK, [$this, 'rollback'], [], true);
Event::on(Connection::TRANSACTION_COMMIT, [$this, 'commit'], [], true);
}
/**
* @param null $sql
* @return PDO
@@ -233,6 +239,7 @@ class Connection extends Component
public function rollback()
{
$this->connections()->rollback($this->cds);
$this->release();
}
/**
@@ -242,6 +249,7 @@ class Connection extends Component
public function commit()
{
$this->connections()->commit($this->cds);
$this->release();
}
/**
@@ -279,8 +287,11 @@ class Connection extends Component
*/
public function release()
{
if (!Snowflake::isWorker() && !Snowflake::isProcess()) {
$this->clear_connection();
return;
}
$connections = $this->connections();
$connections->release($this->cds, true);
$connections->release($this->slaveConfig['cds'], false);
}
@@ -306,8 +317,8 @@ class Connection extends Component
{
$connections = $this->connections();
$connections->release($this->cds, true);
$connections->release($this->slaveConfig['cds'], false);
$connections->disconnect($this->cds, true);
$connections->disconnect($this->slaveConfig['cds'], false);
}
+84 -81
View File
@@ -4,19 +4,15 @@ declare(strict_types=1);
namespace Database;
use Annotation\IAnnotation;
use Annotation\Inject;
use Exception;
use ReflectionException;
use Server\Constant;
use Server\Events\OnWorkerStart;
use Snowflake\Abstracts\Config;
use Snowflake\Abstracts\Providers;
use Snowflake\Application;
use Snowflake\Event;
use Snowflake\Exception\ComponentException;
use Snowflake\Events\EventProvider;
use Snowflake\Exception\ConfigException;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Exception\NotFindPropertyException;
use Snowflake\Snowflake;
use Snowflake\Abstracts\Config;
/**
* Class DatabasesProviders
@@ -25,89 +21,96 @@ use Snowflake\Abstracts\Config;
class DatabasesProviders extends Providers
{
private array $_pooLength = ['min' => 0, 'max' => 1];
private array $_pooLength = ['min' => 0, 'max' => 1];
/**
* @param Application $application
* @throws Exception
*/
public function onImport(Application $application)
{
$application->set('db', $this);
$this->_pooLength = Config::get('databases.pool', ['min' => 0, 'max' => 1]);
Event::on(Event::SERVER_TASK_START, [$this, 'createPool']);
}
/**
* @var EventProvider
*/
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
/**
* @param $name
* @return Connection
* @throws ConfigException
* @throws Exception
*/
public function get($name): Connection
{
$application = Snowflake::app();
if (!$application->has('databases.' . $name)) {
$application->set('databases.' . $name, $this->_settings($this->getConfig($name)));
}
return $application->get('databases.' . $name);
}
/**
* @param Application $application
* @throws Exception
*/
public function onImport(Application $application)
{
$application->set('db', $this);
$this->_pooLength = Config::get('databases.pool', ['min' => 0, 'max' => 1]);
$this->eventProvider->on(OnWorkerStart::class, [$this, 'createPool']);
}
/**
* @throws ConfigException
* @throws Exception
*/
public function createPool()
{
$databases = Config::get('databases.connections', []);
if (empty($databases)) {
return;
}
$application = Snowflake::app();
foreach ($databases as $name => $database) {
/** @var Connection $connection */
$application->set('databases.' . $name, $this->_settings($database));
$application->get('databases.' . $name)->fill();
}
}
/**
* @param $name
* @return Connection
* @throws ConfigException
* @throws Exception
*/
public function get($name): Connection
{
$application = Snowflake::app();
if (!$application->has('databases.' . $name)) {
$application->set('databases.' . $name, $this->_settings($this->getConfig($name)));
}
return $application->get('databases.' . $name);
}
/**
* @param $database
* @return array
*/
private function _settings($database): array
{
return [
'class' => Connection::class,
'id' => $database['id'],
'cds' => $database['cds'],
'username' => $database['username'],
'password' => $database['password'],
'tablePrefix' => $database['tablePrefix'],
'database' => $database['database'],
'maxNumber' => $this->_pooLength['max'],
'minNumber' => $this->_pooLength['min'],
'charset' => $database['charset'] ?? 'utf8mb4',
'slaveConfig' => $database['slaveConfig']
];
}
/**
* @throws ConfigException
* @throws Exception
*/
public function createPool(OnWorkerStart $onWorkerStart)
{
$databases = Config::get('databases.connections', []);
if (empty($databases)) {
return;
}
$application = Snowflake::app();
foreach ($databases as $name => $database) {
/** @var Connection $connection */
$application->set('databases.' . $name, $this->_settings($database));
$application->get('databases.' . $name)->fill();
}
}
/**
* @param $name
* @return mixed
* @throws ConfigException
*/
public function getConfig($name): mixed
{
return Config::get('databases.connections.' . $name, null, true);
}
/**
* @param $database
* @return array
*/
private function _settings($database): array
{
return [
'class' => Connection::class,
'id' => $database['id'],
'cds' => $database['cds'],
'username' => $database['username'],
'password' => $database['password'],
'tablePrefix' => $database['tablePrefix'],
'database' => $database['database'],
'maxNumber' => $this->_pooLength['max'],
'minNumber' => $this->_pooLength['min'],
'charset' => $database['charset'] ?? 'utf8mb4',
'slaveConfig' => $database['slaveConfig']
];
}
/**
* @param $name
* @return mixed
* @throws ConfigException
*/
public function getConfig($name): mixed
{
return Config::get('databases.connections.' . $name, null, true);
}
}
+12 -9
View File
@@ -9,10 +9,13 @@ declare(strict_types=1);
namespace Database;
use Database\Affair\BeginTransaction;
use Database\Affair\Commit;
use Database\Affair\Rollback;
use Database\Traits\QueryTrait;
use Exception;
use Snowflake\Abstracts\Config;
use Snowflake\Event;
use Snowflake\Events\EventDispatch;
use Snowflake\Exception\ConfigException;
/**
@@ -39,19 +42,21 @@ class Db implements ISqlBuilder
*/
public static function beginTransaction()
{
if (!static::transactionsActive()) {
di(EventDispatch::class)->dispatch(new BeginTransaction());
}
static::$_inTransaction = true;
}
/**
* @throws Exception
*/
public static function commit()
{
if (!static::transactionsActive()) {
return;
if (static::transactionsActive()) {
di(EventDispatch::class)->dispatch(new Commit());
}
Event::trigger(Connection::TRANSACTION_COMMIT);
Event::offName(Connection::TRANSACTION_COMMIT);
static::$_inTransaction = false;
}
@@ -61,11 +66,9 @@ class Db implements ISqlBuilder
*/
public static function rollback()
{
if (!static::transactionsActive()) {
return;
if (static::transactionsActive()) {
di(EventDispatch::class)->dispatch(new Rollback());
}
Event::trigger(Connection::TRANSACTION_ROLLBACK);
Event::offName(Connection::TRANSACTION_ROLLBACK);
static::$_inTransaction = false;
}
+12 -32
View File
@@ -11,7 +11,6 @@ use Snowflake\Channel;
use Snowflake\Core\Json;
use Snowflake\Core\Xml;
use Snowflake\Event;
use Snowflake\Snowflake;
use Swoole\Coroutine\Http2\Client as H2Client;
use Swoole\Http2\Request;
use Swoole\Http2\Response;
@@ -28,16 +27,11 @@ class Http2 extends Component
private array $_clients = [];
private Channel $channel;
/**
* @throws Exception
*/
public function init()
{
$this->channel = Snowflake::getApp('channel');
Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'releases']);
Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'clean']);
}
@@ -48,11 +42,6 @@ class Http2 extends Component
*/
public function releases()
{
foreach ($this->_clients as $name => $client) {
/** @var H2Client $client */
$client->close();
$this->channel->push($client, 'http2.' . $name);
}
$this->_clients = [];
}
@@ -111,7 +100,7 @@ class Http2 extends Component
* @return Result
* @throws Exception
*/
public function get($domain, $path, $params = [], $timeout = -1): Result
public function get($domain, $path, array $params = [], int $timeout = -1): Result
{
$request = $this->dispatch($domain, $path, 'GET', $params, $timeout);
@@ -127,7 +116,7 @@ class Http2 extends Component
* @return Result
* @throws Exception
*/
public function post($domain, $path, $params = [], $timeout = -1): Result
public function post($domain, $path, array $params = [], int $timeout = -1): Result
{
$request = $this->dispatch($domain, $path, 'POST', $params, $timeout);
@@ -143,7 +132,7 @@ class Http2 extends Component
* @return Result
* @throws Exception
*/
public function upload($domain, $path, $params = [], $timeout = -1): Result
public function upload($domain, $path, array $params = [], int $timeout = -1): Result
{
$request = $this->dispatch($domain, $path, 'POST', $params, $timeout, true);
@@ -159,7 +148,7 @@ class Http2 extends Component
* @return Result
* @throws Exception
*/
public function delete($domain, $path, $params = [], $timeout = -1): Result
public function delete($domain, $path, array $params = [], int $timeout = -1): Result
{
$request = $this->dispatch($domain, $path, 'DELETE', $params, $timeout);
@@ -177,7 +166,7 @@ class Http2 extends Component
* @return mixed
* @throws Exception
*/
private function dispatch($domain, $path, $method, $params = [], $timeout = -1, $isUpload = false): mixed
private function dispatch($domain, $path, $method, array $params = [], int $timeout = -1, bool $isUpload = false): mixed
{
[$domain, $isSsl] = $this->clear($domain);
@@ -185,7 +174,6 @@ class Http2 extends Component
$request->headers = array_merge($request->headers, [
'Host' => $domain
]);
defer(fn() => $this->channel->push($request, 'request.' . $method . $path));
return $this->doRequest($request, $domain, $isSsl, $timeout);
}
@@ -215,7 +203,6 @@ class Http2 extends Component
private function doRequest(Request $request, $domain, $ssl, $timeout): mixed
{
$client = $this->getClient($domain, $ssl, $timeout);
defer(fn() => $this->channel->push($client, 'http2.' . $domain));
$client->send($request);
if (Context::getContext('http2isRecv') === false) {
return null;
@@ -275,18 +262,15 @@ class Http2 extends Component
* @return Request
* @throws Exception
*/
public function getRequest($path, $method, $params, $isUpload = false): Request
public function getRequest($path, $method, $params, bool $isUpload = false): Request
{
if (!str_starts_with($path, '/')) {
$path = '/' . $path;
}
$channel = Snowflake::app()->getChannel();
$request = $channel->pop('request.' . $method . $path, function () use ($path, $method) {
$request = new Request();
$request->method = $method;
$request->path = $path;
return $request;
});
$request = new Request();
$request->method = $method;
$request->path = $path;
if ($method === 'GET') {
$request->path .= '?' . http_build_query($params);
} else {
@@ -304,16 +288,12 @@ class Http2 extends Component
* @return H2Client
* @throws Exception
*/
private function getClient($domain, $isSsl = false, $timeout = -1): H2Client
private function getClient($domain, bool $isSsl = false, int $timeout = -1): H2Client
{
if (isset($this->_clients[$domain])) {
return $this->_clients[$domain];
}
$pool = Snowflake::app()->getChannel();
/** @var H2Client $client */
$client = $pool->pop('http2.' . $domain, function () use ($domain, $isSsl, $timeout) {
return $this->newRequest($domain, $isSsl, $timeout);
});
$client = $this->newRequest($domain, $isSsl, $timeout);
if ((!$client->connected || !$client->ping()) && !$client->connect()) {
throw new Exception($client->errMsg, $client->errCode);
}
+30 -22
View File
@@ -19,25 +19,27 @@ class Context extends BaseContext
/**
* @param $id
* @param $context
* @param null $coroutineId
* @return mixed
*/
public static function setContext($id, $context): mixed
public static function setContext($id, $context, $coroutineId = null): mixed
{
if (Coroutine::getCid() === -1) {
return static::$_contents[$id] = $context;
}
return Coroutine::getContext()[$id] = $context;
return Coroutine::getContext($coroutineId)[$id] = $context;
}
/**
* @param $id
* @param int $value
* @param null $coroutineId
* @return bool|int
*/
public static function increment($id, int $value = 1): bool|int
public static function increment($id, int $value = 1, $coroutineId = null): bool|int
{
if (!isset(Coroutine::getContext()[$id])) {
return Coroutine::getContext()[$id] += $value;
if (!isset(Coroutine::getContext($coroutineId)[$id])) {
return Coroutine::getContext($coroutineId)[$id] += $value;
}
return false;
}
@@ -45,15 +47,16 @@ class Context extends BaseContext
/**
* @param $id
* @param int $value
* @param null $coroutineId
* @return bool|int
*/
public static function decrement($id, int $value = 1): bool|int
public static function decrement($id, int $value = 1, $coroutineId = null): bool|int
{
if (!static::hasContext($id)) {
return false;
}
if (isset(Coroutine::getContext()[$id])) {
return Coroutine::getContext()[$id] -= $value;
if (isset(Coroutine::getContext($coroutineId)[$id])) {
return Coroutine::getContext($coroutineId)[$id] -= $value;
}
return false;
}
@@ -61,25 +64,27 @@ class Context extends BaseContext
/**
* @param $id
* @param null $default
* @param null $coroutineId
* @return mixed
*/
public static function getContext($id, $default = null): mixed
public static function getContext($id, $default = null, $coroutineId = null): mixed
{
if (Coroutine::getCid() === -1) {
return static::loadByStatic($id, $default);
}
return static::loadByContext($id, $default);
return static::loadByContext($id, $default, $coroutineId);
}
/**
* @param $id
* @param null $default
* @param null $coroutineId
* @return mixed
*/
private static function loadByContext($id, $default = null): mixed
private static function loadByContext($id, $default = null, $coroutineId = null): mixed
{
$data = Coroutine::getContext()[$id] ?? null;
$data = Coroutine::getContext($coroutineId)[$id] ?? null;
if ($data === null) {
return $default;
}
@@ -103,12 +108,13 @@ class Context extends BaseContext
/**
* @param null $coroutineId
* @return mixed
*/
public static function getAllContext(): mixed
public static function getAllContext($coroutineId = null): mixed
{
if (Coroutine::getCid() === -1) {
return Coroutine::getContext() ?? [];
return Coroutine::getContext($coroutineId) ?? [];
} else {
return static::$_contents ?? [];
}
@@ -116,16 +122,17 @@ class Context extends BaseContext
/**
* @param string $id
* @param null $coroutineId
*/
public static function remove(string $id)
public static function remove(string $id, $coroutineId = null)
{
if (!static::hasContext($id)) {
if (!static::hasContext($id, $coroutineId)) {
return;
}
if (Coroutine::getCid() === -1) {
unset(static::$_contents[$id]);
} else {
unset(Coroutine::getContext()[$id]);
unset(Coroutine::getContext($coroutineId)[$id]);
}
}
@@ -134,12 +141,12 @@ class Context extends BaseContext
* @param null $key
* @return bool
*/
public static function hasContext($id, $key = null): bool
public static function hasContext($id, $key = null, $coroutineId = null): bool
{
if (Coroutine::getCid() === -1) {
return static::searchByStatic($id, $key);
}
return static::searchByCoroutine($id, $key);
return static::searchByCoroutine($id, $key, $coroutineId);
}
@@ -163,15 +170,16 @@ class Context extends BaseContext
/**
* @param $id
* @param null $key
* @param null $coroutineId
* @return bool
*/
private static function searchByCoroutine($id, $key = null): bool
private static function searchByCoroutine($id, $key = null, $coroutineId = null): bool
{
if (!isset(Coroutine::getContext()[$id])) {
if (!isset(Coroutine::getContext($coroutineId)[$id])) {
return false;
}
if ($key !== null) {
return isset((Coroutine::getContext()[$id] ?? [])[$key]);
return isset((Coroutine::getContext($coroutineId)[$id] ?? [])[$key]);
}
return true;
}
+111 -136
View File
@@ -7,12 +7,9 @@ use Exception;
use RdKafka\Conf;
use RdKafka\ProducerTopic;
use RdKafka\TopicConf;
use ReflectionException;
use Snowflake\Abstracts\Component;
use Snowflake\Abstracts\Config;
use Snowflake\Event;
use Snowflake\Exception\ConfigException;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
/**
@@ -30,77 +27,74 @@ class Producer extends Component
{
private Conf $conf;
private TopicConf $topicConf;
private Conf $conf;
private TopicConf $topicConf;
private ?\RdKafka\Producer $producer = null;
private ?\RdKafka\Producer $producer = null;
private bool $isAck = true;
private bool $isAck = true;
/**
* Producer constructor.
* @param array $config
* @throws Exception
*/
public function __construct(array $config = [])
{
parent::__construct($config);
if (!class_exists(Conf::class)) {
return;
}
$this->conf = new Conf();
$this->topicConf = new TopicConf();
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
$this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
}
/**
* Producer constructor.
* @param array $config
* @throws Exception
*/
public function __construct(array $config = [])
{
parent::__construct($config);
if (!class_exists(Conf::class)) {
return;
}
$this->conf = new Conf();
$this->topicConf = new TopicConf();
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
$this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
}
/**
* @param string $servers
* @return Producer
*/
public function setBrokers(string $servers): static
{
$this->conf->set('metadata.broker.list', $servers);
return $this;
}
public function setBrokers(string $servers): static
{
$this->conf->set('metadata.broker.list', $servers);
return $this;
}
/**
* @param string $groupId
* @return Producer
*/
public function setGroupId(string $groupId): static
{
$this->conf->set('group.id', $groupId);
return $this;
}
/**
* @param string $groupId
* @return Producer
*/
public function setGroupId(string $groupId): static
{
$this->conf->set('group.id', $groupId);
return $this;
}
/**
* @param string $topic
* @param array $params
* @param string|null $groupId
* @throws Exception
*/
public function dispatch(string $topic, array $params = [], string $groupId = null)
{
$this->beforePushMessage($topic, $groupId);
$this->sendMessage($topic, [$params]);
}
/**
* @param string $topic
* @param array $params
* @param string|null $groupId
* @throws Exception
*/
public function dispatch(string $topic, array $params = [], string $groupId = null)
{
$this->beforePushMessage($topic, $groupId);
$this->sendMessage($topic, [$params]);
}
/**
* @return \RdKafka\Producer
* @throws Exception
*/
private function getProducer(): \RdKafka\Producer
{
$pool = Snowflake::app()->getChannel();
return $pool->pop(\RdKafka\Producer::class, function () {
return Snowflake::createObject(\RdKafka\Producer::class, [$this->conf]);
});
}
/**
* @return \RdKafka\Producer
* @throws Exception
*/
private function getProducer(): \RdKafka\Producer
{
return Snowflake::createObject(\RdKafka\Producer::class, [$this->conf]);
}
/**
@@ -109,13 +103,10 @@ class Producer extends Component
* @return ProducerTopic
* @throws Exception
*/
private function getProducerTopic(\RdKafka\Producer $producer, $topic): ProducerTopic
{
$pool = Snowflake::app()->getChannel();
return $pool->pop($topic . '::' . ProducerTopic::class, function () use ($producer, $topic) {
return $producer->newTopic($topic, $this->topicConf);
});
}
private function getProducerTopic(\RdKafka\Producer $producer, $topic): ProducerTopic
{
return $producer->newTopic($topic, $this->topicConf);
}
/**
@@ -126,12 +117,12 @@ class Producer extends Component
* @throws ConfigException
* @throws Exception
*/
public function batch(string $toPic, ?string $key, array $data, ?string $groupId = null)
{
$this->beforePushMessage($toPic, $groupId);
public function batch(string $toPic, ?string $key, array $data, ?string $groupId = null)
{
$this->beforePushMessage($toPic, $groupId);
$this->sendMessage($toPic, $data, $key);
}
$this->sendMessage($toPic, $data, $key);
}
/**
@@ -140,23 +131,23 @@ class Producer extends Component
* @throws ConfigException
* @throws Exception
*/
private function beforePushMessage($topic, $groupId): void
{
$consumers = Config::get('kafka.producers.' . $topic);
if (empty($consumers) || !is_array($consumers)) {
throw new Exception('You need set kafka.producers config');
}
if (!isset($consumers['brokers'])) {
throw new Exception('You need set brokers config.');
}
if (!empty($groupId)) {
$consumers['groupId'] = $groupId;
} else if (!isset($consumers['groupId'])) {
$consumers['groupId'] = $topic . ':' . Snowflake::localhost();
}
$this->setGroupId($consumers['groupId']);
$this->setBrokers($consumers['brokers']);
}
private function beforePushMessage($topic, $groupId): void
{
$consumers = Config::get('kafka.producers.' . $topic);
if (empty($consumers) || !is_array($consumers)) {
throw new Exception('You need set kafka.producers config');
}
if (!isset($consumers['brokers'])) {
throw new Exception('You need set brokers config.');
}
if (!empty($groupId)) {
$consumers['groupId'] = $groupId;
} else if (!isset($consumers['groupId'])) {
$consumers['groupId'] = $topic . ':' . Snowflake::localhost();
}
$this->setGroupId($consumers['groupId']);
$this->setBrokers($consumers['brokers']);
}
/**
@@ -165,58 +156,42 @@ class Producer extends Component
* @param string $key
* @throws Exception
*/
private function sendMessage(string $topic, array $message, string $key = '')
{
$producer = $this->getProducer();
$producerTopic = $this->getProducerTopic($producer, $topic);
if ($this->isAck) {
$this->flush($producer);
}
foreach ($message as $value) {
$producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key);
$producer->poll(0);
}
$this->flush($producer);
$this->recover($topic, $producer, $producerTopic);
}
/**
* @param bool $ack
*/
public function setAsk(bool $ack){
$this->isAck = $ack;
$this->topicConf->set('request.required.acks', $this->isAck ? '1' : '0');
}
private function sendMessage(string $topic, array $message, string $key = '')
{
$producer = $this->getProducer();
$producerTopic = $this->getProducerTopic($producer, $topic);
if ($this->isAck) {
$this->flush($producer);
}
foreach ($message as $value) {
$producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key);
$producer->poll(0);
}
$this->flush($producer);
}
/**
* @param string $topic
* @param \RdKafka\Producer $producer
* @param ProducerTopic $producerTopic
* @throws Exception
* @param bool $ack
*/
private function recover(string $topic, \RdKafka\Producer $producer, ProducerTopic $producerTopic)
{
$channel = Snowflake::app()->getChannel();
$channel->push($producerTopic, $topic . '::' . ProducerTopic::class);
$channel->push($producer, \RdKafka\Producer::class);
}
public function setAsk(bool $ack)
{
$this->isAck = $ack;
$this->topicConf->set('request.required.acks', $this->isAck ? '1' : '0');
}
/**
* @param \RdKafka\Producer $producer
*/
public function flush(\RdKafka\Producer $producer)
{
while ($producer->getOutQLen() > 0) {
$result = $producer->flush(100);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
}
/**
* @param \RdKafka\Producer $producer
*/
public function flush(\RdKafka\Producer $producer)
{
while ($producer->getOutQLen() > 0) {
$result = $producer->flush(100);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
}
}
+9 -21
View File
@@ -8,7 +8,6 @@ use Exception;
use Snowflake\Abstracts\Component;
use Snowflake\Channel;
use Snowflake\Core\Json;
use Snowflake\Snowflake;
use Swoole\Coroutine\Client as CClient;
@@ -69,10 +68,6 @@ class Client extends Component
if ($port < 0) {
return $this;
}
/** @var Channel $channel */
$channel = Snowflake::app()->get('channel');
$channel->push($this->client, $host . $port . CClient::class);
$this->client = null;
return $this;
}
@@ -114,26 +109,19 @@ class Client extends Component
*/
public function getClient(): CClient
{
/** @var Channel $channel */
$channel = Snowflake::app()->get('channel');
$host = $this->config['host'] ?? '127.0.0.1';
$port = $this->config['port'] ?? 0;
if ($port < 0) {
throw new Exception('Related service not have port(404)');
}
return $channel->pop($host . $port . CClient::class, function () {
$client = new CClient($this->config['mode'] ?? SWOOLE_SOCK_TCP);
$client->set([
'timeout' => 0.5,
'connect_timeout' => 1.0,
'write_timeout' => 10.0,
'read_timeout' => 0.5,
'open_tcp_keepalive' => true,
]);
return $client;
});
$client = new CClient($this->config['mode'] ?? SWOOLE_SOCK_TCP);
$client->set([
'timeout' => 0.5,
'connect_timeout' => 1.0,
'write_timeout' => 10.0,
'read_timeout' => 0.5,
'open_tcp_keepalive' => true,
]);
return $client;
}
+30 -13
View File
@@ -3,14 +3,17 @@
namespace Rpc;
use Annotation\Inject;
use Exception;
use HttpServer\Http\Context;
use HttpServer\Http\Request;
use HttpServer\Route\Router;
use Server\Constant;
use Server\Events\OnAfterRequest;
use Snowflake\Core\Json;
use Snowflake\Event;
use Snowflake\Events\EventDispatch;
use Snowflake\Events\EventProvider;
use Snowflake\Snowflake;
use Swoole\Http\Request;
use Swoole\Server;
use function Swoole\Coroutine\defer;
@@ -28,6 +31,16 @@ class Service extends \Server\Abstracts\Server
private Router $router;
/** @var EventProvider */
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @throws Exception
*/
@@ -45,7 +58,7 @@ class Service extends \Server\Abstracts\Server
*/
public function onConnect(Server $server, int $fd, int $reactorId)
{
defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES));
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
$this->runEvent(Constant::CONNECT, null, [$server, $fd, $reactorId]);
}
@@ -59,7 +72,7 @@ class Service extends \Server\Abstracts\Server
*/
public function onClose(Server $server, int $fd)
{
defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES));
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
$this->runEvent(Constant::CLOSE, null, [$server, $fd]);
}
@@ -73,7 +86,7 @@ class Service extends \Server\Abstracts\Server
*/
public function onDisconnect(Server $server, int $fd)
{
defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES));
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
$this->runEvent(Constant::DISCONNECT, null, [$server, $fd]);
}
@@ -88,11 +101,11 @@ class Service extends \Server\Abstracts\Server
*/
public function onReceive(Server $server, int $fd, int $reID, string $data)
{
defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES));
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
try {
$client = $server->getClientInfo($fd, $reID);
$request = $this->requestSpl((int)$client['server_port'], $data);
$request = $this->requestSpl((int)$client['server_port'], $data, $fd);
$result = $this->router->find_path($request)?->dispatch();
@@ -111,7 +124,7 @@ class Service extends \Server\Abstracts\Server
*/
public function onPacket(Server $server, string $data, array $client)
{
defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES));
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
try {
$request = $this->requestSpl((int)$client['server_port'], $data);
@@ -127,10 +140,11 @@ class Service extends \Server\Abstracts\Server
/**
* @param int $server_port
* @param string $data
* @param int $fd
* @return mixed
* @throws Exception
*/
public function requestSpl(int $server_port, string $data): mixed
public function requestSpl(int $server_port, string $data, int $fd = 0): \HttpServer\Http\Request
{
$sRequest = new Request();
@@ -143,11 +157,14 @@ class Service extends \Server\Abstracts\Server
throw new Exception('Protocol format error.');
}
$sRequest->params->setPosts($data);
$sRequest->headers->setRequestUri('rpc/p' . $server_port . '/' . ltrim($cmd, '/'));
$sRequest->headers->setRequestMethod('rpc');
$sRequest->fd = $fd;
$sRequest->post = $data;
$sRequest->header['request_uri'] = 'rpc/p' . $server_port . '/' . ltrim($cmd, '/');
$sRequest->header['request_method'] = 'rpc';
return Context::setContext(Request::class, $sRequest);
Context::setContext(Request::class, $sRequest);
return \request();
}
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Server\Events;
class OnAfterRequest
{
}
+8
View File
@@ -0,0 +1,8 @@
<?php
namespace Server\Events;
class OnBeforeRequest
{
}
+23
View File
@@ -0,0 +1,23 @@
<?php
namespace Server\Events;
use Swoole\Server;
/**
*
*/
class OnWorkerError
{
/**
* @param Server $server
* @param int $workerId
*/
public function __construct(Server $server, int $worker_id, int $worker_pid, int $exit_code, int $signal)
{
}
}
+23
View File
@@ -0,0 +1,23 @@
<?php
namespace Server\Events;
use Swoole\Server;
/**
*
*/
class OnWorkerExit
{
/**
* @param Server $server
* @param int $workerId
*/
public function __construct(Server $server, int $workerId)
{
}
}
+23
View File
@@ -0,0 +1,23 @@
<?php
namespace Server\Events;
use Swoole\Server;
/**
*
*/
class OnWorkerStart
{
/**
* @param Server $server
* @param int $workerId
*/
public function __construct(Server $server, int $workerId)
{
}
}
+23
View File
@@ -0,0 +1,23 @@
<?php
namespace Server\Events;
use Swoole\Server;
/**
*
*/
class OnWorkerStop
{
/**
* @param Server $server
* @param int $workerId
*/
public function __construct(Server $server, int $workerId)
{
}
}
+14 -10
View File
@@ -2,10 +2,14 @@
namespace Server;
use Annotation\Inject;
use Exception;
use HttpServer\Http\Context;
use HttpServer\Http\Request as HSRequest;
use HttpServer\Route\Router;
use Server\Events\OnAfterRequest;
use Snowflake\Event;
use Snowflake\Events\EventDispatch;
use Snowflake\Snowflake;
use Swoole\Error;
use Swoole\Http\Request;
@@ -26,21 +30,19 @@ class HTTPServerListener extends Abstracts\Server
use ListenerHelper;
/** @var Router|mixed */
#[Inject('router')]
private Router $router;
/** @var \HttpServer\Http\Response|mixed */
#[Inject(\HttpServer\Http\Response::class)]
private \HttpServer\Http\Response $response;
/**
* HTTPServerListener constructor.
* @throws Exception
*/
public function __construct()
{
$this->router = Snowflake::getApp('router');
$this->response = di(\HttpServer\Http\Response::class);
parent::__construct();
}
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* UDPServerListener constructor.
@@ -93,6 +95,8 @@ class HTTPServerListener extends Abstracts\Server
$this->router->dispatch(HSRequest::create($request));
} catch (Error | Throwable $exception) {
$this->response->send(jTraceEx($exception), 500);
} finally {
$this->eventDispatch->dispatch(new OnAfterRequest());
}
}
+11 -2
View File
@@ -2,11 +2,15 @@
namespace Server\Manager;
use Annotation\Inject;
use Exception;
use Server\Abstracts\Server;
use Server\Constant;
use Server\Events\OnAfterRequest;
use Server\SInterface\PipeMessage;
use Snowflake\Event;
use Snowflake\Events\EventDispatch;
use Snowflake\Exception\ConfigException;
/**
@@ -17,9 +21,14 @@ class ServerDefaultEvent extends Server
{
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @param \Swoole\Server $server
* @throws \Snowflake\Exception\ConfigException
* @throws ConfigException
*/
public function onStart(\Swoole\Server $server)
{
@@ -49,7 +58,7 @@ class ServerDefaultEvent extends Server
if (!is_object($message) || !($message instanceof PipeMessage)) {
return;
}
defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES));
defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest()));
$this->runEvent(Constant::PIPE_MESSAGE,
function (\Swoole\Server $server, $src_worker_id, $message) {
call_user_func([$message, 'execute']);
+12 -4
View File
@@ -2,9 +2,11 @@
namespace Server;
use Annotation\Inject;
use Exception;
use ReflectionException;
use Server\Events\OnAfterRequest;
use Snowflake\Event;
use Snowflake\Events\EventDispatch;
use Swoole\Server;
use Swoole\Server\Port;
@@ -21,6 +23,11 @@ class TCPServerListener extends Abstracts\Server
protected static bool|Port $_tcp;
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* UDPServerListener constructor.
* @param Server|Port $server
@@ -51,7 +58,7 @@ class TCPServerListener extends Abstracts\Server
{
$this->runEvent(Constant::CONNECT, null, [$server, $fd]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
@@ -66,7 +73,7 @@ class TCPServerListener extends Abstracts\Server
{
$this->runEvent(Constant::RECEIVE, null, [$server, $fd, $reactor_id, $data]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
@@ -79,7 +86,8 @@ class TCPServerListener extends Abstracts\Server
{
$this->runEvent(Constant::CLOSE, null, [$server, $fd]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
}
+8 -2
View File
@@ -2,9 +2,12 @@
namespace Server;
use Annotation\Inject;
use Exception;
use ReflectionException;
use Server\Events\OnAfterRequest;
use Snowflake\Event;
use Snowflake\Events\EventDispatch;
use Snowflake\Exception\NotFindClassException;
use Swoole\Server;
use Swoole\Server\Port;
@@ -17,7 +20,10 @@ use Swoole\Server\Port;
class UDPServerListener extends Abstracts\Server
{
protected static bool|Port $_udp;
/** @var EventDispatch */
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
use ListenerHelper;
@@ -50,7 +56,7 @@ class UDPServerListener extends Abstracts\Server
{
$this->runEvent(Constant::MESSAGE, null, [$server, $data, $clientInfo]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
}
+13 -6
View File
@@ -2,9 +2,11 @@
namespace Server;
use Annotation\Inject;
use Exception;
use ReflectionException;
use Snowflake\Event;
use Server\Events\OnAfterRequest;
use Snowflake\Events\EventDispatch;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
use Swoole\Http\Request;
@@ -25,6 +27,11 @@ class WebSocketServerListener extends Abstracts\Server
use ListenerHelper;
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @param mixed $server
* @param array|null $settings
@@ -88,7 +95,7 @@ class WebSocketServerListener extends Abstracts\Server
}
$this->runEvent(Constant::HANDSHAKE, fn() => $this->disconnect($request, $response), [$request, $response]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
@@ -112,7 +119,7 @@ class WebSocketServerListener extends Abstracts\Server
{
$this->runEvent(Constant::CONNECT, fn() => $server->confirm($fd), [$server, $fd]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
@@ -125,7 +132,7 @@ class WebSocketServerListener extends Abstracts\Server
{
$this->runEvent(Constant::MESSAGE, fn() => $server->push($frame->fd, '.'), [$server, $frame]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
@@ -138,7 +145,7 @@ class WebSocketServerListener extends Abstracts\Server
{
$this->runEvent(Constant::CLOSE, fn() => $server->confirm($fd), [$server, $fd]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
@@ -151,7 +158,7 @@ class WebSocketServerListener extends Abstracts\Server
{
$this->runEvent(Constant::DISCONNECT, fn() => $server->confirm($fd), [$server, $fd]);
$this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES);
$this->eventDispatch->dispatch(new OnAfterRequest());
}
}
+18 -12
View File
@@ -3,13 +3,17 @@
namespace Server\Worker;
use Annotation\Annotation;
use Annotation\Inject;
use Exception;
use ReflectionException;
use Server\ApplicationStore;
use Server\Constant;
use Server\Events\OnWorkerError;
use Server\Events\OnWorkerExit;
use Server\Events\OnWorkerStart;
use Server\Events\OnWorkerStop;
use Snowflake\Abstracts\Config;
use Snowflake\Core\Help;
use Snowflake\Event;
use Snowflake\Events\EventDispatch;
use Snowflake\Exception\ConfigException;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Runtime;
@@ -26,6 +30,13 @@ class ServerWorker extends \Server\Abstracts\Server
{
/**
* @var EventDispatch
*/
#[Inject(EventDispatch::class)]
public EventDispatch $eventDispatch;
/**
* @param Server $server
* @param int $workerId
@@ -34,18 +45,15 @@ class ServerWorker extends \Server\Abstracts\Server
public function onWorkerStart(Server $server, int $workerId)
{
$this->_setConfigCache($workerId);
$store = ApplicationStore::getStore()->instance();
$annotation = Snowflake::app()->getAnnotation();
$annotation->read(APP_PATH . 'app');
$this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId));
$this->runEvent(Constant::WORKER_START, null, [$server, $workerId]);
$this->workerInitExecutor($server, $annotation, $workerId);
$this->interpretDirectory($annotation);
$store->close();
}
@@ -128,9 +136,7 @@ class ServerWorker extends \Server\Abstracts\Server
{
$this->runEvent(Constant::WORKER_STOP, null, [$server, $workerId]);
Event::trigger(Event::SERVER_WORKER_STOP);
fire(Event::SYSTEM_RESOURCE_CLEAN);
$this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId));
Timer::clearAll();
}
@@ -147,7 +153,7 @@ class ServerWorker extends \Server\Abstracts\Server
putenv('state=exit');
Event::trigger(Event::SERVER_WORKER_EXIT, [$server, $workerId]);
$this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId));
Snowflake::getApp('logger')->insert();
}
@@ -165,7 +171,7 @@ class ServerWorker extends \Server\Abstracts\Server
{
$this->runEvent(Constant::WORKER_ERROR, null, [$server, $worker_id, $worker_pid, $exit_code, $signal]);
Event::trigger(Event::SERVER_WORKER_ERROR);
$this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal));
$message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s',
$worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9)
-12
View File
@@ -29,7 +29,6 @@ use Server\ServerManager;
use Snowflake\Aop;
use Snowflake\Async;
use Snowflake\Cache\Redis;
use Snowflake\Channel;
use Snowflake\Di\Service;
use Snowflake\Error\ErrorHandler;
use Snowflake\Error\Logger;
@@ -423,16 +422,6 @@ abstract class BaseApplication extends Service
}
/**
* @return Channel
* @throws Exception
*/
public function getChannel(): Channel
{
return $this->get('channel');
}
/**
* @return Pool
* @throws Exception
@@ -470,7 +459,6 @@ abstract class BaseApplication extends Service
'goto' => ['class' => BaseGoto::class],
'response' => ['class' => Response::class],
'request' => ['class' => Request::class],
'channel' => ['class' => Channel::class],
'rpc' => ['class' => \Rpc\Producer::class],
'rpc-service' => ['class' => \Rpc\Service::class],
'http2' => ['class' => Http2::class],
-2
View File
@@ -19,7 +19,6 @@ use Kafka\Producer;
use Rpc\Producer as RPCProducer;
use Snowflake\Async;
use Snowflake\Cache\Redis;
use Snowflake\Channel;
use Snowflake\Error\Logger;
use Snowflake\Event;
use Snowflake\Jwt\Jwt;
@@ -50,7 +49,6 @@ use Snowflake\Pool\Pool;
* @property \Snowflake\Crontab\Producer $crontab
* @property HttpFilter $filter
* @property RPCProducer $rpc
* @property Channel $channel
* @property Shutdown $shutdown
* @property Pool $clientsPool
*/
+1 -1
View File
@@ -95,7 +95,7 @@ class Application extends BaseApplication
if (!class_exists($service)) {
throw new NotFindClassException($service);
}
$class = Snowflake::createObject($service);
$class = Snowflake::getDi()->get($service);
if (method_exists($class, 'onImport')) {
$class->onImport($this);
}
+16 -18
View File
@@ -9,11 +9,14 @@ declare(strict_types=1);
namespace Snowflake\Cache;
use Annotation\Inject;
use Exception;
use Server\Events\OnWorkerExit;
use Server\Events\OnWorkerStop;
use Snowflake\Abstracts\Component;
use Snowflake\Abstracts\Config;
use Snowflake\Core\Json;
use Snowflake\Event;
use Snowflake\Events\EventProvider;
use Snowflake\Exception\ConfigException;
use Snowflake\Snowflake;
@@ -21,35 +24,32 @@ use Snowflake\Snowflake;
* Class Redis
* @package Snowflake\Snowflake\Cache
* @see \Redis
*
* @mixin \Redis
*/
class Redis extends Component
{
/**
* @throws Exception
* @var EventProvider
*/
public function init()
{
Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'destroy']);
Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'release']);
Event::on(Event::SERVER_WORKER_START, [$this, 'createPool']);
Event::on(Event::SERVER_TASK_START, [$this, 'createPool']);
}
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
/**
* @throws ConfigException
* @throws Exception
*/
public function createPool()
public function init()
{
$connections = Snowflake::app()->getRedisFromPool();
$config = $this->get_config();
$length = Config::get('connections.pool.max',10);
$length = Config::get('connections.pool.max', 10);
$this->eventProvider->on(OnWorkerStop::class, [$this, 'destroy'], 0);
$this->eventProvider->on(OnWorkerExit::class, [$this, 'destroy'], 0);
$connections->initConnections('Redis:' . $config['host'], true, $length);
}
@@ -68,12 +68,11 @@ class Redis extends Component
$data = $this->{$name}(...$arguments);
} else {
$data = $this->proxy()->{$name}(...$arguments);
$this->release();
}
if (microtime(true) - $time >= 0.02) {
$this->warning('Redis:' . Json::encode([$name, $arguments]) . (microtime(true) - $time));
}
return $data;
}
@@ -94,7 +93,7 @@ if (_nx ~= 0) then
end
return 0
SCRIPT;
return $this->proxy()->eval($script, ['{lock}:' . $key, $timeout], 1);
return $this->eval($script, ['{lock}:' . $key, $timeout], 1);
}
@@ -105,8 +104,7 @@ SCRIPT;
*/
public function unlock($key): int
{
$redis = $this->proxy();
return $redis->del('{lock}:' . $key);
return $this->del('{lock}:' . $key);
}
-108
View File
@@ -1,108 +0,0 @@
<?php
namespace Snowflake;
use Closure;
use Exception;
use Snowflake\Abstracts\Component;
use SplQueue;
/**
* Class Channel
* @package Snowflake
*/
class Channel extends Component
{
private static ?array $_channels = [];
private static ?array $_waitRecover = [];
public function init()
{
Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'recover']);
Event::on(Event::SERVER_WORKER_EXIT, [$this, 'cleanAll']);
}
/**
* 回收对象
*/
public function recover()
{
foreach (Channel::$_waitRecover as $key => $value) {
if (empty($value)) {
continue;
}
$channel = $this->channelInit($key);
if ($channel->count() >= 100) {
continue;
}
foreach ($value as $item) {
$channel->enqueue($item);
}
}
Channel::$_waitRecover = [];
}
/**
* @param mixed $value
* @param string $name
* @throws Exception
*/
public function push(mixed $value, string $name = ''): void
{
if (!isset(Channel::$_waitRecover[$name])) {
Channel::$_waitRecover[$name] = [];
}
Channel::$_waitRecover[$name][] = $value;
}
/**
* @param string $name
* @return bool|SplQueue
*/
private function channelInit(string $name = ''): bool|SplQueue
{
if (!isset(static::$_channels[$name]) || !(static::$_channels[$name] instanceof SplQueue)) {
static::$_channels[$name] = new SplQueue();
}
return static::$_channels[$name];
}
/**
*
* 清空缓存
*/
public function cleanAll()
{
static::$_channels = null;
static::$_channels = [];
}
/**
* @param string $name
* @param Closure $closure
* @return mixed
*/
public function pop(string $name, Closure $closure): mixed
{
$channel = $this->channelInit($name);
if ($channel->isEmpty()) {
return call_user_func($closure);
}
return $channel->dequeue();
}
}
+12 -4
View File
@@ -9,11 +9,14 @@ declare(strict_types=1);
namespace Snowflake\Error;
use Annotation\Inject;
use Exception;
use Server\Events\OnAfterRequest;
use Snowflake\Abstracts\Component;
use Snowflake\Abstracts\Config;
use Snowflake\Core\Json;
use Snowflake\Event;
use Snowflake\Events\EventProvider;
use Snowflake\Exception\ConfigException;
use Snowflake\Snowflake;
use Swoole\Coroutine;
@@ -29,15 +32,20 @@ class Logger extends Component
private array $logs = [];
/** @var EventProvider */
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
private array $sources = [];
/**
*
*/
public function init()
{
Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'insert']);
Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'closeSource']);
Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'closeSource']);
Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'insert']);
$this->eventProvider->on(OnAfterRequest::class, [$this, 'insert']);
}
+17 -50
View File
@@ -7,6 +7,7 @@ namespace Snowflake;
use Exception;
use Snowflake\Abstracts\BaseObject;
use Swoole\Coroutine;
/**
* Class Event
@@ -85,18 +86,16 @@ class Event extends BaseObject
* @param bool $isAppend
* @throws Exception
*/
public static function on($name, $callback, array $parameter = [], bool $isAppend = false)
public static function on($name, $callback, bool $isAppend = false)
{
if (!isset(static::$_events[$name])) {
static::$_events[$name] = [];
}
if ($callback instanceof \Closure) {
$callback = \Closure::bind($callback, Snowflake::app());
} else if (is_array($callback) && is_string($callback[0])) {
if (is_array($callback) && is_string($callback[0])) {
if (!class_exists($callback[0])) {
throw new Exception('Undefined callback class.');
}
$callback[0] = Snowflake::createObject($callback[0]);
$callback[0] = di($callback[0]);
}
if (static::exists($name, $callback)) {
return;
@@ -130,15 +129,10 @@ class Event extends BaseObject
/**
* @param $name
* @return bool
*/
public static function offName($name): bool
public static function offName($name): void
{
if (!static::exists($name)) {
return true;
}
unset(static::$_events[$name]);
return static::exists($name);
}
@@ -147,14 +141,11 @@ class Event extends BaseObject
* @param null $callback
* @return bool
*/
public static function exists($name, $callback = null): bool
public static function exists($name, $callback): bool
{
if (!isset(static::$_events[$name])) {
if ($callback instanceof \Closure || !isset(static::$_events[$name])) {
return false;
}
if ($callback === null) {
return true;
}
foreach (static::$_events[$name] as $event) {
[$handler, $parameter] = $event;
if ($handler === $callback) {
@@ -175,7 +166,6 @@ class Event extends BaseObject
if (!static::exists($name, $handler)) {
return null;
}
if (empty($handler)) {
return static::$_events[$name];
}
@@ -216,11 +206,14 @@ class Event extends BaseObject
*/
public static function trigger($name, $parameter = null, bool $is_remove = false): bool
{
foreach ((static::$_events[$name] ?? []) as $event) {
foreach ((static::$_events[$name] ?? []) as $key => $event) {
static::execute($event, $parameter);
if ($event instanceof \Closure) {
unset(static::$_events[$name][$key]);
}
}
if ($is_remove) {
static::offName($name);
unset(static::$_events[$name]);
}
return true;
}
@@ -229,45 +222,19 @@ class Event extends BaseObject
/**
* @param $event
* @param $parameter
* @return bool
* @return void
* @throws Exception
*/
private static function execute($event, $parameter): bool
private static function execute($event, $parameter): void
{
try {
$meta = static::mergeParams($event[1], $parameter);
if (call_user_func($event[0], ...$meta) === false) {
return false;
}
return true;
call_user_func($event[0], ...$parameter);
} catch (\Throwable $throwable) {
return logger()->addError($throwable, 'throwable');
logger()->addError($throwable, 'throwable');
return;
}
}
/**
* @param $defaultParameter
* @param array $parameter
* @return array
*/
private static function mergeParams($defaultParameter, array $parameter = []): array
{
if (empty($defaultParameter)) {
$defaultParameter = $parameter;
} else {
if (!is_array($parameter)) {
$parameter = [];
}
foreach ($parameter as $key => $value) {
$defaultParameter[] = $value;
}
}
if (!is_array($defaultParameter)) {
$defaultParameter = [$defaultParameter];
}
return $defaultParameter;
}
}
+20 -15
View File
@@ -2,31 +2,36 @@
namespace Snowflake\Events;
class EventDispatch
use Annotation\Inject;
use Snowflake\Abstracts\BaseObject;
/**
*
*/
class EventDispatch extends BaseObject
{
private EventListener $eventListener;
#[Inject(EventProvider::class)]
public EventProvider $eventProvider;
/**
* @param $event
* @param array $params
* @param object $triggerEvent
* @return object
*/
public function emit($event, array $params = [])
public function dispatch(object $triggerEvent): object
{
$events = $this->eventListener->getEventListeners($event);
if (empty($events)) {
return;
}
while ($events->valid()) {
/** @var EventDispatchInterface $interface */
$interface = $events->current();
$interface->onHandler();
if ($interface->stopPagination()) {
$lists = $this->eventProvider->getListenersForEvent($triggerEvent);
foreach ($lists as $listener) {
/** @var Struct $list */
$listener($triggerEvent);
if ($triggerEvent instanceof StoppableEventInterface && $triggerEvent->isPropagationStopped()) {
break;
}
$events->next();
}
return $triggerEvent;
}
}
-18
View File
@@ -1,18 +0,0 @@
<?php
namespace Snowflake\Events;
/**
*
*/
interface EventDispatchInterface
{
public function getZOrder(): int;
public function onHandler(): void;
public function stopPagination(): bool;
}
+16
View File
@@ -0,0 +1,16 @@
<?php
namespace Snowflake\Events;
/**
*
*/
interface EventInterface
{
public function process(): void;
}
-37
View File
@@ -1,37 +0,0 @@
<?php
namespace Snowflake\Events;
use SplPriorityQueue;
class EventListener
{
/** @var SplPriorityQueue[] */
private array $_events = [];
/**
* @param $event
* @param EventDispatchInterface $handler
*/
public function on($event, EventDispatchInterface $handler)
{
if (!isset($this->_events[$event])) {
$this->_events[$event] = new SplPriorityQueue();
}
$this->_events[$event]->insert($handler, $handler->getZOrder());
}
/**
* @param $event
* @return SplPriorityQueue|null
*/
public function getEventListeners($event): ?SplPriorityQueue
{
return $this->_events[$event] ?? null;
}
}
+45
View File
@@ -0,0 +1,45 @@
<?php
namespace Snowflake\Events;
/**
*
*/
class EventProvider implements EventProviders
{
/** @var Struct[] */
private array $_listeners = [];
/**
* @param object $event
* @return iterable
*/
public function getListenersForEvent(object $event): iterable
{
$queue = new \SplPriorityQueue();
// TODO: Implement getListenersForEvent() method.
foreach ($this->_listeners as $listener) {
if (!($event instanceof $listener->event)) {
continue;
}
$queue->insert($listener, $listener->priority);
}
return $queue;
}
/**
* @param string $event
* @param callable $handler
* @param int $zOrder
*/
public function on(string $event, callable $handler, int $zOrder = 1)
{
$this->_listeners[$event][] = new Struct($event, $handler, $zOrder);
}
}
+15 -1
View File
@@ -2,7 +2,21 @@
namespace Snowflake\Events;
class EventProviders
/**
*
*/
interface EventProviders
{
/**
* @param object $event
* An event for which to return the relevant listeners.
* @return iterable<callable>
* An iterable (array, iterator, or generator) of callables. Each
* callable MUST be type-compatible with $event.
*/
public function getListenersForEvent(object $event): iterable;
}
+24
View File
@@ -0,0 +1,24 @@
<?php
namespace Snowflake\Events;
/**
*
*/
interface StoppableEventInterface
{
/**
* Is propagation stopped?
*
* This will typically only be used by the Dispatcher to determine if the
* previous listener halted propagation.
*
* @return bool
* True if the Event is complete and no further listeners should be called.
* False to continue calling listeners.
*/
public function isPropagationStopped() : bool;
}
+25
View File
@@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
namespace Snowflake\Events;
/**
*
*/
class Struct
{
public string $event;
public \Closure $listener;
public int $priority;
public function __construct(string $event, callable $listener, int $priority)
{
$this->event = $event;
$this->listener = $listener;
$this->priority = $priority;
}
}
+1 -1
View File
@@ -182,7 +182,7 @@ class Connection extends Component
return;
}
if ($client->inTransaction()) {
$client->commit();
return;
}
$this->getPool()->push($coroutineName, $client);
Context::remove($coroutineName);
+2 -4
View File
@@ -538,14 +538,12 @@ if (!function_exists('event')) {
/**
* @param $name
* @param $callback
* @param array $params
* @param bool $isAppend
* @throws Exception
* @throws Exception
*/
function event($name, $callback, array $params = [], bool $isAppend = true)
function event($name, $callback, bool $isAppend = true)
{
Event::on($name, $callback, $params, $isAppend);
Event::on($name, $callback, $isAppend);
}
}