diff --git a/HttpServer/Events/OnWorkerStart.php b/HttpServer/Events/OnWorkerStart.php index 3af1cac0..6ec4b83d 100644 --- a/HttpServer/Events/OnWorkerStart.php +++ b/HttpServer/Events/OnWorkerStart.php @@ -27,6 +27,8 @@ class OnWorkerStart extends Callback private int $_taskTable = 0; + private int $signal = SIGTERM | SIGKILL | SIGUSR2 | SIGUSR1; + /** * @param Server $server @@ -38,12 +40,7 @@ class OnWorkerStart extends Callback */ public function onHandler(Server $server, int $worker_id): void { - Coroutine::set([ - 'enable_deadlock_check' => false, - // 'exit_condition' => function () { - // return Coroutine::stats()['coroutine_num'] === 0; - // } - ]); + Coroutine::set(['enable_deadlock_check' => false]); putenv('workerId=' . $worker_id); $get_name = $this->get_process_name($server, $worker_id); @@ -52,11 +49,13 @@ class OnWorkerStart extends Callback } $this->onSignal($server, $worker_id); - - $this->debug(sprintf(workerName($worker_id) . ' #%d is start.....', $worker_id)); if ($worker_id >= $server->setting['worker_num']) { fire(Event::SERVER_TASK_START); + + putenv('environmental=' . Snowflake::TASK); } else { + putenv('environmental=' . Snowflake::WORKER); + Snowflake::setWorkerId($server->worker_pid); $this->setWorkerAction($worker_id); } @@ -68,22 +67,17 @@ class OnWorkerStart extends Callback */ public function onSignal($server, $worker_id) { - Coroutine\go(function (Server $server, $worker_id) { - $sigkill = Coroutine::waitSignal(SIGTERM | SIGKILL | SIGUSR2 | SIGUSR1); - - Timer::clearAll(); + $this->debug(sprintf(workerName($worker_id) . ' #%d is start.....', $worker_id)); + Coroutine\go(function (Server $server) { + $sigkill = Coroutine::waitSignal($this->signal); if ($sigkill === false) { - do { - $number = Co::stats()['coroutine_num']; - var_dump($number); - if ($number === 0) { - break; - } - Coroutine::sleep(0.01); - } while (true); + return $server->stop(); + } + while (Co::stats()['coroutine_num'] > 0) { + Coroutine::sleep(0.01); } return $server->stop(); - }, $server, $worker_id); + }, $server); } diff --git a/HttpServer/Server.php b/HttpServer/Server.php index dec611cc..d4832335 100644 --- a/HttpServer/Server.php +++ b/HttpServer/Server.php @@ -43,479 +43,475 @@ use Swoole\Runtime; */ class Server extends HttpService { - use Action; + use Action; - const HTTP = 'HTTP'; - const TCP = 'TCP'; - const PACKAGE = 'PACKAGE'; - const WEBSOCKET = 'WEBSOCKET'; + const HTTP = 'HTTP'; + const TCP = 'TCP'; + const PACKAGE = 'PACKAGE'; + const WEBSOCKET = 'WEBSOCKET'; - private array $listening = []; - private array $server = [ - 'HTTP' => [SWOOLE_TCP, Http::class], - 'TCP' => [SWOOLE_TCP, Receive::class], - 'PACKAGE' => [SWOOLE_UDP, Packet::class], - 'WEBSOCKET' => [SWOOLE_SOCK_TCP, Websocket::class], - ]; + private array $listening = []; + private array $server = [ + 'HTTP' => [SWOOLE_TCP, Http::class], + 'TCP' => [SWOOLE_TCP, Receive::class], + 'PACKAGE' => [SWOOLE_UDP, Packet::class], + 'WEBSOCKET' => [SWOOLE_SOCK_TCP, Websocket::class], + ]; - private Packet|Websocket|Receive|null|Http $baseServer = null; + private Packet|Websocket|Receive|null|Http $baseServer = null; - public int $daemon = 0; + public int $daemon = 0; - private array $listenTypes = []; + private array $listenTypes = []; - private array $process = [ - 'biomonitoring' => Biomonitoring::class - ]; + private array $process = [ + 'biomonitoring' => Biomonitoring::class + ]; - private array $params = []; + private array $params = []; - /** - * @param $name - * @param $process - * @param array $params - */ - public function addProcess($name, $process, $params = []) - { - $this->process[$name] = $process; - $this->params[$name] = $params; - } + /** + * @param $name + * @param $process + * @param array $params + */ + public function addProcess($name, $process, $params = []) + { + $this->process[$name] = $process; + $this->params[$name] = $params; + } - /** - * @return array - */ - public function getProcesses(): array - { - return $this->process ?? []; - } + /** + * @return array + */ + public function getProcesses(): array + { + return $this->process ?? []; + } - /** - * @param array $configs - * @return Packet|Websocket|Receive|Http|null - * @throws ConfigException - * @throws Exception - */ - public function initCore(array $configs): Packet|Websocket|Receive|Http|null - { - $this->enableCoroutine((bool)Config::get('settings.enable_coroutine')); + /** + * @param array $configs + * @return Packet|Websocket|Receive|Http|null + * @throws ConfigException + * @throws Exception + */ + public function initCore(array $configs): Packet|Websocket|Receive|Http|null + { + $this->enableCoroutine((bool)Config::get('settings.enable_coroutine')); - $this->orders($configs); - $this->onProcessListener(); - return $this->getServer(); - } + $this->orders($configs); + $this->onProcessListener(); + return $this->getServer(); + } - /** - * @param $configs - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function orders($configs): Packet|Websocket|Receive|Http|null - { - $servers = $this->sortServers($configs); - foreach ($servers as $server) { - $this->create($server); - if (!$this->baseServer) { - return null; - } - } - return $this->baseServer; - } + /** + * @param $configs + * @return Packet|Websocket|Receive|Http|null + * @throws Exception + */ + private function orders($configs): Packet|Websocket|Receive|Http|null + { + $servers = $this->sortServers($configs); + foreach ($servers as $server) { + $this->create($server); + if (!$this->baseServer) { + return null; + } + } + return $this->baseServer; + } - /** - * @return string start server - * - * start server - * @throws ConfigException - * @throws Exception - */ - public function start(): string - { - $configs = Config::get('servers', true); + /** + * @return string start server + * + * start server + * @throws ConfigException + * @throws Exception + */ + public function start(): string + { + $configs = Config::get('servers', true); - Snowflake::clearWorkerId(); + Snowflake::clearWorkerId(); - $baseServer = $this->initCore($configs); - if (!$baseServer) { - return 'ok'; - } - return $baseServer->start(); - } + $baseServer = $this->initCore($configs); + if (!$baseServer) { + return 'ok'; + } + return $baseServer->start(); + } - /** - * @param $host - * @param $Port - * @return Packet|Websocket|Receive|Http|null - */ - public function error_stop($host, $Port): Packet|Websocket|Receive|Http|null - { - $this->error(sprintf('Port %s::%d is already.', $host, $Port)); - if ($this->baseServer) { - $this->baseServer->shutdown(); - } - return $this->baseServer; - } + /** + * @param $host + * @param $Port + * @return Packet|Websocket|Receive|Http|null + */ + public function error_stop($host, $Port): Packet|Websocket|Receive|Http|null + { + $this->error(sprintf('Port %s::%d is already.', $host, $Port)); + if ($this->baseServer) { + $this->baseServer->shutdown(); + } + return $this->baseServer; + } - /** - * @return bool - * @throws ConfigException - */ - public function isRunner(): bool - { - $port = $this->sortServers(Config::get('servers')); - if (empty($port)) { - return false; - } - if (Snowflake::isLinux()) { - exec('netstat -tunlp | grep ' . $port[0]['port'], $output); - } else { - exec('lsof -i :' . $port[0]['port'] . ' | grep -i "LISTEN"', $output); - } - return !empty($output); - } + /** + * @return bool + * @throws ConfigException + */ + public function isRunner(): bool + { + $port = $this->sortServers(Config::get('servers')); + if (empty($port)) { + return false; + } + if (Snowflake::isLinux()) { + exec('netstat -tunlp | grep ' . $port[0]['port'], $output); + } else { + exec('lsof -i :' . $port[0]['port'] . ' | grep -i "LISTEN"', $output); + } + return !empty($output); + } - /** - * @return void - * - * start server - * @throws Exception - */ - public function shutdown() - { - $this->stop($this); - } + /** + * @return void + * + * start server + * @throws Exception + */ + public function shutdown() + { + $this->stop($this); + } - /** - * @param bool $isEnable - */ - private function enableCoroutine($isEnable = true) - { - if (!$isEnable) { - return; - } - Runtime::enableCoroutine(true, SWOOLE_HOOK_TCP | - SWOOLE_HOOK_UNIX | - SWOOLE_HOOK_UDP | - SWOOLE_HOOK_UDG | - SWOOLE_HOOK_SSL | - SWOOLE_HOOK_TLS | - SWOOLE_HOOK_SLEEP | - SWOOLE_HOOK_STREAM_FUNCTION | - SWOOLE_HOOK_PROC - ); - } + /** + * @param bool $isEnable + */ + private function enableCoroutine($isEnable = true) + { + if (!$isEnable) { + return; + } + Runtime::enableCoroutine(true, SWOOLE_HOOK_TCP | + SWOOLE_HOOK_UNIX | + SWOOLE_HOOK_UDP | + SWOOLE_HOOK_UDG | + SWOOLE_HOOK_SSL | + SWOOLE_HOOK_TLS | + SWOOLE_HOOK_SLEEP | + SWOOLE_HOOK_STREAM_FUNCTION | + SWOOLE_HOOK_PROC + ); + } - /** - * @throws ConfigException - * @throws Exception - */ - public function onProcessListener(): void - { - if (!($this->baseServer instanceof \Swoole\Server)) { - return; - } + /** + * @throws ConfigException + * @throws Exception + */ + public function onProcessListener(): void + { + if (!($this->baseServer instanceof \Swoole\Server)) { + return; + } - $processes = Config::get('processes'); - if (!empty($processes) && is_array($processes)) { - $this->deliveryProcess(merge($processes, $this->process)); - } else { - $this->deliveryProcess($this->process); - } - } + $processes = Config::get('processes'); + if (!empty($processes) && is_array($processes)) { + $this->deliveryProcess(merge($processes, $this->process)); + } else { + $this->deliveryProcess($this->process); + } + } - /** - * @param $processes - * @throws Exception - */ - private function deliveryProcess($processes) - { - $application = Snowflake::app(); - if (empty($processes) || !is_array($processes)) { - return; - } - foreach ($processes as $name => $process) { - $is_enable_coroutine = true; - if (is_array($process)) { - [$process, $is_enable_coroutine] = $process; - } - $this->debug(sprintf('Process %s', $name . '::' . $process)); - if (!is_string($process)) { - continue; - } - $system = new $process(Snowflake::app(), $name, $is_enable_coroutine); - if (isset($this->params[$name])) { - $system->write(Json::encode($this->params[$name])); - } - $this->baseServer->addProcess($system); - $application->set($process, $system); - } - } + /** + * @param $processes + * @throws Exception + */ + private function deliveryProcess($processes) + { + $application = Snowflake::app(); + if (empty($processes) || !is_array($processes)) { + return; + } + foreach ($processes as $name => $process) { + $this->debug(sprintf('Process %s', $process)); + if (!is_string($process)) { + continue; + } + $system = new $process(Snowflake::app(), $name, true); + if (isset($this->params[$name])) { + $system->write(Json::encode($this->params[$name])); + } + $this->baseServer->addProcess($system); + $application->set($process, $system); + } + } - /** - * @param $daemon - * @return Server - */ - public function setDaemon($daemon): static - { - if (!in_array($daemon, [0, 1])) { - return $this; - } - $this->daemon = $daemon; - return $this; - } + /** + * @param $daemon + * @return Server + */ + public function setDaemon($daemon): static + { + if (!in_array($daemon, [0, 1])) { + return $this; + } + $this->daemon = $daemon; + return $this; + } - /** - * @return Packet|Websocket|Receive|Http|null - */ - public function getServer(): Packet|Websocket|Receive|Http|null - { - return $this->baseServer; - } + /** + * @return Packet|Websocket|Receive|Http|null + */ + public function getServer(): Packet|Websocket|Receive|Http|null + { + return $this->baseServer; + } - /** - * @param $config - * @return mixed - * @throws Exception - */ - private function create($config): mixed - { - $settings = Config::get('settings', false, []); - if (!isset($this->server[$config['type']])) { - throw new Exception('Unknown server type(' . $config['type'] . ').'); - } - $server = $this->dispatchCreate($config, $settings); - if (isset($config['events'])) { - $this->createEventListen($config); - } - return $server; - } + /** + * @param $config + * @return mixed + * @throws Exception + */ + private function create($config): mixed + { + $settings = Config::get('settings', false, []); + if (!isset($this->server[$config['type']])) { + throw new Exception('Unknown server type(' . $config['type'] . ').'); + } + $server = $this->dispatchCreate($config, $settings); + if (isset($config['events'])) { + $this->createEventListen($config); + } + return $server; + } - /** - * @param $config - * @throws Exception - */ - protected function createEventListen($config) - { - if (!is_array($config['events'])) { - return; - } - $event = Snowflake::app()->getEvent(); - foreach ($config['events'] as $name => $_event) { - $event->on($name, $_event); - } - } + /** + * @param $config + * @throws Exception + */ + protected function createEventListen($config) + { + if (!is_array($config['events'])) { + return; + } + $event = Snowflake::app()->getEvent(); + foreach ($config['events'] as $name => $_event) { + $event->on($name, $_event); + } + } - /** - * @param $config - * @param $settings - * @return \Swoole\Server|Packet|Receive|Http|Websocket|null - * @throws NotFindClassException - * @throws ReflectionException - * @throws Exception - */ - private function dispatchCreate($config, $settings): \Swoole\Server|Packet|Receive|Http|Websocket|null - { - if (!($this->baseServer instanceof \Swoole\Server)) { - $this->parseServer($config, $settings); - } else { - if ($this->isUse($config['port'])) { - return $this->error_stop($config['host'], $config['port']); - } - $newListener = $this->baseServer->addlistener($config['host'], $config['port'], $config['mode']); - if (isset($config['settings']) && is_array($config['settings'])) { - $newListener->set($config['settings']); - } - $this->onListenerBind($config, $this->baseServer); - } - return $this->baseServer; - } + /** + * @param $config + * @param $settings + * @return \Swoole\Server|Packet|Receive|Http|Websocket|null + * @throws NotFindClassException + * @throws ReflectionException + * @throws Exception + */ + private function dispatchCreate($config, $settings): \Swoole\Server|Packet|Receive|Http|Websocket|null + { + if (!($this->baseServer instanceof \Swoole\Server)) { + $this->parseServer($config, $settings); + } else { + if ($this->isUse($config['port'])) { + return $this->error_stop($config['host'], $config['port']); + } + $newListener = $this->baseServer->addlistener($config['host'], $config['port'], $config['mode']); + if (isset($config['settings']) && is_array($config['settings'])) { + $newListener->set($config['settings']); + } + $this->onListenerBind($config, $this->baseServer); + } + return $this->baseServer; + } - /** - * @param $config - * @param $settings - * @return Packet|Websocket|Receive|Http|null - * @throws Exception - */ - private function parseServer($config, $settings): Packet|Websocket|Receive|Http|null - { - $class = $this->dispatch($config['type']); - if ($this->isUse($config['port'])) { - return $this->error_stop($config['host'], $config['port']); - } - if (isset($config['settings']) && !empty($config['settings'])) { - $settings = array_merge($settings, $config['settings']); - } - $this->baseServer = new $class($config['host'], $config['port'], SWOOLE_PROCESS, $config['mode']); - $settings['daemonize'] = $this->daemon; - if (!isset($settings['pid_file'])) { - $settings['pid_file'] = APP_PATH . 'storage/server.pid'; - } - if ($this->baseServer instanceof Websocket) { - $this->onLoadWebsocketHandler(); - } else if ($this->baseServer instanceof Http) { - $this->onLoadHttpHandler(); - } - $this->baseServer->set($settings); + /** + * @param $config + * @param $settings + * @return Packet|Websocket|Receive|Http|null + * @throws Exception + */ + private function parseServer($config, $settings): Packet|Websocket|Receive|Http|null + { + $class = $this->dispatch($config['type']); + if ($this->isUse($config['port'])) { + return $this->error_stop($config['host'], $config['port']); + } + if (isset($config['settings']) && !empty($config['settings'])) { + $settings = array_merge($settings, $config['settings']); + } + $this->baseServer = new $class($config['host'], $config['port'], SWOOLE_PROCESS, $config['mode']); + $settings['daemonize'] = $this->daemon; + if (!isset($settings['pid_file'])) { + $settings['pid_file'] = APP_PATH . 'storage/server.pid'; + } + if ($this->baseServer instanceof Websocket) { + $this->onLoadWebsocketHandler(); + } else if ($this->baseServer instanceof Http) { + $this->onLoadHttpHandler(); + } + $this->baseServer->set($settings); - return $this->baseServer; - } + return $this->baseServer; + } - /** - * @param $config - * @param $newListener - * @return void - * @throws ReflectionException - * @throws Exception - * @throws NotFindClassException - */ - private function onListenerBind($config, $newListener) - { - $this->debug(sprintf('Listener %s::%d -> %s', $config['host'], $config['port'], $config['mode'])); - if ($config['type'] == self::WEBSOCKET) { - throw new Exception('Base server must instanceof \Swoole\Websocket\Server::class.'); - } else if (!in_array($config['type'], [self::HTTP, self::TCP, self::PACKAGE])) { - throw new Exception('Unknown server type(' . $config['type'] . ').'); - } - if (in_array($config['type'], $this->listenTypes)) { - return; - } - if ($config['type'] == self::HTTP) { - if (in_array($config['type'], $this->listenTypes)) { - throw new Exception('Base server must instanceof \Swoole\Websocket\Server::class.'); - } - $this->onBind($newListener, 'request', [Snowflake::createObject(OnRequest::class), 'onHandler']); - } else { - $this->noHttp($newListener, $config); - } - array_push($this->listenTypes, $config['type']); - } + /** + * @param $config + * @param $newListener + * @return void + * @throws ReflectionException + * @throws Exception + * @throws NotFindClassException + */ + private function onListenerBind($config, $newListener) + { + $this->debug(sprintf('Listener %s::%d -> %s', $config['host'], $config['port'], $config['mode'])); + if ($config['type'] == self::WEBSOCKET) { + throw new Exception('Base server must instanceof \Swoole\Websocket\Server::class.'); + } else if (!in_array($config['type'], [self::HTTP, self::TCP, self::PACKAGE])) { + throw new Exception('Unknown server type(' . $config['type'] . ').'); + } + if (in_array($config['type'], $this->listenTypes)) { + return; + } + if ($config['type'] == self::HTTP) { + if (in_array($config['type'], $this->listenTypes)) { + throw new Exception('Base server must instanceof \Swoole\Websocket\Server::class.'); + } + $this->onBind($newListener, 'request', [Snowflake::createObject(OnRequest::class), 'onHandler']); + } else { + $this->noHttp($newListener, $config); + } + array_push($this->listenTypes, $config['type']); + } - /** - * @param $newListener - * @param $config - * @throws NotFindClassException - * @throws ReflectionException - * @throws Exception - */ - private function noHttp($newListener, $config) - { - $this->onBind($newListener, 'connect', [Snowflake::createObject(OnConnect::class), 'onHandler']); - $this->onBind($newListener, 'close', [Snowflake::createObject(OnClose::class), 'onHandler']); - if ($config['type'] == self::TCP) { - $this->onBind($newListener, 'receive', [$class = new OnReceive(), 'onHandler']); - } else { - $this->onBind($newListener, 'packet', [$class = new OnPacket(), 'onHandler']); - } - $class->pack = $config['resolve']['pack'] ?? null; - $class->unpack = $config['resolve']['unpack'] ?? null; - } + /** + * @param $newListener + * @param $config + * @throws NotFindClassException + * @throws ReflectionException + * @throws Exception + */ + private function noHttp($newListener, $config) + { + $this->onBind($newListener, 'connect', [Snowflake::createObject(OnConnect::class), 'onHandler']); + $this->onBind($newListener, 'close', [Snowflake::createObject(OnClose::class), 'onHandler']); + if ($config['type'] == self::TCP) { + $this->onBind($newListener, 'receive', [$class = new OnReceive(), 'onHandler']); + } else { + $this->onBind($newListener, 'packet', [$class = new OnPacket(), 'onHandler']); + } + $class->pack = $config['resolve']['pack'] ?? null; + $class->unpack = $config['resolve']['unpack'] ?? null; + } - /** - * @param $server - * @param $name - * @param $callback - * @throws Exception - */ - private function onBind($server, $name, $callback) - { - if (in_array($name, $this->listening)) { - return; - } - if ($name === 'request') { - $this->onLoadHttpHandler(); - } - array_push($this->listening, $name); - $server->on($name, $callback); - } + /** + * @param $server + * @param $name + * @param $callback + * @throws Exception + */ + private function onBind($server, $name, $callback) + { + if (in_array($name, $this->listening)) { + return; + } + if ($name === 'request') { + $this->onLoadHttpHandler(); + } + array_push($this->listening, $name); + $server->on($name, $callback); + } - /** - * Load router handler - * @throws Exception - */ - public function onLoadHttpHandler() - { - $event = Snowflake::app()->getEvent(); - $event->on(Event::SERVER_WORKER_START, function () { - $router = Snowflake::app()->getRouter(); - $router->loadRouterSetting(); + /** + * Load router handler + * @throws Exception + */ + public function onLoadHttpHandler() + { + $event = Snowflake::app()->getEvent(); + $event->on(Event::SERVER_WORKER_START, function () { + $router = Snowflake::app()->getRouter(); + $router->loadRouterSetting(); - $attributes = Snowflake::app()->getAttributes(); - $attributes->read(CONTROLLER_PATH, 'App\Http\Controllers', 'controllers'); - }); - } + $attributes = Snowflake::app()->getAttributes(); + $attributes->read(CONTROLLER_PATH, 'App\Http\Controllers', 'controllers'); + }); + } - /** - * @throws Exception - */ - public function onLoadWebsocketHandler() - { - $event = Snowflake::app()->getEvent(); - $event->on(Event::SERVER_WORKER_START, function () { - $attributes = Snowflake::app()->getAttributes(); - $attributes->read(SOCKET_PATH, 'App\Websocket', 'sockets'); - }); - } + /** + * @throws Exception + */ + public function onLoadWebsocketHandler() + { + $event = Snowflake::app()->getEvent(); + $event->on(Event::SERVER_WORKER_START, function () { + $attributes = Snowflake::app()->getAttributes(); + $attributes->read(SOCKET_PATH, 'App\Websocket', 'sockets'); + }); + } - /** - * @param $type - * @return string - */ - private function dispatch($type): string - { - $default = [ - self::HTTP => Http::class, - self::WEBSOCKET => Websocket::class, - self::TCP => Receive::class, - self::PACKAGE => Packet::class - ]; - return $default[$type] ?? Receive::class; - } + /** + * @param $type + * @return string + */ + private function dispatch($type): string + { + $default = [ + self::HTTP => Http::class, + self::WEBSOCKET => Websocket::class, + self::TCP => Receive::class, + self::PACKAGE => Packet::class + ]; + return $default[$type] ?? Receive::class; + } - /** - * @param $servers - * @return array - */ - private function sortServers($servers): array - { - $array = []; - foreach ($servers as $server) { - switch ($server['type']) { - case self::WEBSOCKET: - array_unshift($array, $server); - break; - case self::HTTP: - case self::PACKAGE | self::TCP: - $array[] = $server; - break; - default: - $array[] = $server; - } - } - return $array; - } + /** + * @param $servers + * @return array + */ + private function sortServers($servers): array + { + $array = []; + foreach ($servers as $server) { + switch ($server['type']) { + case self::WEBSOCKET: + array_unshift($array, $server); + break; + case self::HTTP: + case self::PACKAGE | self::TCP: + $array[] = $server; + break; + default: + $array[] = $server; + } + } + return $array; + } } diff --git a/System/Process/Process.php b/System/Process/Process.php index dfd1a1aa..0a744bb9 100644 --- a/System/Process/Process.php +++ b/System/Process/Process.php @@ -17,31 +17,47 @@ use Snowflake\Snowflake; abstract class Process extends \Swoole\Process implements SProcess { - /** @var Application $application */ - protected Application $application; + /** @var Application $application */ + protected Application $application; - /** - * Process constructor. - * @param $application - * @param $name - * @param bool $enable_coroutine - * @throws Exception - */ - public function __construct($application, $name, $enable_coroutine = true) - { - $class = get_called_class(); - parent::__construct(function ($process) use ($name, $class) { - fire(Event::SERVER_WORKER_START); - putenv('workerId=Process.0'); - if (Snowflake::isLinux()) { - $prefix = ucfirst(rtrim(Snowflake::app()->id, ':')); - $this->name($prefix . ': ' . $class); - } - $this->onHandler($process); - }, false, 1, $enable_coroutine); - $this->application = $application; - Snowflake::setWorkerId($this->pid); - } + /** + * Process constructor. + * @param $application + * @param $name + * @param bool $enable_coroutine + * @throws Exception + */ + public function __construct($application, $name, $enable_coroutine = true) + { + parent::__construct([$this, '_load'], true, 1, $enable_coroutine); + $this->application = $application; + Snowflake::setWorkerId($this->pid); + } + + /** + * @param Process $process + * @throws \Snowflake\Exception\ComponentException + */ + private function _load(Process $process) + { + putenv('environmental=' . Snowflake::PROCESS); + + fire(Event::SERVER_WORKER_START); + if (Snowflake::isLinux()) { + $this->name($this->getPrefix()); + } + $this->onHandler($process); + } + + + /** + * @return string + */ + private function getPrefix(): string + { + return ucfirst(rtrim(Snowflake::app()->id, ':') . ': ' . get_called_class()); + } + } diff --git a/System/Snowflake.php b/System/Snowflake.php index ecc40b30..7b9ea044 100644 --- a/System/Snowflake.php +++ b/System/Snowflake.php @@ -400,6 +400,20 @@ class Snowflake private static array $_autoload = []; + const PROCESS = 'process'; + const TASK = 'task'; + const WORKER = 'worker'; + + + /** + * @return string|null + */ + public static function getEnvironmental(): ?string + { + return env('environmental'); + } + + /** * @param $class * @param $file