kafka.md

in with 0 comment

kafka

主要内容:

  1. maven配置
  2. application.properties的配置
  3. 生产者的使用
  4. 消费者的使用
  5. 参考文档
  6. 附件

详细内容:

请看下面配置:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
   </dependency>
   <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
 </dependency>


maven配置

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.0.8.RELEASE</version>
   <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
   <java.version>1.8</java.version>
</properties>
<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
   </dependency>
   <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
   </dependency>
   <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.62</version>
   </dependency>
</dependencies>
<build>
   <plugins>
      <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
   </plugins>
</build>

application.properties的配置

#Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster.
spring.kafka.bootstrap-servers=10.80.152.211:9092
#ID to pass to the server when making requests. Used for server-side logging
spring.kafka.producer.client-id=produce1
#Unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.group-id=personGroup

生产者的使用

/**
* kafka 生产服务
*
* @author song.cao
* @date 2019/11/8 19:48
*/
@Service
public class KafkaProducerService {
    private final KafkaTemplate kafkaTemplate;
    @Autowired
    public KafkaProducerService(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    /**
     * 将发送消息到kafka
     * <p>
     * 这里将消息message转化为json字符串传入kafka topic
     *
     * @param kafkaTopic kafka topic
     * @param message    发送的消息
     */
    public void sendMessage(String kafkaTopic, Object message) {
        kafkaTemplate.send(kafkaTopic, JSONObject.toJSONString(message));
    }
}

消费者的使用

/**
* kafka 消费服务
*
* @author song.cao
* @date 2019/11/8 19:49
*/
@Slf4j
@Service
public class KafkaConsumerService {
    @KafkaListener(topics = "test")
    public void processMessage(String content) {
        log.info("the content is: {}", content);
    }
}

参考文档

https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/
https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/
http://kafka.apache.org/documentation.html

[KAFKA 使用规范]

查看topic列表

./kafka-consumer-groups.sh --bootstrap-server 10.81.64.150:9092 --list

开通topic命令

./kafka-topics.sh --create --zookeeper 10.81.163.232:2181 --replication-factor 1 --partitions 5 --topic uke_stu_assign_source

删除topic

./kafka-topics.sh --delete --zookeeper 10.81.163.232:2181 --topic real_time_wx_message_new

消费kafka


./kafka-console-consumer.sh --bootstrap-server 10.81.64.150:9092 --topic zm_chat_socket_online_user_new_product

kafka重启命令

1.杀掉kafka进程
2.启动命令如下
/usr/kafka_2.11-1.0.0/bin/kafka-server-start.sh -daemon /usr/kafka_2.11-1.0.0/config/server.properties

[Kafka监控整理]

kafka监控:
ECS基础资源监控 CPU使用率 内存使用率 磁盘使用率 (80%告警通知)
应用关注点 topic生产流量 消费流量 分区数量 副本数量 集群状态 每个Consumer Group消息堆积情况(图表展现)

实例的监控项:
kafka单点应用存活监控(down状态告警通知)
kafka 集群状态监控(状态异常告警通知)
实例消息生产流量(bytes/s)(图表展示)
实例消息消费流量(bytes/s)(图表展示)

Topic 的监控项:
Topic 消息生产流量(bytes/s)(图表展示)
Topic 消息消费流量(bytes/s)(图表展示)

Consumer Group 的监控项:
Consumer Group 未消费消息总数(个)(图表展示)
Consumer Group id的连接IP地址 (图表展示)

[logstash优化]

一、性能故障排除
Logstash建议在修改配置项以提高性能的时候,每次只修改一个配置项并观察其性能和资源消耗(cpu、io、内存)。性能检查项包括:

1、检查input和output设备
日志存储的速度和它所连接的服务的速度一样快。日志存储只能像输入和输出目的地一样快速地消耗和生成数据!

1.1、检查系统指标

1)、CPU

启动后需要检查CPU是否被大量使用,检查命令top -H(检查所有的进程,特别关注logstash的进程使用资源情况)。如果CPU使用率一直很高(我遇到过cpu使用率一直为200%左右),则直接查询下面检查jvm堆内存信息。

2)、Memory

Logstash运行在java虚拟机上,所以依赖为其分配的堆内存。如果应用程序使用的总内存超过物理内存,则会导致内存交换(可参见es文档中的内存交换部分)。

3)、io

1.2、磁盘io

需要定时进行检查磁盘饱和度:Logstash的很多文件处理插件可能会导致磁盘饱和;由于配置或位置原因可能导致logstash写入大量的error日志,导致磁盘饱和;所以在linux上建议使用iostat, dstat等共计进行监控。

1.3、网络io

Logstash的input、output插件的执行可能导致大量的网络io的堆积,可以使用dstat or iftop等工具进行监控。

2、检查jvm堆
1)、如果堆大小过低,CPU利用率通常会通过屋顶,从而导致JVM不断地进行垃圾收集。

2)、快速检查这个问题的方法是将堆大小增加一倍,看看性能是否有所改善。不要将堆大小增加到物理内存的数量。至少为操作系统和其他进程留出1 GB的空间。

3)、使用专门的工具对jvm进行精确的监控

4)、一定要确保将最小(Xms)和最大值(Xmx)堆分配大小设置为相同的值,以防止堆在运行时调整大小,这是一个非常昂贵的过程(我使用yum安装之后的值就不相等,再加上分配的内存过小,导致CPU一致非常的高)。

3、检查工作线程设置
1)、可以使用-w以增加output和filter的工作线程数(尽量设置为cpu核数的倍数)。

2)、每个output插件都可以单独设置其线程数(提高插件细性能),但是设置值不要大于pipeline的工作线程数。

3)、可以配置pipeline.batch.size参数,比如当你使用了Elasticsearch的output插件的时候,经常会使用批量操作。

二、调优和剖析日志记录的性能
1、配置参数调优
可以通过调节以下配置项,对性能进行调优:

1)、pipeline.workers
该参数可控制output或filter插件的工作线程数(只能设置正整数),当发现事件正在备份或CPU没有饱和,则可以增加工作线程,以提高性能。

2)、pipeline.batch.size
设置批量执行event的最大值,该值是用于input的批量处理事件值,再打包发送给filter和output,增加该值可以在一定范围内提高性能,但是需要增加额外的内存开销。

3)、pipeline.batch.delay
批处理的最大等待值(input需要按照batch处理的最大值发送到消息队列,但是不能一直等,所以需要一个最大的超时机制)。

2、默认管道设置的修改建议
1)、管道中所有运行中的event都与配置参数:pipeline.workers和pipeline.batch.size有关,而间歇接收大型事件的管道需要足够的内存来处理这些峰值,需要相应的设置LS_HEAP_SIZE值。

2)、测量每一个参数的变化,以确保它增加而不是减少性能。

3)、确保logstash额外留足够的内存,以应对突然增加的事件消耗。

4)、一般pipeline.workers的数量要大于Cpu的核心数,因为output的io等待会消耗大量的空闲时间。

5)、Java中的线程有名称,您可以使用jstack、top和VisualVM图形工具来确定给定线程使用哪些资源。

6)、在Linux平台上,logstash用一些描述性的东西来标记所有的线程。例如,输入显示为基础。

3、JVM heap
堆内存对logstash来说的非常重要的,一般要求将初始值和最大值设置为一致(防止动态调整堆内存大小的消耗),并且需要对堆内存进行调整,调整的同时需要使用VisualVM工具来对堆进行检测。特别的Monitor 窗格,对于检查堆分配是否足以满足当前工作负载非常有用。

三、总结

  1. 升级logstash版本 1.7 -> 2.2
    2.2版本之后的logstash优化了input,filter,output的线程模型。
  2. 增大 filter和output worker 数量 通过启动参数配置 -w 48 (等于cpu核数)
    logstash正则解析极其消耗计算资源,而我们的业务要求大量的正则解析,因此filter是我们的瓶颈。官方建议线程数设置大于核数,因为存在I/O等待。考虑到我们当前节点同时部署了ES节点,ES对CPU要求性极高,因此设置为等于核数。
  3. 增大 woker 的 batch_size 150 -> 3000 通过启动参数配置 -b 3000
    batch_size 参数决定 logstash 每次调用ES bulk index API时传输的数据量,考虑到我们节点机256G内存,应该增大内存消耗换取更好的性能。
  4. 增大logstash 堆内存 1G -> 16G
    logstash是将输入存储在内存之中,worker数量 * batch_size = n * heap (n 代表正比例系数)

[Proxy]

username:

password:

proxy_ip: 172.21.164.78

port:

全局代理

$ vim /etc/profile
export http_proxy="http://username:password@proxy_ip:port"
export https_proxy="http://username:password@proxy_ip:port"
$ source /etc/profile

yum代理设置

$ vim /etc/yum.conf
# Proxy
proxy=http://username:password@proxy_ip:port/

wget的代理设置

$ vim /etc/wgetrc
# Proxy
http_proxy=http://username:password@proxy_ip:port/
https_proxy=http://username:password@proxy_ip:port/
ftp_proxy=http://username:password@proxy_ip:port/

npm的代理设置


npm config set proxy http://username:password@server:port
npm confit set https-proxy http://username:password@server:port

pip的代理设置


pip install -r requirements.txt --proxy=http://username:password@server:port

数据流:filebeat >> kafka >> gohangout >> Elasticsearch >> kibana

[Nginx 日志采集]

1.修改nginx.conf配置文件日志格式

http {
    log_format  jsonlog escape=json '{"@timestamp":"$time_iso8601","@source":"$server_addr","hostname":"$hostname","ip":"$http_x_forwarded_for","client":"$remote_addr","request_method":"$request_method","scheme":"$scheme","domain":"$server_name","referer":"$http_referer","request":"$request_uri","input_bytes":"$request_length","output_bytes":"$bytes_sent","status": $status,"responsetime":$request_time,"http_user_agent":"$http_user_agent","postdata":"$request_body","referer":"$http_referer"}';
 
    access_log  /var/log/nginx/access.log  jsonlog;

2.修改域名nginx的conf配置文件添加日志记录

server {
        listen 80;
        server_name app-gateway-op.zmlearn.com;
        #以下设置nginx日志以日期结尾
        if ($time_iso8601 ~ "^(\d{4})-(\d{2})-(\d{2})") {
            set $date $1$2$3;
        }
 
        access_log /opt/nginx_log/{appid}/app-gateway-op.zmlearn.com_access-$date.log jsonlog;
        # 命名规范: 域名.access-年月日.log

3.配置crond做定期的本地日志清理,本地保留三天日志

30 1 * * * find /opt/nginx_log/* -type f -mtime +3 -exec truncate -s 0 {} \;

tomcat日志采集统计接入架构日志切割包,链接如下

[评估服务器CPU性能脚本]

项目地址:

https://github.com/aliyun/byte-unixbench.git

直接跑shell脚本,脚本如下:

#! /bin/bash
#==============================================================#
# Description: Unixbench script, copy from :https://raw.githubusercontent.com/teddysun/across/master/unixbench.sh, change to v5.1.5 #
# Author: Teddysun <i@teddysun.com> #
# Intro: https://teddysun.com/245.html #
#==============================================================#
cur_dir=/opt/unixbench

# Check System
[[ $EUID -ne 0 ]] && echo 'Error: This script must be run as root!' && exit 1
[[ -f /etc/redhat-release ]] && os='centos'
[[ ! -z "`egrep -i debian /etc/issue`" ]] && os='debian'
[[ ! -z "`egrep -i ubuntu /etc/issue`" ]] && os='ubuntu'
[[ "$os" == '' ]] && echo 'Error: Your system is not supported to run it!' && exit 1

# Install necessary libaries
if [ "$os" == 'centos' ]; then
yum -y install make automake gcc autoconf gcc-c++ time perl-Time-HiRes
else
apt-get -y update
apt-get -y install make automake gcc autoconf time perl
fi

# Create new soft download dir
mkdir -p ${cur_dir}
cd ${cur_dir}

# Download UnixBench5.1.5
if [ -s UnixBench-5.1.5.tar.gz ]; then
echo "UnixBench-5.1.5.tar.gz [found]"
else
echo "UnixBench-5.1.5.tar.gz not found!!!download now..."
if ! wget -c https://github.com/aliyun/byte-unixbench/releases/download/v5.1.5/UnixBench-5.1.5.tar.gz; then
echo "Failed to download UnixBench-5.1.5.tar.gz, please download it to ${cur_dir} directory manually and try again."
exit 1
fi
fi
tar -zxvf UnixBench-5.1.5.tar.gz && rm -f UnixBench-5.1.5.tar.gz
cd UnixBench/

#Run unixbench
make
./Run

echo
echo
echo "======= Script description and score comparison completed! ======= "
echo
echo

脚本结果