From 10e0bdc93bbd9a9fed13b75f9e2b83591f214967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Fri, 9 Oct 2020 18:34:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Kafka/Consumer/State.php | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/Kafka/Consumer/State.php b/Kafka/Consumer/State.php index b49590ab..a61371f3 100644 --- a/Kafka/Consumer/State.php +++ b/Kafka/Consumer/State.php @@ -15,7 +15,6 @@ namespace Kafka\Consumer; use Swoole\Timer; -use function Amp\repeat; /** * +------------------------------------------------------------------------------ @@ -137,13 +136,13 @@ class State continue; } $interval = isset($option['interval']) ? $option['interval'] : 200; - repeat(function ($watcherId) use ($request, $option) { + Timer::tick($msInterval = $interval, function ($watcherId) use ($request, $option) { if ($this->checkRun($request) && $option['func'] != null) { $context = call_user_func($option['func']); $this->processing($request, $context); } $this->requests[$request]['watcher'] = $watcherId; - }, $msInterval = $interval); + }); } // start sync metadata @@ -151,9 +150,9 @@ class State $context = call_user_func($this->requests[self::REQUEST_METADATA]['func']); $this->processing($request, $context); } - repeat(function ($watcherId) { + Timer::tick($msInterval = 1000, function ($watcherId) { $this->report(); - }, $msInterval = 1000); + }); } // }}}