Kafka: различия между версиями
FireWolf (обсуждение | вклад) |
FireWolf (обсуждение | вклад) (→Ссылки) |
||
(не показано 10 промежуточных версий этого же участника) | |||
Строка 26: | Строка 26: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-console-consumer.sh --bootstrap-server 172.18.0.3:9092 --topic telestat-v2-sink --from-beginning | ~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-console-consumer.sh --bootstrap-server 172.18.0.3:9092 --topic telestat-v2-sink --from-beginning | ||
</syntaxhighlight> | |||
= Информация о группах = | |||
https://kafka.apache.org/documentation/#basic_ops_consumer_lag | |||
Список групп <syntaxhighlight lang="bash">./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list</syntaxhighlight> | |||
Так получим список потребителей, которые используют the Java consumer API (non-ZooKeeper-based consumers): <syntaxhighlight lang="bash">./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group</syntaxhighlight> | |||
Так получим список потребителей, которые используют ZooKeeper (not those using the Java consumer API): <syntaxhighlight lang="bash">./kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group</syntaxhighlight> | |||
== Примеры == | |||
Нет потребителей в группе (либо сервис не работает, либо настроен не правильно): | |||
<syntaxhighlight lang="bash"> | |||
~/work/kafka_2.11-1.0.1/bin$ ./kafka-consumer-groups.sh --describe --group telestat8 --bootstrap-server 192.168.1.8:9092 | |||
Note: This will not show information about old Zookeeper-based consumers. | |||
Consumer group 'telestat8' has no active members. | |||
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID | |||
megafon-sink 0 23159 23163 4 - - - | |||
</syntaxhighlight> | </syntaxhighlight> | ||
Строка 35: | Строка 57: | ||
= Построение кластера = | = Построение кластера = | ||
Предварительно нужен настроенный zookeeper, в примере используется кворум (3 сервиса), настройка тут [[Zookeeper]]. | |||
Пример kafka-cluster.yml для одного хоста, у другого будет все аналогично, за исключением KAFKA_BROKER_ID и KAFKA_ADVERTISED_HOST_NAME | Пример kafka-cluster.yml для одного хоста, у другого будет все аналогично, за исключением KAFKA_BROKER_ID и KAFKA_ADVERTISED_HOST_NAME | ||
Строка 65: | Строка 90: | ||
- /opt/docker/kafka/:/kafka/ | - /opt/docker/kafka/:/kafka/ | ||
</syntaxhighlight> | </syntaxhighlight> | ||
= Удаление топика = | |||
./kafka-topics.sh --zookeeper localhost --delete --topic topic1 | |||
= Переназначение топиков = | |||
http://kafka.apache.org/documentation/#basic_ops_automigrate | |||
сформировать файл topic-jhipster.json (я это делал вручную на основании данных topic --describe, а проще с использованием команды kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate) | |||
<syntaxhighlight> | |||
{"version":1, | |||
"partitions":[{"topic":"topic-jhipster","partition":0,"replicas":[10,20]}] | |||
} | |||
</syntaxhighlight> | |||
А затем выполнить: | |||
<syntaxhighlight> | |||
./kafka-reassign-partitions.sh --execute --zookeeper 192.168.0.121,192.168.0.103 --reassignment-json-file topic-jhipster.json | |||
</syntaxhighlight> | |||
= Удаление топика помеченного MarkedForDeletion:true = | |||
./zkCli.sh --server 127.0.0.1:2183 | |||
rmr /brokers/topics/topic1 | |||
delete /admin/delete_topics/topic1 | |||
После этого можно будет пересоздать топик. | |||
= Ошибки = | |||
Если видим ошибку: IllegalArgumentException: Magic v1 does not support record headers, failedMessage | |||
это значит у нас старая версия kafka, а мы используем более новый формат с заголовками в протоколе kafka, которые не поддерживаются. | |||
Варианты решения: | |||
1. обновить kafka | |||
2. использовать встроенные заголовки в сообщение (для Spring Cloud Stream 2.x установить headerMode в embeddedHeaders или none) | |||
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/229 | |||
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/290#issuecomment-390942592 | |||
= Режимы работы = | |||
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o | |||
at-most-once - 0, 1 и могут быть повторы. | |||
at-least-once - 1 и могут быть повторы. | |||
exactly-once - точно 1 раз. | |||
= Настройка смещения для группы = | |||
<syntaxhighlight> | |||
./kafka-consumer-groups.sh --execute --reset-offsets --to-offset 23344 --topic my-sink --group group8 --bootstrap-server 192.168.1.8:9092 | |||
</syntaxhighlight> | |||
= Ссылки = | |||
http://kafka.apache.org/documentation.html#uses | |||
https://stackoverflow.com/questions/25452369/best-option-to-put-nginx-logs-into-kafka | |||
https://habr.com/company/tinkoff/blog/342892/ | |||
https://www.confluent.io/blog/5-things-every-kafka-developer-should-know/ | |||
[[Категория:Работа]] | [[Категория:Работа]] | ||
[[Категория:Java]] | [[Категория:Java]] | ||
[[Категория:Kafka]] | [[Категория:Kafka]] |
Текущая версия на 07:59, 27 августа 2021
Список каналов
~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-topics.sh --zookeeper 172.18.0.5:2181 --list
__consumer_offsets
telestat-v2-sink
topic-jhipster
Вместо прямых IP адресов можно использовать название сервиса:
~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-topics.sh --zookeeper telestat-zookeeper:2181 --list
Запись в канал
В консоли каждая строка будет отдельным сообщением в канал
~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-console-producer.sh --broker-list 172.18.0.3:9092 --topic telestat-v2-sink
Чтение из канала
~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-console-consumer.sh --bootstrap-server 172.18.0.3:9092 --topic telestat-v2-sink --from-beginning
Информация о группах
https://kafka.apache.org/documentation/#basic_ops_consumer_lag
Список групп
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Так получим список потребителей, которые используют the Java consumer API (non-ZooKeeper-based consumers):
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Так получим список потребителей, которые используют ZooKeeper (not those using the Java consumer API):
./kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group
Примеры
Нет потребителей в группе (либо сервис не работает, либо настроен не правильно):
~/work/kafka_2.11-1.0.1/bin$ ./kafka-consumer-groups.sh --describe --group telestat8 --bootstrap-server 192.168.1.8:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'telestat8' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
megafon-sink 0 23159 23163 4 - - -
Статьи
http://dotsandbrackets.com/highly-available-kafka-cluster-docker-ru/
https://sematext.com/blog/monitoring-kafka-on-docker-cloud/
Построение кластера
Предварительно нужен настроенный zookeeper, в примере используется кворум (3 сервиса), настройка тут Zookeeper.
Пример kafka-cluster.yml для одного хоста, у другого будет все аналогично, за исключением KAFKA_BROKER_ID и KAFKA_ADVERTISED_HOST_NAME
Можно поменять порядок хостов в KAFKA_ZOOKEEPER_CONNECT.
KAFKA_BROKER_ID можно не указывать, он настроится автоматически.
version: '2'
services:
kafka-cluster:
container_name: kafka-cluster
image: wurstmeister/kafka:1.0.0
environment:
## NB! SET KAFKA_BROKER_ID UNIQUE PER HOST
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zoo1:2183,zoo2:2183,zoo3:2183
# KAFKA_CREATE_TOPICS: "telestat-v2-sink:1:2,prozvon-sink:1:2,teledemo-v2-sink:1:2"
KAFKA_LOG_DIRS: /kafka/kafka-logs
ports:
- 9092:9092
restart: always
extra_hosts:
- "zoo1:192.168.1.8"
- "zoo2:192.168.1.7"
- "zoo3:192.168.1.6"
volumes:
- /opt/docker/kafka/:/kafka/
Удаление топика
./kafka-topics.sh --zookeeper localhost --delete --topic topic1
Переназначение топиков
http://kafka.apache.org/documentation/#basic_ops_automigrate
сформировать файл topic-jhipster.json (я это делал вручную на основании данных topic --describe, а проще с использованием команды kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate)
{"version":1,
"partitions":[{"topic":"topic-jhipster","partition":0,"replicas":[10,20]}]
}
А затем выполнить:
./kafka-reassign-partitions.sh --execute --zookeeper 192.168.0.121,192.168.0.103 --reassignment-json-file topic-jhipster.json
Удаление топика помеченного MarkedForDeletion:true
./zkCli.sh --server 127.0.0.1:2183 rmr /brokers/topics/topic1 delete /admin/delete_topics/topic1
После этого можно будет пересоздать топик.
Ошибки
Если видим ошибку: IllegalArgumentException: Magic v1 does not support record headers, failedMessage это значит у нас старая версия kafka, а мы используем более новый формат с заголовками в протоколе kafka, которые не поддерживаются. Варианты решения: 1. обновить kafka 2. использовать встроенные заголовки в сообщение (для Spring Cloud Stream 2.x установить headerMode в embeddedHeaders или none)
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/229
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/290#issuecomment-390942592
Режимы работы
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
at-most-once - 0, 1 и могут быть повторы.
at-least-once - 1 и могут быть повторы.
exactly-once - точно 1 раз.
Настройка смещения для группы
./kafka-consumer-groups.sh --execute --reset-offsets --to-offset 23344 --topic my-sink --group group8 --bootstrap-server 192.168.1.8:9092
Ссылки
http://kafka.apache.org/documentation.html#uses
https://stackoverflow.com/questions/25452369/best-option-to-put-nginx-logs-into-kafka
https://habr.com/company/tinkoff/blog/342892/
https://www.confluent.io/blog/5-things-every-kafka-developer-should-know/