eventProvider = Kiri::getDi()->get(EventProvider::class);
    }


    /**
     * Starts the idle watchdog and registers stopHeartbeatCheck() as the
     * OnWorkerExit listener so the timer is cleared when that event fires.
     */
    public function init()
    {
        $this->heartbeat_check();
        $this->eventProvider->on(OnWorkerExit::class, [$this, 'stopHeartbeatCheck']);
    }


    /**
     * Installs a repeating Timer::tick(1000, ...) watchdog, unless one is already
     * registered, env('state') is 'exit', or we are not inside a coroutine.
     *
     * On every tick the watchdog:
     *  - cancels itself once env('state') becomes 'exit';
     *  - cancels itself and drops the Redis handle when more than 10 * 60 seconds
     *    have passed since $this->_last (presumably the last-use timestamp; it is
     *    maintained outside the code visible here -- TODO confirm).
     */
    public function heartbeat_check(): void
    {
        if (env('state') == 'exit') {
            return;
        }
        // -1 is the "no timer registered" sentinel; Timer::tick is only used inside a coroutine.
        if ($this->_timer !== -1 || !Context::inCoroutine()) {
            return;
        }

        $this->_timer = Timer::tick(1000, function () {
            try {
                if (env('state') == 'exit') {
                    echo 'timer end.' . PHP_EOL;
                    // Actually end the timer: previously the message was printed
                    // on every tick while the timer kept running.
                    $this->stopHeartbeatCheck();
                }
                if (time() - $this->_last > 10 * 60) {
                    // Idle for too long: stop watching and release the client so
                    // the next _pdo() call builds a fresh one.
                    $this->stopHeartbeatCheck();
                    $this->pdo = null;
                }
            } catch (\Throwable $throwable) {
                // Report through the project's error() helper instead of letting
                // the exception escape the timer callback.
                error($throwable);
            }
        });
    }


    /**
     * Clears the watchdog timer if one is registered and resets the timer id to
     * the -1 sentinel. Safe to call repeatedly.
     */
    public function stopHeartbeatCheck(): void
    {
        if ($this->_timer > -1) {
            Timer::clear($this->_timer);
        }
        $this->_timer = -1;
    }


    /**
     * Forwards calls to methods that are not accessible/defined on this object.
     *
     * If this class declares a method named $name (whatever its visibility) it is
     * invoked directly; otherwise the call is proxied to the \Redis client
     * returned by _pdo().
     *
     * @param string $name      method being called
     * @param array  $arguments positional arguments, passed through unchanged
     * @return mixed whatever the target method returns
     * @throws RedisConnectException when a client has to be created and that fails
     */
    public function __call(string $name, array $arguments)
    {
        if (method_exists($this, $name)) {
            return $this->{$name}(...$arguments);
        }
        return $this->_pdo()->{$name}(...$arguments);
    }


    /**
     * Returns the Redis client, creating it on first use (or after the idle
     * watchdog released it) and re-arming the watchdog when no timer is registered.
     *
     * @return \Redis
     * @throws RedisConnectException
     */
    public function _pdo(): \Redis
    {
        if ($this->_timer === -1) {
            $this->heartbeat_check();
        }
        if (!($this->pdo instanceof \Redis)) {
            $this->pdo = $this->newClient();
        }
        return $this->pdo;
    }


    /**
     * Opens a persistent connection and configures it: optional AUTH, database
     * selection, read timeout and key prefix.
     *
     * @return \Redis the connected and configured client
     * @throws RedisConnectException when the connection or the authentication fails
     */
    private function newClient(): \Redis
    {
        $redis = new \Redis();
        if (!$redis->pconnect($this->host, $this->port, $this->timeout)) {
            throw new RedisConnectException(sprintf('The Redis Connect %s::%d Fail.', $this->host, $this->port));
        }

        // Authenticate with the configured credential. The previous code tested an
        // undefined local $config['auth'], so AUTH was never sent to the server.
        if (!empty($this->auth) && !$redis->auth($this->auth)) {
            // NOTE(review): this message embeds the credential itself; consider
            // masking it before the exception reaches logs.
            throw new RedisConnectException(sprintf('Redis Error: %s, Host %s, Auth %s', $redis->getLastError(), $this->host, $this->auth));
        }

        // Negative read timeouts are normalised to 0 before being applied.
        if ($this->read_timeout < 0) {
            $this->read_timeout = 0;
        }

        $redis->select($this->database);
        $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout);
        $redis->setOption(\Redis::OPT_PREFIX, $this->prefix);

        return $redis;
    }
}