Introduction
Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and moving large volumes of log data. Flume lets you plug custom data senders into a logging pipeline to collect data, and it can do simple processing before writing the data to any of various (customizable) receivers.
Differences between the two release lines:
- flume-og (0.9.x and earlier): master architecture managed by ZooKeeper; reads and writes share a single thread, which can block
- flume-ng (1.x): drops the master and ZooKeeper management; reads and writes run on separate threads
Version used here:
flume-ng-1.5.0-cdh5.3.6
Flume architecture:
The basic unit is the Agent; an Agent contains source/channel/sink components, and each of the three kinds may appear more than once.
source: receives external data and passes it to a channel
Common types: avro, thrift, exec, kafka, netcat
<agent-name>.sources=source_names
<agent-name>.sources.<source_name>.type=<source type>
<agent-name>.sources.<source_name>.channels=<channel_names>
...plus any other parameters the source type requires
channel: the data transport channel between source and sink (moves and buffers data in transit)
Common types: memory, jdbc, kafka, file
<agent-name>.channels=channel_names
<agent-name>.channels.<channel_name>.type=<channel type>
...plus any other parameters the channel type requires
sink: writes the data Flume has received to its destination in the configured format
Common types: hdfs, hive, file, hbase, avro, thrift, logger
<agent-name>.sinks=sink_names
<agent-name>.sinks.<sink_name>.type=<sink type>
<agent-name>.sinks.<sink_name>.channel=<channel_name>
...plus any other parameters the sink type requires
Three topologies:
single-agent, chained multi-agent (multi-hop), and multiplexing multi-agent
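In the chained layout, for example, an avro sink on the upstream agent feeds an avro source on the downstream agent. A minimal sketch, assuming two hosts; the ports, hostname, and agent names here are illustrative, not from the original notes:

# upstream agent a1: netcat in, avro out to the next hop
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4444
a1.sources.r1.channels=c1
a1.channels.c1.type=memory
a1.sinks.k1.type=avro
# host of the downstream agent (illustrative)
a1.sinks.k1.hostname=192.168.113.112
a1.sinks.k1.port=4545
a1.sinks.k1.channel=c1

# downstream agent a2: avro in, logger out
a2.sources=r1
a2.channels=c1
a2.sinks=k1
a2.sources.r1.type=avro
a2.sources.r1.bind=0.0.0.0
a2.sources.r1.port=4545
a2.sources.r1.channels=c1
a2.channels.c1.type=memory
a2.sinks.k1.type=logger
a2.sinks.k1.channel=c1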
Installation
Download from:
http://archive.cloudera.com/cdh5/cdh/5/
Setup:
tar -zxvf flume-ng-1.5.0-cdh5.3.6.tar.gz
sudo ln -s /opt/softs/apache-flume-1.5.0-cdh5.3.6-bin /opt/flume
sudo vi /etc/profile
# FLUME
export FLUME_HOME=/opt/flume
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
vi $FLUME_HOME/conf/flume-env.sh
export JAVA_HOME=/opt/jdk
export JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
Verify:
flume-ng version
eg1: a netcat source listens for client connections, a memory channel serves as the transport channel, and a logger sink prints the received events
mkdir $FLUME_HOME/example
vi $FLUME_HOME/example/test1.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=192.168.113.111
a1.sources.r1.port=4444
a1.sources.r1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
Start command (run from $FLUME_HOME, since the paths are relative): flume-ng agent --conf ./conf/ --conf-file ./example/test1.conf --name a1 -Dflume.root.logger=INFO,console
To test, connect and type a line:
telnet 192.168.113.111 4444
If telnet is not installed:
rpm -qa telnet-server
yum install telnet-server
rpm -qa telnet
yum install telnet
service xinetd restart
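If nc is available, it also works for a quick test (an alternative to telnet, not from the original notes):
echo "hello flume" | nc 192.168.113.111 4444
Each newline-terminated line becomes one event, and the logger sink prints it on the agent's console.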
eg2: nginx acts as the log server; an exec source tails the nginx log file, a memory channel serves as the transport channel, and an hdfs sink stores the data on HDFS.
vi $FLUME_HOME/example/test2.conf
agent.sources=r1
agent.channels=c1
agent.sinks=k1
# common
agent.sources.r1.channels=c1
agent.sinks.k1.channel=c1
# sources config
agent.sources.r1.type=exec
agent.sources.r1.command=tail -F /var/log/nginx/80.log
# channels config
agent.channels.c1.type=memory
agent.channels.c1.capacity=1000
agent.channels.c1.transactionCapacity=1000
agent.channels.c1.byteCapacityBufferPercentage=20
agent.channels.c1.byteCapacity=1000000
agent.channels.c1.keep-alive=60
# sinks config
agent.sinks.k1.type=hdfs
agent.sinks.k1.channel=c1
agent.sinks.k1.hdfs.path=hdfs://192.168.113.111:9000/logs/%Y/%m/%d
# plain-text output, files named BF-<hour>*.log
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.filePrefix=BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
# roll a new file every hour, at ~127 MB, or after 10 s idle; never by event count
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=10
agent.sinks.k1.hdfs.batchSize=1
agent.sinks.k1.hdfs.rollCount=0
# round the time escapes in the path down to 2-minute boundaries, using the local clock
agent.sinks.k1.hdfs.round=true
agent.sinks.k1.hdfs.roundValue=2
agent.sinks.k1.hdfs.roundUnit=minute
agent.sinks.k1.hdfs.useLocalTimeStamp=true
sudo vi /etc/nginx/nginx.conf
log_format lf '$remote_addr^A$msec^A$http_host^A$request_uri';
#include /etc/nginx/conf.d/*.conf;
server {
    listen 80;
    server_name lvmama 0.0.0.0;
    location ~ .*(BfImg)\.(gif)$ {
        default_type image/gif;
        access_log /var/log/nginx/80.log lf;
        root /etc/nginx/www/source;
    }
}
sudo mkdir -p /etc/nginx/www/source
tail -F /var/log/nginx/80.log
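To generate log lines, request a URL matching the location block above; the query string is illustrative, and the gif need not actually exist, since nginx records the request in the access log either way:
curl 'http://192.168.113.111/BfImg.gif?t=1'
Each request appends one ^A-delimited line to /var/log/nginx/80.log, which the exec source then ships through the channel to HDFS.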
Move the following jars into $FLUME_HOME/lib (the hdfs sink needs them on the classpath):
commons-configuration-1.6.jar
hadoop-common-2.5.0.jar
hadoop-hdfs-2.5.0.jar
hadoop-auth-2.5.0.jar
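These jars ship with Hadoop; the source paths below assume a standard Hadoop 2.5.0 layout under /opt/hadoop (adjust to your installation):
cp /opt/hadoop/share/hadoop/common/lib/commons-configuration-1.6.jar $FLUME_HOME/lib/
cp /opt/hadoop/share/hadoop/common/hadoop-common-2.5.0.jar $FLUME_HOME/lib/
cp /opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-2.5.0.jar $FLUME_HOME/lib/
cp /opt/hadoop/share/hadoop/common/lib/hadoop-auth-2.5.0.jar $FLUME_HOME/lib/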
flume-ng agent --conf ./conf/ --conf-file ./example/test2.conf --name agent -Dflume.root.logger=INFO,console
eg3: monitor a directory (spooldir source → file channel → hdfs sink)
cp $FLUME_HOME/conf/flume-conf.properties.template $FLUME_HOME/conf/flume-conf.properties
vi $FLUME_HOME/conf/flume-conf.properties
# agent1 is the agent name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
# source1 config
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/usr/local/logs
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
# channel1 config
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/usr/local/logs_tmp_cp
agent1.channels.channel1.dataDirs=/usr/local/logs_tmp
# sink1 config
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://sparkproject1:9000/logs
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.rollInterval=1
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d
mkdir /usr/local/logs
hdfs dfs -mkdir /logs
flume-ng agent -n agent1 -c conf -f $FLUME_HOME/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console
Create a new file and move it into /usr/local/logs (Flume automatically uploads it to the /logs directory on HDFS), as in the example below.
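The spooldir source expects files to be complete and immutable once they appear in the watched directory, so write the file elsewhere first and then move it in (the file name is illustrative):
echo "hello spooldir" > /tmp/app.log
mv /tmp/app.log /usr/local/logs/
After ingesting a file, Flume renames it with a .COMPLETED suffix.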
Troubleshooting:
Problem:
nginx open() “” failed (13: Permission denied), client
Solution:
Problem:
Failed to retrieve data from op=GET_BLOCK_LOCATIONS: No Transport
Solution:
Problem:
If Flume dies abruptly, it leaves a temporary (.tmp) file in HDFS, and a MapReduce job that reads that file may fail with an exception.
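A common workaround (a sketch, not from the original notes): keep idleTimeout small so Flume closes idle files promptly, and have downstream jobs skip in-flight files, e.g. by building the input list without the .tmp suffix:
hdfs dfs -ls -R /logs | awk '{print $NF}' | grep -v '\.tmp$'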