This commit is contained in:
2021-08-05 17:02:12 +08:00
parent 6d4b352c30
commit f0eb29fbd5
3 changed files with 10 additions and 7 deletions
+1 -2
View File
@@ -35,13 +35,12 @@ use Snowflake\Snowflake;
*/
public function execute(mixed $class, mixed $method = null): bool
{
var_dump(class_implements($class));
if (!($class instanceof ConsumerInterface)) {
return false;
}
/** @var KafkaProvider $container */
$container = Snowflake::app()->get('kafka-container');
$container = Snowflake::getDi()->get(KafkaProvider::class);
$container->addConsumer($this->topic, $class);
return true;
-1
View File
@@ -27,7 +27,6 @@ class KafkaProvider extends BaseObject
if (isset($this->_topics[$topic])) {
return;
}
var_dump($topic, $handler);
$this->_topics[$topic] = $handler::class;
}
+9 -4
View File
@@ -4,11 +4,13 @@ namespace Kafka;
use Server\SInterface\PipeMessage;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
/**
*
*/
class Message implements ConsumerInterface, PipeMessage
class Message implements PipeMessage
{
@@ -21,12 +23,15 @@ class Message implements ConsumerInterface, PipeMessage
/**
*
* @throws \ReflectionException
* @throws NotFindClassException
*/
public function process(): void
{
// TODO: Implement process() method.
var_dump($this->struct);
/** @var KafkaProvider $container */
$container = Snowflake::getDi()->get(KafkaProvider::class);
$data = $container->getConsumer($this->struct->topic);
var_dump($data);
}
}