Kafka

Материал из Home Wiki
Перейти к навигации Перейти к поиску

Категория:Работа

Список каналов

~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-topics.sh --zookeeper 172.18.0.5:2181 --list
__consumer_offsets
telestat-v2-sink
topic-jhipster

Вместо прямых IP адресов можно использовать название сервиса:

~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-topics.sh --zookeeper telestat-zookeeper:2181 --list


Запись в канал

В консоли каждая строка будет отдельным сообщением в канал

~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-console-producer.sh --broker-list 172.18.0.3:9092 --topic telestat-v2-sink

Чтение из канала

~/work/kafka_2.11-0.10.1.1/bin$ ./kafka-console-consumer.sh --bootstrap-server 172.18.0.3:9092 --topic telestat-v2-sink --from-beginning

Информация о группах

https://kafka.apache.org/documentation/#basic_ops_consumer_lag

Список групп

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Так получим список потребителей, которые используют the Java consumer API (non-ZooKeeper-based consumers):

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

Так получим список потребителей, которые используют ZooKeeper (not those using the Java consumer API):

./kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group

Примеры

Нет потребителей в группе (либо сервис не работает, либо настроен не правильно):

~/work/kafka_2.11-1.0.1/bin$ ./kafka-consumer-groups.sh  --describe --group telestat8  --bootstrap-server 192.168.1.8:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'telestat8' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
megafon-sink                   0          23159           23163           4          -                                                 -                              -

Статьи

http://dotsandbrackets.com/highly-available-kafka-cluster-docker-ru/

https://sematext.com/blog/monitoring-kafka-on-docker-cloud/

Построение кластера

Предварительно нужен настроенный zookeeper, в примере используется кворум (3 сервиса), настройка тут Zookeeper.

Пример kafka-cluster.yml для одного хоста, у другого будет все аналогично, за исключением KAFKA_BROKER_ID и KAFKA_ADVERTISED_HOST_NAME

Можно поменять порядок хостов в KAFKA_ZOOKEEPER_CONNECT.

KAFKA_BROKER_ID можно не указывать, он настроится автоматически.

version: '2'
services:
    kafka-cluster:
        container_name: kafka-cluster
        image: wurstmeister/kafka:1.0.0
        environment:
            ## NB! SET KAFKA_BROKER_ID UNIQUE PER HOST
            KAFKA_BROKER_ID: 1
            KAFKA_ADVERTISED_HOST_NAME: kafka1
            KAFKA_ADVERTISED_PORT: 9092
            KAFKA_ZOOKEEPER_CONNECT: zoo1:2183,zoo2:2183,zoo3:2183
            # KAFKA_CREATE_TOPICS: "telestat-v2-sink:1:2,prozvon-sink:1:2,teledemo-v2-sink:1:2"
            KAFKA_LOG_DIRS: /kafka/kafka-logs
        ports:
            - 9092:9092
        restart: always
        extra_hosts:
            - "zoo1:192.168.1.8"
            - "zoo2:192.168.1.7"
            - "zoo3:192.168.1.6"
        volumes:
            - /opt/docker/kafka/:/kafka/

Удаление топика

./kafka-topics.sh --zookeeper localhost --delete --topic topic1


Переназначение топиков

http://kafka.apache.org/documentation/#basic_ops_automigrate

сформировать файл topic-jhipster.json (я это делал вручную на основании данных topic --describe, а проще с использованием команды kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate)

{"version":1,
    "partitions":[{"topic":"topic-jhipster","partition":0,"replicas":[10,20]}]
}

А затем выполнить:

./kafka-reassign-partitions.sh --execute --zookeeper 192.168.0.121,192.168.0.103 --reassignment-json-file topic-jhipster.json


Удаление топика помеченного MarkedForDeletion:true

./zkCli.sh --server 127.0.0.1:2183 rmr /brokers/topics/topic1 delete /admin/delete_topics/topic1

После этого можно будет пересоздать топик.

Ошибки

Если видим ошибку: IllegalArgumentException: Magic v1 does not support record headers, failedMessage это значит у нас старая версия kafka, а мы используем более новый формат с заголовками в протоколе kafka, которые не поддерживаются. Варианты решения: 1. обновить kafka 2. использовать встроенные заголовки в сообщение (для Spring Cloud Stream 2.x установить headerMode в embeddedHeaders или none)

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/229

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/290#issuecomment-390942592

Режимы работы

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

at-most-once - 0, 1 и могут быть повторы.

at-least-once - 1 и могут быть повторы.

exactly-once - точно 1 раз.

Настройка смещения для группы

./kafka-consumer-groups.sh --execute --reset-offsets --to-offset 23344 --topic my-sink  --group group8  --bootstrap-server 192.168.1.8:9092

Ссылки

http://kafka.apache.org/documentation.html#uses

https://stackoverflow.com/questions/25452369/best-option-to-put-nginx-logs-into-kafka

https://habr.com/company/tinkoff/blog/342892/

https://www.confluent.io/blog/5-things-every-kafka-developer-should-know/