flume安装与配置
flume安装与配置
小植配置前必看
本帖的主要目的是用最平实的语言描述配置的进程,同时在每个章节为可能涉及的知识点提供相应的网站以供读者学习。最重要的还是给我自己留个配置的日记。这样Linux玩崩了也能找到回家的路。
flume概述
- Apache Flume是一个分布式,可靠且可用的系统,用于有效地从许多不同的source收集,聚合和移动大量日志数据到集中式数据存储。
- Apache Flume的使用不仅限于日志数据聚合。由于数据source是可定制的,因此Flume可用于传输大量event 数据,包括但不限于网络流量数据,社交媒体生成的数据,电子邮件消息以及几乎任何可能的数据source。
- Apache Flume是Apache Software Foundation的顶级项目。
flume的架构
- 数据流模型
Flume event 被定义为具有字节有效负载和可选字符串属性集的数据流单元。Flume agent 是一个(JVM)进程,它承载event 从外部source流向下一个目标(跃点)的组件。 - 交互过程
Flume source消耗由外部 source(如Web服务器)传递给它的 event 。外部source以目标Flume source识别的格式向Flume发送event 。例如,Avro Flume source可用于从Avro客户端或从Avrosink发送event 的流中的其他Flume agent 接收Avroevent 。可以使用Thrift Flume Source定义类似的流程,以接收来自Thrift Sink或Flume Thrift Rpc客户端或Thrift客户端的event ,这些客户端使用Flume thrift协议生成的任何语言编写。当Flume source接收event 时,它将其存储到一个或多个channels 。该channel是一个被动存储器,可以保持event 直到它被Flume sink消耗。文件channel就是一个例子 - 它由本地文件系统支持。sink从channel中移除event 并将其放入外部存储库(如HDFS(通过Flume HDFS sink))或将其转发到流中下一个Flume agent (下一跳)的Flume source。给定 agent 中的source和sink与channel中暂存的event 异步运行。 - 复杂的流程
Flume允许用户构建多跳流,其中event 在到达最终目的地之前经过多个 agent 。它还允许fan-in 和fan-out,上下文路由和故障跳跃的备份路由(故障转移)。 - 可靠性
event 在每个 agent 的channel中进行。然后将event 传递到流中的下一个 agent 或终端存储库(如HDFS)。只有将event 存储在下一个 agent 的channel或终端存储库中后,才会从channel中删除这些event 。这就是Flume中的单跳消息传递语义如何提供流的端到端可靠性。
Flume使用事务方法来保证event 的可靠传递。source和sink分别在事务中封装由channel 提供的事务中放置或提供的event 的存储/检索。这可确保event 集在流中从一个点到另一个点可靠地传递。在多跳流的情况下,来自前一跳的sink和来自下一跳的source都运行其事务以确保数据安全地存储在下一跳的channel 中。 - 可恢复性
event 在channel中进行,该channel管理从故障中恢复。Flume支持由本地文件系统支持的持久文件channel。还有一个内存channel,它只是将event 存储在内存中的队列中,这更快,但是当 agent 进程死亡时仍然留在内存channel中的任何event 都无法恢复。
配置步骤
配置flume
下载地址:https://flume.apache.org/download.html
- 解压至指定目录
1
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers
- 更改解压包名
1
mv apache-flume-1.6.0-bin flume-1.6.0
- 别忘了给
flume
目录权限,我这里直接给主目录全部权限1
chmod -R 777 /export
- 进入
flume
的conf
里1
cd /export/servers/flume-1.6.0/conf
- 修改
flume-env.sh
文件,修改之前先把模板flume-env.sh.template
复制并更名1
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
进去将export JAVA_OPTS
那行的注释去掉1
vim flume-env.sh
测试采集日志信息上传HDFS上
- 进到flume目录下
1
sudo vi dir-hdfs.conf
- 插入以下内容,注意格式注意
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49#定义三大组件的名称
ag1.sources = source1
ag1.sinks = sink1
ag1.channels = channel1
#配置source组件
ag1.sources.source1.type = spooldir
ag1.sources.source1.spoolDir = /home/hadoop/log/
ag1.sources.source1.fileSuffix=.FINISHED
ag1.sources.source1.inputCharset=utf-8
ag1.sources.source1.deserializer.maxLineLength=5120
#配置sink组件
ag1.sinks.sink1.type = hdfs
ag1.sinks.sink1.hdfs.path = hdfs://192.168.100.128:9000/access_log/%y-%m-%d/%H-%M
ag1.sinks.sink1.hdfs.filePrefix = app_log
ag1.sinks.sink1.hdfs.fileSuffix = .log
ag1.sinks.sink1.hdfs.batchSize= 100
ag1.sinks.sink1.hdfs.fileType = DataStream
ag1.sinks.sink1.hdfs.writeFormat = Text
## roll:滚动切换:控制写文件的切换规则
##按文件体积(字节)来切
ag1.sinks.sink1.hdfs.rollSize = 512000
##按event条数切
ag1.sinks.sink1.hdfs.rollCount = 1000000
##按时间间隔切换文件
ag1.sinks.sink1.hdfs.rolllnterval = 60
##控制生成目录的规则(round回滚)
#启用回滚
ag1.sinks.sink1.hdfs.round = true
#回滚值
ag1.sinks.sink1.hdfs.roundValue = 10
#单位
ag1.sinks.sink1.hdfs.roundUnit = minute
#是否使用本地时间戳
ag1.sinks.sink1.hdfs.useLocalTimeStamp = true
# channel组件配置
ag1.channels.channel1.type = memory
## event条数
ag1.channels.channel1.capacity = 500000
##flume事务控制所需要的缓存容量600条event
ag1.channels.channel1.transactionCapacity = 600
#绑定source、channel和sink之间的连接
ag1.sources.source1.channels = channel1
ag1.sinks.sink1.channel = channel1
红框内的一定要和hadoop
的core-site.xml
写的端口号一样,ip地址填写自己实际的 - 启动hadoop集群
1
start-all.sh
- 进入
flume
根目录输入以下命令1
bin/flume-ng agent -c conf/ -f dir-hdfs.conf -n ag1 -Dflume.root.logger=INFO,console
出现这个不是卡,而是启动成功了
测试能否采集信息到hdfs - 进入网页查看,hdfs已经创建了名为的目录
采集到的信息成功上传到hdfs
点击进入下载查看
观察flume中日志信息,查看采集后的文件是否加了后缀 .FINISHED
flume实例
- 创建项目
项目结构
创建类包com.mapreduce
(类包名可自定义)然后再里面新建Java类 - 导入相关jar包
- 编程代码
1
2
3
4
5
6
7
8
9
10
11
12public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42package com.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
public class WordCountAPP {
public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package com.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountSubmitter {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountAPP.class);
job.setMapperClass(WordCountAPP.WordCountMapper.class);
job.setCombinerClass(WordCountAPP.WordCountReducer.class);
job.setReducerClass(WordCountAPP.WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}
} - 将项目打包成jar包
- 导出jar包
当进度条走完之后,在项目的目录中会多出一个out文件夹,里面存放的就是使用IDEA导出的jar包
上传jar包,这里我用最“暴力省时”的方法,直接用xftp
工具拖过去 - 编辑测试数据并移动到
log
目录下
测试前请自行检查是否已经成功开启flume1
vim testFile
此时flume提示上传成功1
mv testFile /home/hadoop/log
在data目录下输入jar命令:1
hadoop jar flume_test.jar com.mapreduce.WordCountSubmitter /access_log/22-07-26/21-40/app_log.1658843108283.log /flume/output
flume_test.jar
: 所执行的jar包com.mapreduce.WordCountSubmitter
: main类/access_log/22-07-26/21-40/app_log.1658843108283.log
: 所分析文件的路径/flume/output
: 分析结果输出的路径
执行成功图:
查看结果
评论
匿名评论隐私政策
✅ 你无需删除空行,直接评论以获取最佳展示效果