Files
kiri-core/Kafka/TaskContainer.php
T
as2252258@163.com 6fbb0dea76 modify
2021-04-11 17:18:54 +08:00

63 lines
1.0 KiB
PHP

<?php
namespace Kafka;
use Snowflake\Abstracts\BaseObject;
/**
 * Class TaskContainer
 *
 * Shared registry that maps a topic to the handler invoked for messages
 * received on that topic. Obtain the shared instance via getInstance().
 *
 * @package Kafka
 */
class TaskContainer extends BaseObject
{

    /**
     * Registered handlers, keyed by topic.
     *
     * @var array topic => handler (anything accepted by call_user_func())
     */
    private array $_topics = [];

    /**
     * Lazily created shared instance.
     *
     * Declared nullable with an explicit null default on purpose: a typed
     * static property without a default is "uninitialized", and reading it
     * (as getInstance() must do on its first call) throws an Error.
     */
    private static ?TaskContainer $container = null;


    /**
     * Return the shared container, creating it on first use.
     *
     * @return \Kafka\TaskContainer
     */
    public static function getInstance(): TaskContainer
    {
        // self:: rather than static:: — the property is private to this class
        // and the instance created is always a TaskContainer.
        self::$container ??= new TaskContainer();

        return self::$container;
    }


    /**
     * Register a handler for a topic.
     *
     * The first registration wins: if the topic already has a (non-null)
     * handler, the call is silently ignored and the existing handler is kept.
     *
     * @param $topic   array key identifying the topic
     * @param $handler value later passed to call_user_func() by process()
     */
    public function addConsumer($topic, $handler)
    {
        if (isset($this->_topics[$topic])) {
            return;
        }
        $this->_topics[$topic] = $handler;
    }


    /**
     * Dispatch a message to the handler registered for its topic.
     *
     * Does nothing when no handler is registered for the topic (or the stored
     * handler is empty). The handler's return value is discarded.
     *
     * @param $topic
     * @param \Kafka\Struct $struct passed to the handler as its only argument
     */
    public function process($topic, Struct $struct)
    {
        $handler = $this->_topics[$topic] ?? null;
        if (empty($handler)) {
            return;
        }
        call_user_func($handler, $struct);
    }

}