基于ELK和Python搭建简单的监控告警系统

简介

在做完支付系统后,我搭建了两套监控系统。
一套是点评的CAT,主要用于代码级的实时统计和历史统计以及异常监控、主机监控等,如DAO、CACHE、MONGO、RPC、URL执行时间统计,应用HTTP轮询监控等,当然CAT的功能并不仅限于此,我们只是用了很少的一部分。
还有一套日志监控系统,基于非常流行的ELK Stack,也就是ElasticSearch + Logstash + Kibana,本文主要说一说这套系统的玩法。

系统架构

ELK Python监控

整个架构还是比较简单的,Logstash负责收集日志数据并推送到Kafka集群中做缓冲,再由另一套Logstash从Kafka集群中取出数据推入到Elasticsearch中存储。Python程序每隔一段时间根据关键字查询上一段时间是否有错误日志产生,如果有,就发送邮件和微信告警。

下面分别说一说各个组件使用上的一些问题。由于网上已经有很多基础讲解、权威指南的内容,我会尽量只讲干货。

Logstash

使用版本2.3.4。
Logstash(Github,Docs)用于收集日志。在file中指定tagstype主要是为了数据打到ES时的搜索。sincedb_path是记录日志读取位置的文件,使用sincedb_write_interval控制写入文件的间隔,默认是15秒。codec => multiline用于错误堆栈信息的合并,比如下边的错误日志,就会合并成一个消息。

2017061904514678558975|2017-06-19 04:51:53,043|DubboServerHandler:4431-thread-1372|扣款请求异常:
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
    at sun.security.ssl.InputRecord.read(InputRecord.java:480)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)

贴出部分logstash的配置文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 从文件收集应用日志,推送到kafka
input {
file {
tags => ["pay","site"]
path => ["/data/www/logs/pay/biz/pay-failure-0?.log","/data/www/logs/pay/biz/pay-success-0?.log"]
type => "biz"
sincedb_path => "/tmp/logstash_p_sincedb_biz"
}
file {
tags => ["pay","site"]
path => ["/data/www/logs/pay/debug/debug-0?.log"]
type => "debug"
sincedb_path => "/tmp/logstash_p_sincedb_debug"
codec => multiline {
pattern => "^[\d|\\|]"
negate => true
what => "previous"
max_lines => 500
multiline_tag => ["exception_stack"]
}
}
}
output {
kafka {
bootstrap_servers => "kafka1.host:9092,kafka2.host:9092,kafka3.host:9092"
topic_id => "pay_log_topic"
}
}

启动方式如下,-r –reload-interval 30 表示每30秒扫描一次配置文件,如果有改动就重新加载配置文件。

1
nohup /usr/local/logstash-2.3.4/bin/logstash agent -l /usr/local/logstash-2.3.4/logstash_pay.log -r --reload-interval 30 -f /usr/local/logstash-2.3.4/config/pay_to_kafka.conf >/dev/null 2>&1 &

在收集日志时,也要做好日志的分类工作,哪些需要存储,哪些不需要,哪些日志该用什么标签区分出来,都需要考虑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 从kafka取日志,推送到ES
input {
kafka {
zk_connect => "kafka1.host:2181,kafka2.host:2181,kafka3.host:2181"
consumer_threads => 1
topic_id => "pay_log_topic"
# 注意同一个topic要使用同一个group_id,否则会导致重复消息(类似发布订阅模式)
group_id => "logstash_pay"
type => "pay"
}
}
output {
elasticsearch {
hosts => ["es.host"]
# es索引格式
index => "pay-%{+YYYY.MM.dd}"
idle_flush_time => 5
workers => 2
}
}

启动方式和收集应用日志的logstash类似。

Kafka

使用版本为2.10-0.8.2.2。
使用Kafka集群做数据缓冲,主要是考虑到网络波动和组件重启可能导致的服务不可用,尤其是ES和业务服务器分布在不同机房,需要走外网VPN做数据交互,网络问题的出现概率就大大增加了。
目前我们用于日志处理的Kafka和Zookeeper使用3台服务器,每个topic分别3 partitions、3 replications。

Kafka在使用上没有遇到太多问题,暂略。

ElasticSearch

使用版本为2.3.5。
由于服务器资源有限,ES暂时只使用一台服务器做数据节点。目前我们收集的日志量不是很多,每天大概20G的3000W+条日志。可以使用ES的插件Elastic HQ查看ES的内部状态。
ES在上手使用上非常简单,配置文件稍微修改就可以使用。
我们遇到的问题:

  • 为了性能上的考虑,最好使用bootstrap.mlockall: true来锁住内存避免使用swap。
  • ES会占用较多的文件句柄数,在我们的系统中是5W+,所以需要调整操作系统的文件句柄数到比较大的值,官方建议65536或以上。可通过ulimit -n 65536/etc/security/limits.conf调整。
  • ES会占用较多的内存,建议分配较多内存。我现在使用的参数是-Xms12g -Xmx24g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintClassHistogram -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:+DisableExplicitGC。如果服务器是ES专用,可以把Xms与Xmx设置成一样的。堆内存设置的太小,会导致频繁GC,严重影响性能。GC优先使用CMS,降低GC导致的应用停顿时间。

Kibana

使用版本为4.5.4。
Kibana的作用是方便用户和ES交互。
修改config/kibana.yml,将ES地址填写到elasticsearch.url后,使用nohup bin/kibana &启动即可。

数据的存储和展示都做好了,下边就是监控的部分。

Python监控程序

使用apscheduler作为定时器,定时做监控任务。除了对错误日志的监控,还写了对activemq队列消息积压情况、JAVA应用线程数的监控(基于jolokia)。这次只说日志监控。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from apscheduler.schedulers.blocking import BlockingScheduler
def job_monitor_es_error_pay():
try:
monitor_es_error.monitor()
except Exception, e:
handle_error('monitor_es_error_pay', e)
if __name__ == '__main__':
# 尝试获取锁再执行。自己实现的分布式锁,避免监控系统单点故障,主要使用https://github.com/glasslion/redlock
master_lock_util.get_lock_block()
# 创建阻塞型定时器
sched = BlockingScheduler()
# es 错误日志
es_interval = props['es_err']['interval'] # default 180
sched.add_job(job_monitor_es_error_pay, 'interval', seconds=es_interval)
sched.start()

monitor_es_error.monitor()的具体代码就不贴出来了,只说下逻辑:

  1. 获取本次查询的时间段,本次的开始时间为上次执行的结束时间,结束时间为当前时间。
  2. 确定ES索引,es的索引在每天8点才会新建(不确定其他版本是否规则相同),所以当天8点前是要查询昨天的索引的。查询关键字:tags:exception_stack AND type:error AND @timestamp:['+str(time_range[0])+' TO '+str(time_range[1])+']'。tags和type都是之前logstash推送时加入的标签,error表示错误日志,exception_stack是带有错误堆栈的日志。还可以通过AND -"关键字"的方式排除不想查找的关键字。@timestamp确定了时间段。
  3. 根据查询出的结果,选择是否报警,可选择微信或邮件报警。邮件中将前100个错误的每个错误堆栈信息前50行打印。微信只做摘要,将错误信息截取发送。邮件模板使用jinja2
  4. 发送邮件时,带有kibana的url,负责人可直接访问kibana获取相关错误信息。也可以根据错误的requestId将所有相关日志查询出来,便于分析。

总结

这套监控系统上线半年多了,虽然可能看起来简陋无比,但对于我们来说,是真正从两眼一抹黑根据用户反馈处理问题的原始部落时代进步到了透明化近乎实时地处理和响应问题的新石器时代。不仅仅是对于用户相关的问题,系统中很多隐患和重大问题,通常都会有一些前兆,只有更主动地发现问题,才能在越来越复杂地业务场景面前立于不败之地。
依据这套系统修复的bug已数不胜数(谁还没写点bug :P),做出的系统优化也非常多。比如GC时间过长问题、支付网关http连接池问题、rpc调用瞬时异常、activemq集群阻塞不消费等等问题。
因为不属于公司要求的项目,所有这些都是我用业余时间一点点做起来的。公司的服务器资源也有限,所有的服务器都是复用的,连监控程序也是跑在测试环境的,为了高可用,才做的分布式锁避免单点故障。
先到这吧。