enable_queue;
    }

    /**
     * Returns the process name, generating one on first use when none is set.
     *
     * A generated name comes from uniqid('p.') and is stored, so later calls
     * return the same value. Only an unset or empty-string name triggers
     * generation; the literal name "0" is kept (empty() would have discarded it).
     *
     * @return string
     */
    public function getName(): string
    {
        if (!isset($this->name) || $this->name === '') {
            $this->name = uniqid('p.');
        }

        return $this->name;
    }

    /**
     * Tells whether the stop flag has been raised.
     *
     * @return bool
     */
    public function isStop(): bool
    {
        return $this->stop;
    }

    /**
     * Returns the configured redirect_stdin_and_stdout flag.
     *
     * @return bool
     */
    public function getRedirectStdinAndStdout(): bool
    {
        return $this->redirect_stdin_and_stdout;
    }

    /**
     * Returns the configured pipe type.
     *
     * @return int
     */
    public function getPipeType(): int
    {
        return $this->pipe_type;
    }

    /**
     * Tells whether coroutine mode is enabled for this process.
     *
     * @return bool
     */
    public function isEnableCoroutine(): bool
    {
        return $this->enable_coroutine;
    }

    /**
     * Raises the stop flag; isStop() returns true afterwards.
     *
     * @return void
     */
    public function stop(): void
    {
        $this->stop = true;
    }

    /**
     * Hook invoked by pointWaitSignal() and coroutineWaitSignal() once a
     * signal has been handled (or the coroutine signal wait has returned).
     *
     * @return void
     */
    abstract public function onSigterm(): void;

    /**
     * Stores the process handle and installs signal handling for it.
     *
     * In coroutine mode the coroutine runtime is configured and a coroutine
     * running coroutineWaitSignal() is created. Otherwise pointWaitSignal() is
     * registered through pcntl_signal() for SIGTERM, SIGINT, SIGUSR1 and SIGHUP.
     *
     * NOTE(review): pcntl handlers only run when pending signals are dispatched
     * (pcntl_async_signals(true) or pcntl_signal_dispatch()); that setup is not
     * visible in this part of the file - confirm it happens elsewhere.
     *
     * @param Process $process
     * @return static
     */
    public function onShutdown(Process $process): static
    {
        $this->process = $process;

        if ($this->enable_coroutine) {
            // Build the option set explicitly instead of relying on implicit
            // array creation from an undefined variable.
            $options = [
                'enable_deadlock_check'        => false,
                'deadlock_check_disable_trace' => false,
                'exit_condition'               => [$this, 'exit_condition'],
            ];
            Coroutine::set($options);
            Coroutine::create(fn() => $this->coroutineWaitSignal());

            return $this;
        }

        $handler = [$this, 'pointWaitSignal'];
        pcntl_signal(\SIGTERM, $handler);
        pcntl_signal(\SIGINT, $handler);
        pcntl_signal(\SIGUSR1, $handler);
        pcntl_signal(\SIGHUP, $handler);

        return $this;
    }

    /**
     * Callback passed to Coroutine::set() as 'exit_condition'.
     *
     * @return bool true when Coroutine::stats() reports zero coroutines
     */
    public function exit_condition(): bool
    {
        return Coroutine::stats()['coroutine_num'] === 0;
    }

    /**
     * pcntl signal handler used when coroutine mode is disabled.
     *
     * Raises the stop flag and calls onSigterm() for every signal it receives.
     * For SIGTERM, SIGUSR1 and SIGINT it then restores the default disposition
     * and re-sends the same signal to the current process. SIGHUP and any other
     * signal take no further action.
     *
     * @param int $signal signal number handed over by pcntl
     * @return void
     */
    public function pointWaitSignal($signal): void
    {
        $this->stop = true;
        $this->onSigterm();

        switch ($signal) {
            case \SIGTERM:
            case \SIGUSR1:
            case \SIGINT:
                // Clean-up before stopping the consumer (e.g. deleting locks)
                // has already run in onSigterm() above.
                pcntl_signal($signal, \SIG_DFL); // restore the default handler
                // Kill self with the same signal so the parent observes a
                // signal exit, see https://www.cons.org/cracauer/sigint.html
                posix_kill(posix_getpid(), $signal);
                break;
            case \SIGHUP:
                // Placeholder: restart handling for the consumer would go here.
                break;
            default:
                // do nothing
        }
    }

    /**
     * Coroutine-mode counterpart of pointWaitSignal(): waits for SIGTERM.
     *
     * The stop flag is raised only when Coroutine::waitSignal() returns a
     * truthy value; onSigterm() is called afterwards in either case.
     *
     * NOTE(review): onSigterm() also runs when the wait returns false -
     * confirm this is intended.
     *
     * @return void
     */
    public function coroutineWaitSignal(): void
    {
        $received = Coroutine::waitSignal(\SIGTERM);
        if ($received) {
            $this->stop = true;
        }

        $this->onSigterm();
    }
}