实时日志分析之三:Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统。在实时数据量比较大的时候,使用kafka作为缓冲是一个不错的选择。
文档:http://orchome.com/kafka/index

环境

  • 系统:centos7
  • ip:192.168.2.230
  • ip:192.168.2.231
  • ip:192.168.2.232

准备

优化linux内核参数
参考文章 实时日志分析之二:Openresty 其中的“优化linux内核参数”部分

安装JDK

1
sudo yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel

sudo vim /etc/profile 加入如下内容

1
2
3
4
5
JAVA_HOME=/usr/lib/jvm/java
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH

载入profile环境

1
source /etc/profile

验证是否安装成功

1
java -version

设置host
在192.168.2.230服务器运行

1
hostnamectl set-hostname kafka01

在192.168.2.231服务器运行

1
hostnamectl set-hostname kafka02

在192.168.2.232服务器运行

1
hostnamectl set-hostname kafka03

在每台服务器上 sudo vi /etc/hosts 加入如下内容

1
2
3
192.168.2.230 kafka01
192.168.2.231 kafka02
192.168.2.232 kafka03

关闭防火墙(可以选择使用iptables做相应规则)

1
sudo systemctl stop firewalld.service && sudo systemctl disable firewalld.service

安装

下载解压包(每台服务器都运行)

1
2
3
wget http://mirrors.hust.edu.cn/apache/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar -zxvf kafka_2.12-0.10.2.0.tgz
cd kafka_2.12-0.10.2.0

zookeeper配置(每台服务器都运行)
vim config/zookeeper.properties 修改相应内容

1
2
3
4
5
6
7
8
9
tickTime=2000
initLimit=20
syncLimit=10
dataDir=/data/zookeeper/dataDir
dataLogDir=/data/zookeeper/dataLogDir
clientPort=2181
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888

配置目录及文件(每台服务器都运行)

1
2
3
sudo mkdir -p /data/zookeeper/dataDir
sudo mkdir -p /data/zookeeper/dataLogDir
sudo chown -R vagrant:vagrant /data # 这里的用户名使用你当前的用户名(替换掉vagrant)

在192.168.2.230服务器运行

1
echo '1' >/data/zookeeper/dataDir/myid

在192.168.2.231服务器运行

1
echo '2' >/data/zookeeper/dataDir/myid

在192.168.2.232服务器运行

1
echo '3' >/data/zookeeper/dataDir/myid

kafka配置
在192.168.2.230服务器上 vim config/server.properties 修改如下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://kafka01:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=6
replication.factor=2
num.recovery.threads.per.data.dir=1
log.retention.hours=120
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
zookeeper.connection.timeout.ms=6000

在192.168.2.231服务器上 vim config/server.properties 修改如下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://kafka02:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=6
replication.factor=2
num.recovery.threads.per.data.dir=1
log.retention.hours=120
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
zookeeper.connection.timeout.ms=6000

在192.168.2.232服务器上 vim config/server.properties 修改如下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
broker.id=3
delete.topic.enable=true
listeners=PLAINTEXT://kafka03:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=6
replication.factor=2
num.recovery.threads.per.data.dir=1
log.retention.hours=120
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
zookeeper.connection.timeout.ms=6000

配置目录(每台服务器都运行)

1
mkdir -p /data/kafka

启动

每台服务器运行

1
2
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

启动完成后,可以在logs中查看集群是否启动成功

kafka的简单操作参照:

参考:http://orchome.com/6

  • 创建topic(创建名为test的topic,只有一个分区和一个备份)

    1
    bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test
  • 查看已创建的topic

    1
    bin/kafka-topics.sh --list --zookeeper kafka01:2181
  • 手动模拟producer

    1
    bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test
  • 手动模拟consumer

    1
    bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topic test --from-beginning
  • 查看topic详细信息

    1
    bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic test
  • 查看group的lag等重要信息

    1
    bin/kafka-consumer-groups.sh --bootstrap-server=kafka01:9092 --describe --group=consumer_group01

bin目录下还有很多的脚本工具,可以使用类似“bin/kafka-consumer-groups.sh –help”命令来查看用法

python sdk

安装依赖:

1
pip install kafka-python

文档:https://kafka-python.readthedocs.io/en/master/usage.html

这里简单介绍一下使用python来操作kafka,做一个常用的producer和consumer

  • producer.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    # -*- coding: utf-8 -*-
    import msgpack
    from kafka import KafkaProducer
    #### 初始化
    producer = KafkaProducer(
    # 特别注意:在生产者或者消费者代码机器上必须在/etc/hosts文件中加入'kafka01','kafka02','kafka03'(保证链接没问题)
    bootstrap_servers=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'], # kafka集群
    client_id='kafka-python-producer-01', # 生产者此连接客户端的唯一标志,主要用于kafka集群log记录(可不提供,Default: ‘kafka-python-producer-#’)
    value_serializer=msgpack.dumps, # 对send的数据进行序列化(据说msgpack效率比json快10倍)
    # 0: Producer will not wait for any acknowledgment from the server.
    # 1: Wait for leader to write the record to its local log only.
    # 'all': Wait for the full set of in-sync replicas to write the record.
    acks=1, #(可不提供, default: 1 )
    compression_type='gzip', # 压缩方式,如果数据量大或者批量操作的时候,最好加上(default: None)
    batch_size=16384, # 批量buffers,越大的话需要消耗机器的内存越大,但是每次批量发送的数据越多(default: 16384)
    # This setting defaults to 0 (i.e. no delay). Setting linger_ms=5 would have the effect of reducing the number of
    # requests sent but would add up to 5ms of latency to records sent in the absense of load. Default: 0.
    linger_ms=5, # 延迟发送毫秒数。减少生产者发送次数,提高批量发送的“量”
    max_request_size=1048576, # 一次请求(批量发送)的最大数据量( Default: 1048576)
    )
    #### 发送数据
    for i in xrange(10):
    producer.send('topic01', {'name':'jack%s' % i, 'age':i})
    producer.close()
  • consumer.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    # -*- coding: utf-8 -*-
    import signal
    import msgpack
    from kafka import KafkaConsumer
    class GracefulKiller:
    kill_now = False
    def __init__(self):
    signal.signal(signal.SIGINT, self.exit_gracefully)
    signal.signal(signal.SIGTERM, self.exit_gracefully)
    def exit_gracefully(self,signum, frame):
    self.kill_now = True
    def main():
    killer = GracefulKiller()
    ####################################
    # 从topic的结束位置进行消费(使用group组) #
    ####################################
    consumer = KafkaConsumer(
    'topic01', # 消费的topic(可订阅多个: 'topic01', 'topic02', ...)
    # 特别注意:在生产者或者消费者代码机器上必须在/etc/hosts文件中加入'kafka01','kafka02','kafka03'(保证链接没问题)
    bootstrap_servers=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'], # kafka集群
    client_id='kafka-python-consumer-01', # 消费者此连接客户端的唯一标志,主要用于kafka集群log记录(可不提供,Default: ‘kafka-python-{version}’)
    group_id='consumer_group01', # 消费者组
    value_deserializer=msgpack.loads, # 对send的数据进行反序列化(据说msgpack效率比json快10倍)
    enable_auto_commit=True, # If True , the consumer’s offset will be periodically committed in the background. Default: True.
    auto_commit_interval_ms=5000, # Number of milliseconds between automatic offset commits, if enable_auto_commit is True. Default: 5000
    max_poll_records=500, # The maximum number of records returned in a single call to poll(). Default: 500
    )
    # 第一种调用方式:循环
    # for msg in consumer:
    # print msg
    while True:
    # 第二种调用方式:使用poll(),每timeout_ms拿一次数据(推荐)
    poll_data = consumer.poll(timeout_ms=1000)
    print poll_data
    if killer.kill_now:
    consumer.close() # 会先commit然后再断开连接
    break
    if __name__ == '__main__':
    main()
    print "End of the program. I was killed gracefully :)"

先在一个窗口中运行python consumer.py
在另外一个窗口中运行python producer.py
可以在第一个窗口中看到消费到的消息

kafka监控

这里介绍一款kafka的监控,kafka eagle
官方的几款监控都不是太满意,这一款监控还是相当不错,而且在不断更新,期间遇到bug还求助了作者,很快就得到回复,给作者点赞。

坚持原创技术分享,您的支持将鼓励我继续创作!

热评文章