diff --git a/HttpServer/Server.php b/HttpServer/Server.php index adfaf7c7..7e0ad18c 100644 --- a/HttpServer/Server.php +++ b/HttpServer/Server.php @@ -215,8 +215,9 @@ class Server extends Application if (!is_string($process)) { continue; } - $system = $application->set($process, new $process(Snowflake::app(), $name)); + $system = new $process(Snowflake::app(), $name); $this->baseServer->addProcess($system); + $application->set($process, $system); } } diff --git a/Queue/Queue.php b/Queue/Queue.php index 6a5296e6..1de549ff 100644 --- a/Queue/Queue.php +++ b/Queue/Queue.php @@ -56,16 +56,15 @@ class Queue extends \Snowflake\Process\Process /** * @param Process $process - * @throws ComponentException */ public function onHandler(Process $process) { - $redis = Snowflake::app()->getRedis(); - Timer::tick(50, function ($timerId) use ($redis) { + Timer::tick(50, function ($timerId) { + $redis = Snowflake::app()->getRedis(); if ($this->shutdown) { return Timer::clear($timerId); } - $data = $redis->zRevRangeByScore(Waiting::QUEUE_WAITING, 0, 20); + $data = $redis->zRevRange(Waiting::QUEUE_WAITING, 0, 20); if (empty($data)) { return 1; } @@ -97,11 +96,9 @@ class Queue extends \Snowflake\Process\Process */ private function scheduler($data) { - $scheduler = new Coroutine\Scheduler(); foreach ($data as $datum) { - $scheduler->add([$this, 'runner'], $datum); + $this->runner($datum); } - $scheduler->start(); if ($this->shutdown === true) { Snowflake::shutdown($this); } @@ -118,8 +115,8 @@ class Queue extends \Snowflake\Process\Process $logger = $this->application->getLogger(); try { $rely_on = unserialize($class); - $this->waiting->remove($class); - if ($rely_on instanceof Consumer) { + $this->waiting->del($rely_on); + if (!($rely_on instanceof Consumer)) { return; } $this->running->add($rely_on); @@ -127,7 +124,7 @@ class Queue extends \Snowflake\Process\Process } catch (\Throwable $exception) { $logger->write($exception->getMessage(), 'queue'); } finally { - $this->running->del($class); + $this->running->del($rely_on); if (isset($rely_on) && $rely_on instanceof Consumer) { $rely_on->onComplete(); $this->complete->add($rely_on); diff --git a/Queue/Running.php b/Queue/Running.php index 5fa1472a..b3a19c35 100644 --- a/Queue/Running.php +++ b/Queue/Running.php @@ -38,12 +38,8 @@ class Running extends \Queue\Abstracts\Queue * @return false|int * @throws ComponentException */ - public function del(string $consumer) + public function del(Consumer $consumer) { - $consumer = unserialize($consumer); - if (!($consumer instanceof Consumer)) { - return true; - } return $this->pop(self::QUEUE_RUNNING, $consumer); }