From 8bf0941f2b571a93590deea16b57e835ca07a785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Thu, 26 Aug 2021 15:03:19 +0800 Subject: [PATCH] delete kafka --- kiri-crontab/.gitignore | 34 + kiri-crontab/.phpstorm.meta.php | 17 + kiri-crontab/README.md | 0 kiri-crontab/composer.json | 23 + kiri-kafka/Annotation/Kafka.php | 49 -- kiri-kafka/Configuration.php | 1066 ------------------------------ kiri-kafka/Constant.php | 163 ----- kiri-kafka/ConsumerInterface.php | 21 - kiri-kafka/Kafka.php | 194 ------ kiri-kafka/KafkaClient.php | 161 ----- kiri-kafka/KafkaImports.php | 46 -- kiri-kafka/KafkaProvider.php | 43 -- kiri-kafka/Logger.php | 115 ---- kiri-kafka/Struct.php | 36 - kiri-kafka/TopicConfig.php | 287 -------- kiri-kafka/config.php | 11 - p.php | 57 +- 17 files changed, 102 insertions(+), 2221 deletions(-) create mode 100644 kiri-crontab/.gitignore create mode 100644 kiri-crontab/.phpstorm.meta.php create mode 100644 kiri-crontab/README.md create mode 100644 kiri-crontab/composer.json delete mode 100644 kiri-kafka/Annotation/Kafka.php delete mode 100644 kiri-kafka/Configuration.php delete mode 100644 kiri-kafka/Constant.php delete mode 100644 kiri-kafka/ConsumerInterface.php delete mode 100644 kiri-kafka/Kafka.php delete mode 100644 kiri-kafka/KafkaClient.php delete mode 100644 kiri-kafka/KafkaImports.php delete mode 100644 kiri-kafka/KafkaProvider.php delete mode 100644 kiri-kafka/Logger.php delete mode 100644 kiri-kafka/Struct.php delete mode 100644 kiri-kafka/TopicConfig.php delete mode 100644 kiri-kafka/config.php diff --git a/kiri-crontab/.gitignore b/kiri-crontab/.gitignore new file mode 100644 index 00000000..efca283f --- /dev/null +++ b/kiri-crontab/.gitignore @@ -0,0 +1,34 @@ +# Created by .ignore support plugin (hsz.mobi) +### Yii template +assets/* +!assets/.gitignore +protected/runtime/* +!protected/runtime/.gitignore +protected/data/*.db +themes/classic/views/ + +### Example user template template +### Example user template + +# IntelliJ project files +.idea +*.iml +out +gen + +composer.lock + +*.log +commands/result +config/setting.php +tests/ +vendor/ +runtime/ + +*.xml +*.lock + +oot +d + +composer.lock diff --git a/kiri-crontab/.phpstorm.meta.php b/kiri-crontab/.phpstorm.meta.php new file mode 100644 index 00000000..8b86fdba --- /dev/null +++ b/kiri-crontab/.phpstorm.meta.php @@ -0,0 +1,17 @@ +=8.0", + "ext-json": "*", + "psr/event-dispatcher": "^1.0" + }, + "autoload": { + "psr-4": { + "Kiri\\Crontab\\": "src/" + } + }, + "require-dev": { + } +} diff --git a/kiri-kafka/Annotation/Kafka.php b/kiri-kafka/Annotation/Kafka.php deleted file mode 100644 index 428578f9..00000000 --- a/kiri-kafka/Annotation/Kafka.php +++ /dev/null @@ -1,49 +0,0 @@ -get(KafkaProvider::class); - $container->addConsumer($this->topic, $class); - - return true; - } - - -} diff --git a/kiri-kafka/Configuration.php b/kiri-kafka/Configuration.php deleted file mode 100644 index 00f00df2..00000000 --- a/kiri-kafka/Configuration.php +++ /dev/null @@ -1,1066 +0,0 @@ -set(Constant::CONFIG_BUILTIN_FEATURES, $builtin_features); - } - - /** - * @param mixed $client_id - */ - public function setClientId(mixed $client_id): void - { - $this->set(Constant::CONFIG_CLIENT_ID, $client_id); - } - - /** - * @param mixed $metadata_broker_list - */ - public function setMetadataBrokerList(mixed $metadata_broker_list): void - { - $this->set(Constant::CONFIG_METADATA_BROKER_LIST, $metadata_broker_list); - } - - /** - * @param mixed $bootstrap_servers - */ - public function setBootstrapServers(mixed $bootstrap_servers): void - { - $this->set(Constant::CONFIG_BOOTSTRAP_SERVERS, $bootstrap_servers); - } - - /** - * @param mixed $message_max_bytes - */ - public function setMessageMaxBytes(mixed $message_max_bytes): void - { - $this->set(Constant::CONFIG_MESSAGE_MAX_BYTES, $message_max_bytes); - } - - /** - * @param mixed $message_copy_max_bytes - */ - public function setMessageCopyMaxBytes(mixed $message_copy_max_bytes): void - { - $this->set(Constant::CONFIG_MESSAGE_COPY_MAX_BYTES, $message_copy_max_bytes); - } - - /** - * @param mixed $receive_message_max_bytes - */ - public function setReceiveMessageMaxBytes(mixed $receive_message_max_bytes): void - { - $this->set(Constant::CONFIG_RECEIVE_MESSAGE_MAX_BYTES, $receive_message_max_bytes); - } - - /** - * @param mixed $max_in_flight_requests_per_connection - */ - public function setMaxInFlightRequestsPerConnection(mixed $max_in_flight_requests_per_connection): void - { - $this->set(Constant::CONFIG_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, $max_in_flight_requests_per_connection); - } - - /** - * @param mixed $max_in_flight - */ - public function setMaxInFlight(mixed $max_in_flight): void - { - $this->set(Constant::CONFIG_MAX_IN_FLIGHT, $max_in_flight); - } - - /** - * @param mixed $topic_metadata_refresh_interval_ms - */ - public function setTopicMetadataRefreshIntervalMs(mixed $topic_metadata_refresh_interval_ms): void - { - $this->set(Constant::CONFIG_TOPIC_METADATA_REFRESH_INTERVAL_MS, $topic_metadata_refresh_interval_ms); - } - - /** - * @param mixed $metadata_max_age_ms - */ - public function setMetadataMaxAgeMs(mixed $metadata_max_age_ms): void - { - $this->set(Constant::CONFIG_METADATA_MAX_AGE_MS, $metadata_max_age_ms); - } - - /** - * @param mixed $topic_metadata_refresh_fast_interval_ms - */ - public function setTopicMetadataRefreshFastIntervalMs(mixed $topic_metadata_refresh_fast_interval_ms): void - { - $this->set(Constant::CONFIG_TOPIC_METADATA_REFRESH_FAST_INTERVAL_MS, $topic_metadata_refresh_fast_interval_ms); - } - - /** - * @param mixed $topic_metadata_refresh_fast_cnt - */ - public function setTopicMetadataRefreshFastCnt(mixed $topic_metadata_refresh_fast_cnt): void - { - $this->set(Constant::CONFIG_TOPIC_METADATA_REFRESH_FAST_CNT, $topic_metadata_refresh_fast_cnt); - } - - /** - * @param mixed $topic_metadata_refresh_sparse - */ - public function setTopicMetadataRefreshSparse(mixed $topic_metadata_refresh_sparse): void - { - $this->set(Constant::CONFIG_TOPIC_METADATA_REFRESH_SPARSE, $topic_metadata_refresh_sparse); - } - - /** - * @param mixed $topic_metadata_propagation_max_ms - */ - public function setTopicMetadataPropagationMaxMs(mixed $topic_metadata_propagation_max_ms): void - { - $this->set(Constant::CONFIG_TOPIC_METADATA_PROPAGATION_MAX_MS, $topic_metadata_propagation_max_ms); - } - - /** - * @param mixed $topic_blacklist - */ - public function setTopicBlacklist(mixed $topic_blacklist): void - { - $this->set(Constant::CONFIG_TOPIC_BLACKLIST, $topic_blacklist); - } - - /** - * @param mixed $debug - */ - public function setDebug(mixed $debug): void - { - $this->set(Constant::CONFIG_DEBUG, $debug); - } - - /** - * @param mixed $socket_timeout_ms - */ - public function setSocketTimeoutMs(mixed $socket_timeout_ms): void - { - $this->set(Constant::CONFIG_SOCKET_TIMEOUT_MS, $socket_timeout_ms); - } - - /** - * @param mixed $socket_blocking_max_ms - */ - public function setSocketBlockingMaxMs(mixed $socket_blocking_max_ms): void - { - $this->set(Constant::CONFIG_SOCKET_BLOCKING_MAX_MS, $socket_blocking_max_ms); - } - - /** - * @param mixed $socket_send_buffer_bytes - */ - public function setSocketSendBufferBytes(mixed $socket_send_buffer_bytes): void - { - $this->set(Constant::CONFIG_SOCKET_SEND_BUFFER_BYTES, $socket_send_buffer_bytes); - } - - /** - * @param mixed $socket_receive_buffer_bytes - */ - public function setSocketReceiveBufferBytes(mixed $socket_receive_buffer_bytes): void - { - $this->set(Constant::CONFIG_SOCKET_RECEIVE_BUFFER_BYTES, $socket_receive_buffer_bytes); - } - - /** - * @param mixed $socket_keepalive_enable - */ - public function setSocketKeepaliveEnable(mixed $socket_keepalive_enable): void - { - $this->set(Constant::CONFIG_SOCKET_KEEPALIVE_ENABLE, $socket_keepalive_enable); - } - - /** - * @param mixed $socket_nagle_disable - */ - public function setSocketNagleDisable(mixed $socket_nagle_disable): void - { - $this->set(Constant::CONFIG_SOCKET_NAGLE_DISABLE, $socket_nagle_disable); - } - - /** - * @param mixed $socket_max_fails - */ - public function setSocketMaxFails(mixed $socket_max_fails): void - { - $this->set(Constant::CONFIG_SOCKET_MAX_FAILS, $socket_max_fails); - } - - /** - * @param mixed $broker_address_ttl - */ - public function setBrokerAddressTtl(mixed $broker_address_ttl): void - { - $this->set(Constant::CONFIG_BROKER_ADDRESS_TTL, $broker_address_ttl); - } - - /** - * @param mixed $broker_address_family - */ - public function setBrokerAddressFamily(mixed $broker_address_family): void - { - $this->set(Constant::CONFIG_BROKER_ADDRESS_FAMILY, $broker_address_family); - } - - /** - * @param mixed $connections_max_idle_ms - */ - public function setConnectionsMaxIdleMs(mixed $connections_max_idle_ms): void - { - $this->set(Constant::CONFIG_CONNECTIONS_MAX_IDLE_MS, $connections_max_idle_ms); - } - - /** - * @param mixed $reconnect_backoff_jitter_ms - */ - public function setReconnectBackoffJitterMs(mixed $reconnect_backoff_jitter_ms): void - { - $this->set(Constant::CONFIG_RECONNECT_BACKOFF_JITTER_MS, $reconnect_backoff_jitter_ms); - } - - /** - * @param mixed $reconnect_backoff_ms - */ - public function setReconnectBackoffMs(mixed $reconnect_backoff_ms): void - { - $this->set(Constant::CONFIG_RECONNECT_BACKOFF_MS, $reconnect_backoff_ms); - } - - /** - * @param mixed $reconnect_backoff_max_ms - */ - public function setReconnectBackoffMaxMs(mixed $reconnect_backoff_max_ms): void - { - $this->set(Constant::CONFIG_RECONNECT_BACKOFF_MAX_MS, $reconnect_backoff_max_ms); - } - - /** - * @param mixed $statistics_interval_ms - */ - public function setStatisticsIntervalMs(mixed $statistics_interval_ms): void - { - $this->set(Constant::CONFIG_STATISTICS_INTERVAL_MS, $statistics_interval_ms); - } - - /** - * @param mixed $enabled_events - */ - public function setEnabledEvents(mixed $enabled_events): void - { - $this->set(Constant::CONFIG_ENABLED_EVENTS, $enabled_events); - } - - /** - * @param mixed $throttle_cb - */ - public function setThrottleCb(mixed $throttle_cb): void - { - $this->set(Constant::CONFIG_THROTTLE_CB, $throttle_cb); - } - - - /** - * @param mixed $log_level - */ - public function setLogLevel(mixed $log_level): void - { - $this->set(Constant::CONFIG_LOG_LEVEL, $log_level); - } - - /** - * @param mixed $log_queue - */ - public function setLogQueue(mixed $log_queue): void - { - $this->set(Constant::CONFIG_LOG_QUEUE, $log_queue); - } - - /** - * @param mixed $log_thread_name - */ - public function setLogThreadName(mixed $log_thread_name): void - { - $this->set(Constant::CONFIG_LOG_THREAD_NAME, $log_thread_name); - } - - /** - * @param mixed $enable_random_seed - */ - public function setEnableRandomSeed(mixed $enable_random_seed): void - { - $this->set(Constant::CONFIG_ENABLE_RANDOM_SEED, $enable_random_seed); - } - - /** - * @param mixed $log_connection_close - */ - public function setLogConnectionClose(mixed $log_connection_close): void - { - $this->set(Constant::CONFIG_LOG_CONNECTION_CLOSE, $log_connection_close); - } - - /** - * @param mixed $background_event_cb - */ - public function setBackgroundEventCb(mixed $background_event_cb): void - { - $this->set(Constant::CONFIG_BACKGROUND_EVENT_CB, $background_event_cb); - } - - /** - * @param mixed $socket_cb - */ - public function setSocketCb(mixed $socket_cb): void - { - $this->set(Constant::CONFIG_SOCKET_CB, $socket_cb); - } - - /** - * @param mixed $connect_cb - */ - public function setConnectCb(mixed $connect_cb): void - { - $this->set(Constant::CONFIG_CONNECT_CB, $connect_cb); - } - - /** - * @param mixed $closesocket_cb - */ - public function setClosesocketCb(mixed $closesocket_cb): void - { - $this->set(Constant::CONFIG_CLOSESOCKET_CB, $closesocket_cb); - } - - /** - * @param mixed $open_cb - */ - public function setOpenCb(mixed $open_cb): void - { - $this->set(Constant::CONFIG_OPEN_CB, $open_cb); - } - - /** - * @param mixed $opaque - */ - public function setOpaque(mixed $opaque): void - { - $this->set(Constant::CONFIG_OPAQUE, $opaque); - } - - /** - * @param mixed $default_topic_conf - */ - public function setDefaultTopicConf(mixed $default_topic_conf): void - { - $this->set(Constant::CONFIG_DEFAULT_TOPIC_CONF, $default_topic_conf); - } - - /** - * @param mixed $internal_termination_signal - */ - public function setInternalTerminationSignal(mixed $internal_termination_signal): void - { - $this->set(Constant::CONFIG_INTERNAL_TERMINATION_SIGNAL, $internal_termination_signal); - } - - /** - * @param mixed $api_version_request - */ - public function setApiVersionRequest(mixed $api_version_request): void - { - $this->set(Constant::CONFIG_API_VERSION_REQUEST, $api_version_request); - } - - /** - * @param mixed $api_version_request_timeout_ms - */ - public function setApiVersionRequestTimeoutMs(mixed $api_version_request_timeout_ms): void - { - $this->set(Constant::CONFIG_API_VERSION_REQUEST_TIMEOUT_MS, $api_version_request_timeout_ms); - } - - /** - * @param mixed $api_version_fallback_ms - */ - public function setApiVersionFallbackMs(mixed $api_version_fallback_ms): void - { - $this->set(Constant::CONFIG_API_VERSION_FALLBACK_MS, $api_version_fallback_ms); - } - - /** - * @param mixed $broker_version_fallback - */ - public function setBrokerVersionFallback(mixed $broker_version_fallback): void - { - $this->set(Constant::CONFIG_BROKER_VERSION_FALLBACK, $broker_version_fallback); - } - - /** - * @param mixed $security_protocol - */ - public function setSecurityProtocol(mixed $security_protocol): void - { - $this->set(Constant::CONFIG_SECURITY_PROTOCOL, $security_protocol); - } - - /** - * @param mixed $ssl_cipher_suites - */ - public function setSslCipherSuites(mixed $ssl_cipher_suites): void - { - $this->set(Constant::CONFIG_SSL_CIPHER_SUITES, $ssl_cipher_suites); - } - - /** - * @param mixed $ssl_curves_list - */ - public function setSslCurvesList(mixed $ssl_curves_list): void - { - $this->set(Constant::CONFIG_SSL_CURVES_LIST, $ssl_curves_list); - } - - /** - * @param mixed $ssl_sigalgs_list - */ - public function setSslSigalgsList(mixed $ssl_sigalgs_list): void - { - $this->set(Constant::CONFIG_SSL_SIGALGS_LIST, $ssl_sigalgs_list); - } - - /** - * @param mixed $ssl_key_location - */ - public function setSslKeyLocation(mixed $ssl_key_location): void - { - $this->set(Constant::CONFIG_SSL_KEY_LOCATION, $ssl_key_location); - } - - /** - * @param mixed $ssl_key_password - */ - public function setSslKeyPassword(mixed $ssl_key_password): void - { - $this->set(Constant::CONFIG_SSL_KEY_PASSWORD, $ssl_key_password); - } - - /** - * @param mixed $ssl_key_pem - */ - public function setSslKeyPem(mixed $ssl_key_pem): void - { - $this->set(Constant::CONFIG_SSL_KEY_PEM, $ssl_key_pem); - } - - /** - * @param mixed $ssl_key - */ - public function setSslKey(mixed $ssl_key): void - { - $this->set(Constant::CONFIG_SSL_KEY, $ssl_key); - } - - /** - * @param mixed $ssl_certificate_location - */ - public function setSslCertificateLocation(mixed $ssl_certificate_location): void - { - $this->set(Constant::CONFIG_SSL_CERTIFICATE_LOCATION, $ssl_certificate_location); - } - - /** - * @param mixed $ssl_certificate_pem - */ - public function setSslCertificatePem(mixed $ssl_certificate_pem): void - { - $this->set(Constant::CONFIG_SSL_CERTIFICATE_PEM, $ssl_certificate_pem); - } - - /** - * @param mixed $ssl_certificate - */ - public function setSslCertificate(mixed $ssl_certificate): void - { - $this->set(Constant::CONFIG_SSL_CERTIFICATE, $ssl_certificate); - } - - /** - * @param mixed $ssl_ca_location - */ - public function setSslCaLocation(mixed $ssl_ca_location): void - { - $this->set(Constant::CONFIG_SSL_CA_LOCATION, $ssl_ca_location); - } - - /** - * @param mixed $ssl_ca - */ - public function setSslCa(mixed $ssl_ca): void - { - $this->set(Constant::CONFIG_SSL_CA, $ssl_ca); - } - - /** - * @param mixed $ssl_ca_certificate_stores - */ - public function setSslCaCertificateStores(mixed $ssl_ca_certificate_stores): void - { - $this->set(Constant::CONFIG_SSL_CA_CERTIFICATE_STORES, $ssl_ca_certificate_stores); - } - - /** - * @param mixed $ssl_crl_location - */ - public function setSslCrlLocation(mixed $ssl_crl_location): void - { - $this->set(Constant::CONFIG_SSL_CRL_LOCATION, $ssl_crl_location); - } - - /** - * @param mixed $ssl_keystore_location - */ - public function setSslKeystoreLocation(mixed $ssl_keystore_location): void - { - $this->set(Constant::CONFIG_SSL_KEYSTORE_LOCATION, $ssl_keystore_location); - } - - /** - * @param mixed $ssl_keystore_password - */ - public function setSslKeystorePassword(mixed $ssl_keystore_password): void - { - $this->set(Constant::CONFIG_SSL_KEYSTORE_PASSWORD, $ssl_keystore_password); - } - - /** - * @param mixed $ssl_engine_location - */ - public function setSslEngineLocation(mixed $ssl_engine_location): void - { - $this->set(Constant::CONFIG_SSL_ENGINE_LOCATION, $ssl_engine_location); - } - - /** - * @param mixed $ssl_engine_id - */ - public function setSslEngineId(mixed $ssl_engine_id): void - { - $this->set(Constant::CONFIG_SSL_ENGINE_ID, $ssl_engine_id); - } - - /** - * @param mixed $ssl_engine_callback_data - */ - public function setSslEngineCallbackData(mixed $ssl_engine_callback_data): void - { - $this->set(Constant::CONFIG_SSL_ENGINE_CALLBACK_DATA, $ssl_engine_callback_data); - } - - /** - * @param mixed $enable_ssl_certificate_verification - */ - public function setEnableSslCertificateVerification(mixed $enable_ssl_certificate_verification): void - { - $this->set(Constant::CONFIG_ENABLE_SSL_CERTIFICATE_VERIFICATION, $enable_ssl_certificate_verification); - } - - /** - * @param mixed $ssl_endpoint_identification_algorithm - */ - public function setSslEndpointIdentificationAlgorithm(mixed $ssl_endpoint_identification_algorithm): void - { - $this->set(Constant::CONFIG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, $ssl_endpoint_identification_algorithm); - } - - /** - * @param mixed $ssl_certificate_verify_cb - */ - public function setSslCertificateVerifyCb(mixed $ssl_certificate_verify_cb): void - { - $this->set(Constant::CONFIG_SSL_CERTIFICATE_VERIFY_CB, $ssl_certificate_verify_cb); - } - - /** - * @param mixed $sasl_mechanisms - */ - public function setSaslMechanisms(mixed $sasl_mechanisms): void - { - $this->set(Constant::CONFIG_SASL_MECHANISMS, $sasl_mechanisms); - } - - /** - * @param mixed $sasl_mechanism - */ - public function setSaslMechanism(mixed $sasl_mechanism): void - { - $this->set(Constant::CONFIG_SASL_MECHANISM, $sasl_mechanism); - } - - /** - * @param mixed $sasl_kerberos_service_name - */ - public function setSaslKerberosServiceName(mixed $sasl_kerberos_service_name): void - { - $this->set(Constant::CONFIG_SASL_KERBEROS_SERVICE_NAME, $sasl_kerberos_service_name); - } - - /** - * @param mixed $sasl_kerberos_principal - */ - public function setSaslKerberosPrincipal(mixed $sasl_kerberos_principal): void - { - $this->set(Constant::CONFIG_SASL_KERBEROS_PRINCIPAL, $sasl_kerberos_principal); - } - - /** - * @param mixed $sasl_kerberos_kinit_cmd - */ - public function setSaslKerberosKinitCmd(mixed $sasl_kerberos_kinit_cmd): void - { - $this->set(Constant::CONFIG_SASL_KERBEROS_KINIT_CMD, $sasl_kerberos_kinit_cmd); - } - - /** - * @param mixed $sasl_kerberos_keytab - */ - public function setSaslKerberosKeytab(mixed $sasl_kerberos_keytab): void - { - $this->set(Constant::CONFIG_SASL_KERBEROS_KEYTAB, $sasl_kerberos_keytab); - } - - /** - * @param mixed $sasl_kerberos_min_time_before_relogin - */ - public function setSaslKerberosMinTimeBeforeRelogin(mixed $sasl_kerberos_min_time_before_relogin): void - { - $this->set(Constant::CONFIG_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, $sasl_kerberos_min_time_before_relogin); - } - - /** - * @param mixed $sasl_username - */ - public function setSaslUsername(mixed $sasl_username): void - { - $this->set(Constant::CONFIG_SASL_USERNAME, $sasl_username); - } - - /** - * @param mixed $sasl_password - */ - public function setSaslPassword(mixed $sasl_password): void - { - $this->set(Constant::CONFIG_SASL_PASSWORD, $sasl_password); - } - - /** - * @param mixed $sasl_oauthbearer_config - */ - public function setSaslOauthbearerConfig(mixed $sasl_oauthbearer_config): void - { - $this->set(Constant::CONFIG_SASL_OAUTHBEARER_CONFIG, $sasl_oauthbearer_config); - } - - /** - * @param mixed $enable_sasl_oauthbearer_unsecure_jwt - */ - public function setEnableSaslOauthbearerUnsecureJwt(mixed $enable_sasl_oauthbearer_unsecure_jwt): void - { - $this->set(Constant::CONFIG_ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT, $enable_sasl_oauthbearer_unsecure_jwt); - } - - /** - * @param mixed $oauthbearer_token_refresh_cb - */ - public function setOauthbearerTokenRefreshCb(mixed $oauthbearer_token_refresh_cb): void - { - $this->set(Constant::CONFIG_OAUTHBEARER_TOKEN_REFRESH_CB, $oauthbearer_token_refresh_cb); - } - - /** - * @param mixed $plugin_library_paths - */ - public function setPluginLibraryPaths(mixed $plugin_library_paths): void - { - $this->set(Constant::CONFIG_PLUGIN_LIBRARY_PATHS, $plugin_library_paths); - } - - /** - * @param mixed $interceptors - */ - public function setInterceptors(mixed $interceptors): void - { - $this->set(Constant::CONFIG_INTERCEPTORS, $interceptors); - } - - /** - * @param mixed $group_id - */ - public function setGroupId(mixed $group_id): void - { - $this->set(Constant::CONFIG_GROUP_ID, $group_id); - } - - /** - * @param mixed $group_instance_id - */ - public function setGroupInstanceId(mixed $group_instance_id): void - { - $this->set(Constant::CONFIG_GROUP_INSTANCE_ID, $group_instance_id); - } - - /** - * @param mixed $partition_assignment_strategy - */ - public function setPartitionAssignmentStrategy(mixed $partition_assignment_strategy): void - { - $this->set(Constant::CONFIG_PARTITION_ASSIGNMENT_STRATEGY, $partition_assignment_strategy); - } - - /** - * @param mixed $session_timeout_ms - */ - public function setSessionTimeoutMs(mixed $session_timeout_ms): void - { - $this->set(Constant::CONFIG_SESSION_TIMEOUT_MS, $session_timeout_ms); - } - - /** - * @param mixed $heartbeat_interval_ms - */ - public function setHeartbeatIntervalMs(mixed $heartbeat_interval_ms): void - { - $this->set(Constant::CONFIG_HEARTBEAT_INTERVAL_MS, $heartbeat_interval_ms); - } - - /** - * @param mixed $group_protocol_type - */ - public function setGroupProtocolType(mixed $group_protocol_type): void - { - $this->set(Constant::CONFIG_GROUP_PROTOCOL_TYPE, $group_protocol_type); - } - - /** - * @param mixed $coordinator_query_interval_ms - */ - public function setCoordinatorQueryIntervalMs(mixed $coordinator_query_interval_ms): void - { - $this->set(Constant::CONFIG_COORDINATOR_QUERY_INTERVAL_MS, $coordinator_query_interval_ms); - } - - /** - * @param mixed $max_poll_interval_ms - */ - public function setMaxPollIntervalMs(mixed $max_poll_interval_ms): void - { - $this->set(Constant::CONFIG_MAX_POLL_INTERVAL_MS, $max_poll_interval_ms); - } - - /** - * @param mixed $enable_auto_commit - */ - public function setEnableAutoCommit(mixed $enable_auto_commit): void - { - $this->set(Constant::CONFIG_ENABLE_AUTO_COMMIT, $enable_auto_commit); - } - - /** - * @param mixed $auto_commit_interval_ms - */ - public function setAutoCommitIntervalMs(mixed $auto_commit_interval_ms): void - { - $this->set(Constant::CONFIG_AUTO_COMMIT_INTERVAL_MS, $auto_commit_interval_ms); - } - - /** - * @param mixed $enable_auto_offset_store - */ - public function setEnableAutoOffsetStore(mixed $enable_auto_offset_store): void - { - $this->set(Constant::CONFIG_ENABLE_AUTO_OFFSET_STORE, $enable_auto_offset_store); - } - - /** - * @param mixed $queued_min_messages - */ - public function setQueuedMinMessages(mixed $queued_min_messages): void - { - $this->set(Constant::CONFIG_QUEUED_MIN_MESSAGES, $queued_min_messages); - } - - /** - * @param mixed $queued_max_messages_kbytes - */ - public function setQueuedMaxMessagesKbytes(mixed $queued_max_messages_kbytes): void - { - $this->set(Constant::CONFIG_QUEUED_MAX_MESSAGES_KBYTES, $queued_max_messages_kbytes); - } - - /** - * @param mixed $fetch_wait_max_ms - */ - public function setFetchWaitMaxMs(mixed $fetch_wait_max_ms): void - { - $this->set(Constant::CONFIG_FETCH_WAIT_MAX_MS, $fetch_wait_max_ms); - } - - /** - * @param mixed $fetch_message_max_bytes - */ - public function setFetchMessageMaxBytes(mixed $fetch_message_max_bytes): void - { - $this->set(Constant::CONFIG_FETCH_MESSAGE_MAX_BYTES, $fetch_message_max_bytes); - } - - /** - * @param mixed $max_partition_fetch_bytes - */ - public function setMaxPartitionFetchBytes(mixed $max_partition_fetch_bytes): void - { - $this->set(Constant::CONFIG_MAX_PARTITION_FETCH_BYTES, $max_partition_fetch_bytes); - } - - /** - * @param mixed $fetch_max_bytes - */ - public function setFetchMaxBytes(mixed $fetch_max_bytes): void - { - $this->set(Constant::CONFIG_FETCH_MAX_BYTES, $fetch_max_bytes); - } - - /** - * @param mixed $fetch_min_bytes - */ - public function setFetchMinBytes(mixed $fetch_min_bytes): void - { - $this->set(Constant::CONFIG_FETCH_MIN_BYTES, $fetch_min_bytes); - } - - /** - * @param mixed $fetch_error_backoff_ms - */ - public function setFetchErrorBackoffMs(mixed $fetch_error_backoff_ms): void - { - $this->set(Constant::CONFIG_FETCH_ERROR_BACKOFF_MS, $fetch_error_backoff_ms); - } - - /** - * @param mixed $offset_store_method - */ - public function setOffsetStoreMethod(mixed $offset_store_method): void - { - $this->set(Constant::CONFIG_OFFSET_STORE_METHOD, $offset_store_method); - } - - /** - * @param mixed $isolation_level - */ - public function setIsolationLevel(mixed $isolation_level): void - { - $this->set(Constant::CONFIG_ISOLATION_LEVEL, $isolation_level); - } - - /** - * @param mixed $enable_partition_eof - */ - public function setEnablePartitionEof(mixed $enable_partition_eof): void - { - $this->set(Constant::CONFIG_ENABLE_PARTITION_EOF, $enable_partition_eof); - } - - /** - * @param mixed $check_crcs - */ - public function setCheckCrcs(mixed $check_crcs): void - { - $this->set(Constant::CONFIG_CHECK_CRCS, $check_crcs); - } - - /** - * @param mixed $allow_auto_create_topics - */ - public function setAllowAutoCreateTopics(mixed $allow_auto_create_topics): void - { - $this->set(Constant::CONFIG_ALLOW_AUTO_CREATE_TOPICS, $allow_auto_create_topics); - } - - /** - * @param mixed $client_rack - */ - public function setClientRack(mixed $client_rack): void - { - $this->set(Constant::CONFIG_CLIENT_RACK, $client_rack); - } - - /** - * @param mixed $transactional_id - */ - public function setTransactionalId(mixed $transactional_id): void - { - $this->set(Constant::CONFIG_TRANSACTIONAL_ID, $transactional_id); - } - - /** - * @param mixed $transaction_timeout_ms - */ - public function setTransactionTimeoutMs(mixed $transaction_timeout_ms): void - { - $this->set(Constant::CONFIG_TRANSACTION_TIMEOUT_MS, $transaction_timeout_ms); - } - - /** - * @param mixed $enable_idempotence - */ - public function setEnableIdempotence(mixed $enable_idempotence): void - { - $this->set(Constant::CONFIG_ENABLE_IDEMPOTENCE, $enable_idempotence); - } - - /** - * @param mixed $enable_gapless_guarantee - */ - public function setEnableGaplessGuarantee(mixed $enable_gapless_guarantee): void - { - $this->set(Constant::CONFIG_ENABLE_GAPLESS_GUARANTEE, $enable_gapless_guarantee); - } - - /** - * @param mixed $queue_buffering_max_messages - */ - public function setQueueBufferingMaxMessages(mixed $queue_buffering_max_messages): void - { - $this->set(Constant::CONFIG_QUEUE_BUFFERING_MAX_MESSAGES, $queue_buffering_max_messages); - } - - /** - * @param mixed $queue_buffering_max_kbytes - */ - public function setQueueBufferingMaxKbytes(mixed $queue_buffering_max_kbytes): void - { - $this->set(Constant::CONFIG_QUEUE_BUFFERING_MAX_KBYTES, $queue_buffering_max_kbytes); - } - - /** - * @param mixed $queue_buffering_max_ms - */ - public function setQueueBufferingMaxMs(mixed $queue_buffering_max_ms): void - { - $this->set(Constant::CONFIG_QUEUE_BUFFERING_MAX_MS, $queue_buffering_max_ms); - } - - /** - * @param mixed $linger_ms - */ - public function setLingerMs(mixed $linger_ms): void - { - $this->set(Constant::CONFIG_LINGER_MS, $linger_ms); - } - - /** - * @param mixed $message_send_max_retries - */ - public function setMessageSendMaxRetries(mixed $message_send_max_retries): void - { - $this->set(Constant::CONFIG_MESSAGE_SEND_MAX_RETRIES, $message_send_max_retries); - } - - /** - * @param mixed $retries - */ - public function setRetries(mixed $retries): void - { - $this->set(Constant::CONFIG_RETRIES, $retries); - } - - /** - * @param mixed $retry_backoff_ms - */ - public function setRetryBackoffMs(mixed $retry_backoff_ms): void - { - $this->set(Constant::CONFIG_RETRY_BACKOFF_MS, $retry_backoff_ms); - } - - /** - * @param mixed $queue_buffering_backpressure_threshold - */ - public function setQueueBufferingBackpressureThreshold(mixed $queue_buffering_backpressure_threshold): void - { - $this->set(Constant::CONFIG_QUEUE_BUFFERING_BACKPRESSURE_THRESHOLD, $queue_buffering_backpressure_threshold); - } - - /** - * @param mixed $compression_codec - */ - public function setCompressionCodec(mixed $compression_codec): void - { - $this->set(Constant::CONFIG_COMPRESSION_CODEC, $compression_codec); - } - - /** - * @param mixed $compression_type - */ - public function setCompressionType(mixed $compression_type): void - { - $this->set(Constant::CONFIG_COMPRESSION_TYPE, $compression_type); - } - - /** - * @param mixed $batch_num_messages - */ - public function setBatchNumMessages(mixed $batch_num_messages): void - { - $this->set(Constant::CONFIG_BATCH_NUM_MESSAGES, $batch_num_messages); - } - - /** - * @param mixed $batch_size - */ - public function setBatchSize(mixed $batch_size): void - { - $this->set(Constant::CONFIG_BATCH_SIZE, $batch_size); - } - - /** - * @param mixed $delivery_report_only_error - */ - public function setDeliveryReportOnlyError(mixed $delivery_report_only_error): void - { - $this->set(Constant::CONFIG_DELIVERY_REPORT_ONLY_ERROR, $delivery_report_only_error); - } - - /** - * @param mixed $dr_cb - */ - public function setDrCb(mixed $dr_cb): void - { - $this->set(Constant::CONFIG_DR_CB, $dr_cb); - } - - - /** - * @param mixed $sticky_partitioning_linger_ms - */ - public function setStickyPartitioningLingerMs(mixed $sticky_partitioning_linger_ms): void - { - $this->set(Constant::CONFIG_STICKY_PARTITIONING_LINGER_MS, $sticky_partitioning_linger_ms); - } - - -} diff --git a/kiri-kafka/Constant.php b/kiri-kafka/Constant.php deleted file mode 100644 index 9995bc1a..00000000 --- a/kiri-kafka/Constant.php +++ /dev/null @@ -1,163 +0,0 @@ -pid . ']'; - - return $name . '.' . 'Kafka Consumer ' . $this->kafkaConfig['topic']; - } - - - /** - * @param Process $process - * @throws \Exception - */ - public function onHandler(Process $process): void - { - $this->waite($process, $this->kafkaConfig); - } - - - /** - * @param Process $process - * @param array $kafkaServer - * @throws \Exception - */ - private function waite(Process $process, array $kafkaServer) - { - try { - [$config, $topic, $conf] = $this->kafkaConfig($kafkaServer); - if (empty($config) && empty($topic) && empty($conf)) { - return; - } - $objRdKafka = new Consumer($config); - $topic = $objRdKafka->newTopic($kafkaServer['topic'], $topic); - - $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); - do { - if ($this->checkProcessIsStop()) { - $this->exit(); - break; - } - $this->resolve($topic, $conf['interval'] ?? 1000); - } while (true); - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - } - - - /** - * @param ConsumerTopic $topic - * @param $interval - * @throws \Exception - */ - private function resolve(ConsumerTopic $topic, $interval) - { - try { - $message = $topic->consume(0, $interval); - if (!empty($message)) { - if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { - $this->handlerExecute($message->topic_name, $message); - } else if ($message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - logger()->warning('No more messages; will wait for more'); - } else if ($message->err == RD_KAFKA_RESP_ERR__TIMED_OUT) { - logger()->error('Kafka Timed out'); - } else { - logger()->error($message->errstr()); - } - } - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - } - - - /** - * @param $topic - * @param $message - * @throws \Exception - */ - protected function handlerExecute($topic, $message) - { - go(function () use ($topic, $message) { - try { - $server = Kiri::app()->getSwoole(); - - $setting = $server->setting['worker_num']; - - /** @var KafkaProvider $container */ - $container = Kiri::getDi()->get(KafkaProvider::class); - $data = $container->getConsumer($topic); - if (!empty($data)) { - $server->sendMessage(new $data(new Struct($topic, $message)), random_int(0, $setting - 1)); - } - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - }); - } - - - /** - * @param $kafka - * @return array - * @throws \Exception - */ - private function kafkaConfig($kafka): array - { - try { - $conf = new Configuration(); - $conf->setRebalanceCb([$this, 'rebalanced_cb']); - $conf->setGroupId($kafka['groupId']); - $conf->setMetadataBrokerList($kafka['brokers']); - $conf->setSocketTimeoutMs(30000); - - if (function_exists('pcntl_sigprocmask')) { - pcntl_sigprocmask(SIG_BLOCK, array(SIGIO)); - $conf->setInternalTerminationSignal((string)SIGIO); - } - - $topicConf = new TopicConfig(); - $topicConf->setAutoCommitEnable(true); - $topicConf->setAutoCommitIntervalMs(100); - - //smallest:简单理解为从头开始消费, - //largest:简单理解为从最新的开始消费 - $topicConf->setAutoOffsetReset('smallest'); - $topicConf->setOffsetStorePath('kafka_offset.log'); - $topicConf->setOffsetStoreMethod('broker'); - - return [$conf, $topicConf, $kafka]; - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - return [null, null, null]; - } - } - - - /** - * @param KafkaConsumer $kafka - * @param $err - * @param array|null $partitions - * @throws Exception - * @throws \Exception - */ - public function rebalanced_cb(KafkaConsumer $kafka, $err, array $partitions = null) - { - if ($err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { - $kafka->assign($partitions); - } else if ($err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { - $kafka->assign(NULL); - } else { - throw new \Exception($err); - } - } - - -} diff --git a/kiri-kafka/KafkaClient.php b/kiri-kafka/KafkaClient.php deleted file mode 100644 index 590f6e93..00000000 --- a/kiri-kafka/KafkaClient.php +++ /dev/null @@ -1,161 +0,0 @@ -conf = di(Configuration::class); - $this->topicConf = di(TopicConfig::class); - $this->setConfig(); - } - - - /** - * @return TopicConfig - */ - public function getTopicConfig(): TopicConfig - { - return $this->topicConf; - } - - - /** - * @return Configuration - */ - public function getConfiguration(): Configuration - { - return $this->conf; - } - - - /** - * @throws ConfigException - */ - private function setConfig() - { - $config = Config::get('kafka.producers.' . $this->topic, null, true); - if (!isset($config['brokers'])) { - throw new ConfigException('Please configure relevant information.'); - } - $this->conf->setMetadataBrokerList($config['brokers']); - $this->conf->setGroupId($this->groupId); - $this->conf->setClientId(current(swoole_get_local_ip())); - $this->conf->setErrorCb(function ($kafka, $err, $reason) { - logger()->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); - }); - } - - - /** - * @param string $key - * @param string|array $params - * @param bool $isAck - * @throws Exception - */ - public function push(string $key, string|array $params, bool $isAck = false) - { - $this->sendMessage([$params], $key, $isAck); - } - - - /** - * @param string|null $key - * @param array $data - * @param bool $isAck - * @throws Exception - */ - public function batch(?string $key, array $data, bool $isAck = false) - { - $this->sendMessage($data, $key, $isAck); - } - - - /** - * @return Producer - * @throws Exception - */ - private function getProducer(): Producer - { - return Kiri::getDi()->get(Producer::class, [$this->conf]); - } - - - /** - * @param Producer $producer - * @param $topic - * @param $isAck - * @return ProducerTopic - */ - private function getProducerTopic(Producer $producer, $topic, $isAck): ProducerTopic - { - $this->topicConf->setRequestRequiredAcks($isAck ? '1' : '0'); - return $producer->newTopic($topic, $this->topicConf); - } - - - /** - * @param array $message - * @param string $key - * @param bool $isAck - * @throws Exception - */ - private function sendMessage(array $message, string $key = '', bool $isAck = false) - { - $producer = $this->getProducer(); - $producerTopic = $this->getProducerTopic($producer, $this->topic, $isAck); - if ($this->isAck) { - $this->flush($producer); - } - foreach ($message as $value) { - $producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key); - $producer->poll(0); - } - $this->flush($producer); - } - - - /** - * @param Producer $producer - */ - private function flush(Producer $producer) - { - while ($producer->getOutQLen() > 0) { - $result = $producer->flush(100); - if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { - break; - } - } - } - - -} diff --git a/kiri-kafka/KafkaImports.php b/kiri-kafka/KafkaImports.php deleted file mode 100644 index aef17eae..00000000 --- a/kiri-kafka/KafkaImports.php +++ /dev/null @@ -1,46 +0,0 @@ - false]); - if (($kafka['enable'] ?? false) == false) { - return; - } - $kafkaServers = Config::get('kafka.consumers', []); - if (empty($kafkaServers)) { - return; - } - /** @var Server $server */ - $server = $application->get('server'); - foreach ($kafkaServers as $kafkaServer) { - $server->addProcess(new Kafka($kafkaServer)); - } - } - -} diff --git a/kiri-kafka/KafkaProvider.php b/kiri-kafka/KafkaProvider.php deleted file mode 100644 index ba75f0d3..00000000 --- a/kiri-kafka/KafkaProvider.php +++ /dev/null @@ -1,43 +0,0 @@ -_topics[$topic])) { - return; - } - $this->_topics[$topic] = $handler; - } - - - /** - * @param string $topic - * @return mixed - */ - public function getConsumer(string $topic): mixed - { - return $this->_topics[$topic] ?? null; - } - -} diff --git a/kiri-kafka/Logger.php b/kiri-kafka/Logger.php deleted file mode 100644 index 83f1d89e..00000000 --- a/kiri-kafka/Logger.php +++ /dev/null @@ -1,115 +0,0 @@ -getLogger(); - $logger->debug($message); - } - - public function critical(mixed $message, array $context = array()) - { - // TODO: Implement critical() method. - var_dump(func_get_args()); - } - - /** - * @param string $message - * @param array $context - * @throws Exception - */ - public function error(mixed $message, array $context = array()) - { - $logger = Kiri::app()->getLogger(); - $logger->error($message); - } - - /** - * @param string $message - * @param array $context - * @throws Exception - */ - public function warning(mixed $message, array $context = array()) - { - $logger = Kiri::app()->getLogger(); - $logger->warning($message); - } - - /** - * @param string $message - * @param array $context - * @throws Exception - */ - public function notice(mixed $message, array $context = array()) - { - $logger = Kiri::app()->getLogger(); - $logger->info($message); - } - - /** - * @param string $message - * @param array $context - * @throws Exception - */ - public function info(mixed $message, array $context = array()) - { - $logger = Kiri::app()->getLogger(); - $logger->info($message); - } - - - /** - * @param string $message - * @param array $context - * @throws Exception - */ - public function debug(mixed $message, array $context = array()) - { - $logger = Kiri::app()->getLogger(); - $logger->debug($message); - } - - /** - * @param $level - * @param $message - * @param array $context - * @throws Exception - */ - public function log($level, mixed $message, array $context = array()) - { - $logger = Kiri::app()->getLogger(); - $logger->debug($message); - } - - -} diff --git a/kiri-kafka/Struct.php b/kiri-kafka/Struct.php deleted file mode 100644 index 7f447899..00000000 --- a/kiri-kafka/Struct.php +++ /dev/null @@ -1,36 +0,0 @@ -payload = swoole_unserialize($message->payload); - - $this->topic = $topic; - $this->offset = $message->offset; - $this->part = $message->partition; - $this->message = $message; - $this->value = $message->payload; - } - -} diff --git a/kiri-kafka/TopicConfig.php b/kiri-kafka/TopicConfig.php deleted file mode 100644 index 37e0c430..00000000 --- a/kiri-kafka/TopicConfig.php +++ /dev/null @@ -1,287 +0,0 @@ -set(Constant::TOPIC_CONF_REQUEST_REQUIRED_ACKS, $request_required_acks); - } - - - /** - * @param mixed|string $acks - * - * producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项: - * (1)acks=0: 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1; - * (2)acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。 - * (3)acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。 - * (4)其他的设置,例如acks=2也是可以的,这将需要给定的acks数量,但是这种策略一般很少用。 - */ - public function setAcks(int $acks): void - { - $this->set(Constant::TOPIC_CONF_ACKS, $acks); - } - - /** - * @param mixed|string $request_timeout_ms - * - * broker尽力实现request.required.acks需求时的等待时间,否则会发送错误到客户端 - */ - public function setRequestTimeoutMs(int $request_timeout_ms): void - { - $this->set(Constant::TOPIC_CONF_REQUEST_TIMEOUT_MS, $request_timeout_ms); - } - - /** - * @param mixed|string $message_timeout_ms - * - * 本地消息超时。此值仅在本地强制执行,并限制生成的消息等待成功传递的时间。0的时间是无限的。 - * 这是librdkafka用于传递消息(包括重试)的最长时间。 - * 超过重试计数或消息超时时发生传递错误。 - * 如果配置了transactional.id,则消息超时将自动调整为transaction.timeout.ms。 - */ - public function setMessageTimeoutMs(int $message_timeout_ms): void - { - $this->set(Constant::TOPIC_CONF_MESSAGE_TIMEOUT_MS, $message_timeout_ms); - } - - /** - * @param mixed|string $delivery_timeout_ms - * - * Alias for message.timeout.ms: Local message timeout. - * This value is only enforced locally and limits the time a produced message waits for successful delivery. - * A time of 0 is infinite. - * This is the maximum time librdkafka may use to deliver a message (including retries). - * Delivery error occurs when either the retry count or the message timeout are exceeded. - * The message timeout is automatically adjusted to transaction.timeout.ms if transactional.id is configured. - */ - public function setDeliveryTimeoutMs(int $delivery_timeout_ms): void - { - $this->set(Constant::TOPIC_CONF_DELIVERY_TIMEOUT_MS, $delivery_timeout_ms); - } - - /** - * @param mixed|string $queuing_strategy - * - * EXPERIMENTAL: subject to change or removal. - * DEPRECATED Producer queuing strategy. - * FIFO preserves produce ordering, while LIFO prioritizes new messages - */ - public function setQueuingStrategy(mixed $queuing_strategy): void - { - $this->set(Constant::TOPIC_CONF_QUEUING_STRATEGY, $queuing_strategy); - } - - /** - * @param mixed|string $produce_offset_report - * - * DEPRECATED No longer used. - */ - public function setProduceOffsetReport(bool $produce_offset_report): void - { - $this->set(Constant::TOPIC_CONF_PRODUCE_OFFSET_REPORT, $produce_offset_report); - } - - /** - * @param mixed|string $partitioner - * - * Partitioner: random - random distribution, - * consistent - CRC32 hash of key (Empty and NULL keys are mapped to single partition), - * consistent_random - CRC32 hash of key (Empty and NULL keys are randomly partitioned), - * murmur2 - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), - * murmur2_random - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. - * This is functionally equivalent to the default partitioner in the Java Producer.), - * fnv1a - FNV-1a hash of key (NULL keys are mapped to single partition), - * fnv1a_random - FNV-1a hash of key (NULL keys are randomly partitioned). - */ - public function setPartitioner(mixed $partitioner): void - { - $this->set(Constant::TOPIC_CONF_PARTITIONER, $partitioner); - } - - /** - * @param mixed|string $partitioner_cb - * - * Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb()) - */ - public function setPartitionerCb(mixed $partitioner_cb): void - { - $this->set(Constant::TOPIC_CONF_PARTITIONER_CB, $partitioner_cb); - } - - /** - * @param mixed|string $msg_order_cmp - * - * EXPERIMENTAL: subject to change or removal. - * DEPRECATED Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()). - * Also see queuing.strategy. - * - */ - public function setMsgOrderCmp(mixed $msg_order_cmp): void - { - $this->set(Constant::TOPIC_CONF_MSG_ORDER_CMP, $msg_order_cmp); - } - - /** - * @param mixed|string $opaque - * - * Application opaque (set with rd_kafka_topic_conf_set_opaque()) - */ - public function setOpaque(mixed $opaque): void - { - $this->set(Constant::TOPIC_CONF_OPAQUE, $opaque); - } - - /** - * @param mixed|string $compression_codec - * - * Compression codec to use for compressing message sets. inherit = inherit global compression.codec configuration. - */ - public function setCompressionCodec(mixed $compression_codec): void - { - $this->set(Constant::TOPIC_CONF_COMPRESSION_CODEC, $compression_codec); - } - - /** - * @param mixed|string $compression_type - * - * Alias for compression.codec: compression codec to use for compressing message sets. - * This is the default value for all topics, may be overridden by the topic configuration property compression.codec. - */ - public function setCompressionType(mixed $compression_type): void - { - $this->set(Constant::TOPIC_CONF_COMPRESSION_TYPE, $compression_type); - } - - /** - * @param mixed|string $compression_level - * - * Compression level parameter for algorithm selected by configuration property compression.codec. - * Higher values will result in better compression at the cost of more CPU usage. - * Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; - * -1 = codec-dependent default compression level. - */ - public function setCompressionLevel(int $compression_level): void - { - $this->set(Constant::TOPIC_CONF_COMPRESSION_LEVEL, $compression_level); - } - - /** - * @param mixed|string $auto_commit_enable - * - * DEPRECATED [LEGACY PROPERTY: This property is used by the simple legacy consumer only. - * When using the high-level KafkaConsumer, the global enable.auto.commit property must be used instead]. - * If true, periodically commit offset of the last message handed to the application. - * This committed offset will be used when the process restarts to pick up where it left off. - * If false, the application will have to call rd_kafka_offset_store() to store an offset (optional). - * Offsets will be written to broker or local file according to offset.store.method. - */ - public function setAutoCommitEnable(bool $auto_commit_enable): void - { - $this->set(Constant::TOPIC_CONF_AUTO_COMMIT_ENABLE, $auto_commit_enable); - } - - /** - * @param mixed|string $enable_auto_commit - * - * DEPRECATED Alias for auto.commit.enable: [LEGACY PROPERTY: This property is used by the simple legacy consumer only. - * When using the high-level KafkaConsumer, the global enable.auto.commit property must be used instead]. - * If true, periodically commit offset of the last message handed to the application. - * This committed offset will be used when the process restarts to pick up where it left off. - * If false, the application will have to call rd_kafka_offset_store() to store an offset (optional). - * Offsets will be written to broker or local file according to offset.store.method. - */ - public function setEnableAutoCommit(bool $enable_auto_commit): void - { - $this->set(Constant::TOPIC_CONF_ENABLE_AUTO_COMMIT, $enable_auto_commit); - } - - /** - * @param mixed|string $auto_commit_interval_ms - * - * [LEGACY PROPERTY: This setting is used by the simple legacy consumer only. - * When using the high-level KafkaConsumer, the global auto.commit.interval.ms property must be used instead]. - * The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. - */ - public function setAutoCommitIntervalMs(int $auto_commit_interval_ms): void - { - $this->set(Constant::TOPIC_CONF_AUTO_COMMIT_INTERVAL_MS, $auto_commit_interval_ms); - } - - /** - * @param mixed|string $auto_offset_reset - * - * Action to take when there is no initial offset in offset store or the desired offset is out of range: - * 'smallest','earliest' - automatically reset the offset to the smallest offset, - * 'largest','latest' - automatically reset the offset to the largest offset, - * 'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'. - */ - public function setAutoOffsetReset(mixed $auto_offset_reset): void - { - $this->set(Constant::TOPIC_CONF_AUTO_OFFSET_RESET, $auto_offset_reset); - } - - /** - * @param mixed|string $offset_store_path - * - * DEPRECATED Path to local file for storing offsets. - * If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. - * File-based offset storage will be removed in a future version. - */ - public function setOffsetStorePath(string $offset_store_path): void - { - $this->set(Constant::TOPIC_CONF_OFFSET_STORE_PATH, $offset_store_path); - } - - /** - * @param mixed|string $offset_store_sync_interval_ms - * - * DEPRECATED fsync() interval for the offset file, in milliseconds. - * Use -1 to disable syncing, and 0 for immediate sync after each write. - * File-based offset storage will be removed in a future version. - */ - public function setOffsetStoreSyncIntervalMs(int $offset_store_sync_interval_ms): void - { - $this->set(Constant::TOPIC_CONF_OFFSET_STORE_SYNC_INTERVAL_MS, $offset_store_sync_interval_ms); - } - - /** - * @param mixed|string $offset_store_method - * - * DEPRECATED Offset commit store method: - * 'file' - DEPRECATED: local file store (offset.store.path, et.al), - * 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.). - */ - public function setOffsetStoreMethod(mixed $offset_store_method): void - { - $this->set(Constant::TOPIC_CONF_OFFSET_STORE_METHOD, $offset_store_method); - } - - /** - * @param mixed|string $consume_callback_max_messages - * - * Maximum number of messages to dispatch in one rd_kafka_consume_callback*() - */ - public function setConsumeCallbackMaxMessages(int $consume_callback_max_messages): void - { - $this->set(Constant::TOPIC_CONF_CONSUME_CALLBACK_MAX_MESSAGES, $consume_callback_max_messages); - } - -} diff --git a/kiri-kafka/config.php b/kiri-kafka/config.php deleted file mode 100644 index 70b9db6d..00000000 --- a/kiri-kafka/config.php +++ /dev/null @@ -1,11 +0,0 @@ - [ - - '' - - - ] -]; diff --git a/p.php b/p.php index 2e83f8e8..0cf8b3d6 100644 --- a/p.php +++ b/p.php @@ -22,29 +22,24 @@ class Crontab public int $loopType = Crontab::LOOP_TYPE_MINUTE; - private int $startTime = 0; - - public int $loopTime = 2; - private int|string $year = 2021; + private int|string $month = '*'; + private int|string $day = '*'; - private int|string $month = 8; + private int|string $hour = '*'; - private int|string $day = 25; + private int|string $minute = '*/2'; - private int|string $hour = 19; + private int|string $second = '1-30'; - private int|string $minute = 02; - - private int|string $second = 32; + private int|string $week = '*'; public function __construct() { - $this->startTime = time(); } @@ -63,33 +58,30 @@ class Crontab public function next(): string { - if ($this->loopType == Crontab::LOOP_TYPE_YEAR) $this->year = '*/' . $this->loopTime; - if ($this->loopType == Crontab::LOOP_TYPE_MONTH) $this->month = '*/' . $this->loopTime; - if ($this->loopType == Crontab::LOOP_TYPE_DAY) $this->day = '*/' . $this->loopTime; - if ($this->loopType == Crontab::LOOP_TYPE_HOUR) $this->hour = '*/' . $this->loopTime; - if ($this->loopType == Crontab::LOOP_TYPE_MINUTE) $this->minute = '*/' . $this->loopTime; - if ($this->loopType == Crontab::LOOP_TYPE_SECOND) $this->second = '*/' . $this->loopTime; - - return sprintf('%s-%s-%s %s:%s:%s', - $this->format($this->year, 'Y'), - $this->format($this->month, 'm'), - $this->format($this->day, 'd'), - $this->format($this->hour, 'H'), - $this->format($this->minute, 'i'), - $this->format($this->second, 's') + $time = time(); + return sprintf('%s-%s-%s %s:%s:%s %s', + date('Y'), + $this->format($time, $this->month, 'm', 'month'), + $this->format($time, $this->day, 'd', 'day'), + $this->format($time, $this->hour, 'H', 'hour'), + $this->format($time, $this->minute, 'i', 'minute'), + $this->format($time, $this->second, 's', 'second'), + $this->format($time, $this->week, 'N'), ); } /** + * @param int $startTime * @param string $text * @param string $match + * @param string|null $format * @return string */ - private function format(string $text, string $match): string + private function format(int &$startTime, string $text, string $match, ?string $format = null): string { $time = date($match); - if ($text == '*') { + if ($text == '*' || $text == '*/1') { return $time; } if (str_contains($text, ',')) { @@ -103,13 +95,13 @@ class Crontab if (str_contains($text, '-')) { $explode = explode('-', $text); if ($time >= $explode[0] && $time <= $explode[1]) { - return intval($time) + 1; + return intval($time); } return '^'; } if (str_contains($text, '/')) { $explode = explode('/', $text); - if ($time % $this->loopTime !== 0) { + if ($time % $explode[1] !== 0) { return '^'; } if ($explode[0] != '*') { @@ -123,6 +115,13 @@ class Crontab } +//$date = date('Y-m-d H:i:s'); +//var_dump(date('Y-m-d H:i:s', strtotime('+10 month', strtotime($date)))); +//var_dump(date('Y-m-d H:i:s', strtotime('+10 day', strtotime($date)))); +//var_dump(date('Y-m-d H:i:s', strtotime('+10 hour', strtotime($date)))); +//var_dump(date('Y-m-d H:i:s', strtotime('+10 minute', strtotime($date)))); +//var_dump(date('Y-m-d H:i:s', strtotime('+10 second', strtotime($date)))); +//var_dump(date('Y-m-d H:i:s', strtotime('+10 week', strtotime($date)))); $c = new Crontab();