flume安装与配置

配置前必看

本帖的主要目的是用最平实的语言描述配置的进程,同时在每个章节为可能涉及的知识点提供相应的网站以供读者学习。最重要的还是给我自己留个配置的日记。这样Linux玩崩了也能找到回家的路。

  1. 本帖主要涉及的目录一共有三个,如有需要,在根目录/下创建export目录存放以下三个目录(建议)。(解压目录)servers、(软件安装目录)software、(数据/日志目录)data
  2. 本帖基于jdk1.8.0_181hadoop-2.7.5配置,因此配置本帖软件请先安装配置好前置软件
  3. 如若配置中报错,请第一时间查看前置软件是否正确安装配置并正常运行其次查看是否兼容性问题。

flume概述

  1. Apache Flume是一个分布式,可靠且可用的系统,用于有效地从许多不同的source收集,聚合和移动大量日志数据到集中式数据存储。
  2. Apache Flume的使用不仅限于日志数据聚合。由于数据source是可定制的,因此Flume可用于传输大量event 数据,包括但不限于网络流量数据,社交媒体生成的数据,电子邮件消息以及几乎任何可能的数据source。
  3. Apache Flume是Apache Software Foundation的顶级项目。
flume的架构
  1. 数据流模型
      Flume event 被定义为具有字节有效负载和可选字符串属性集的数据流单元。Flume agent 是一个(JVM)进程,它承载event 从外部source流向下一个目标(跃点)的组件。
    image-20220726155701240
  2. 交互过程
      Flume source消耗由外部 source(如Web服务器)传递给它的 event 。外部source以目标Flume source识别的格式向Flume发送event 。例如,Avro Flume source可用于从Avro客户端或从Avrosink发送event 的流中的其他Flume agent 接收Avroevent 。可以使用Thrift Flume Source定义类似的流程,以接收来自Thrift Sink或Flume Thrift Rpc客户端或Thrift客户端的event ,这些客户端使用Flume thrift协议生成的任何语言编写。当Flume source接收event 时,它将其存储到一个或多个channels 。该channel是一个被动存储器,可以保持event 直到它被Flume sink消耗。文件channel就是一个例子 - 它由本地文件系统支持。sink从channel中移除event 并将其放入外部存储库(如HDFS(通过Flume HDFS sink))或将其转发到流中下一个Flume agent (下一跳)的Flume source。给定 agent 中的source和sink与channel中暂存的event 异步运行。
  3. 复杂的流程
      Flume允许用户构建多跳流,其中event 在到达最终目的地之前经过多个 agent 。它还允许fan-in 和fan-out,上下文路由和故障跳跃的备份路由(故障转移)。
  4. 可靠性
      event 在每个 agent 的channel中进行。然后将event 传递到流中的下一个 agent 或终端存储库(如HDFS)。只有将event 存储在下一个 agent 的channel或终端存储库中后,才会从channel中删除这些event 。这就是Flume中的单跳消息传递语义如何提供流的端到端可靠性。
      Flume使用事务方法来保证event 的可靠传递。source和sink分别在事务中封装由channel 提供的事务中放置或提供的event 的存储/检索。这可确保event 集在流中从一个点到另一个点可靠地传递。在多跳流的情况下,来自前一跳的sink和来自下一跳的source都运行其事务以确保数据安全地存储在下一跳的channel 中。
  5. 可恢复性
      event 在channel中进行,该channel管理从故障中恢复。Flume支持由本地文件系统支持的持久文件channel。还有一个内存channel,它只是将event 存储在内存中的队列中,这更快,但是当 agent 进程死亡时仍然留在内存channel中的任何event 都无法恢复。
配置步骤

配置flume

下载地址:https://flume.apache.org/download.html

  1. 解压至指定目录
    1
    tar -zxvf apache-flume-1.6.0-bin.tar.gz  -C /export/servers
  2. 更改解压包名
    1
    mv apache-flume-1.6.0-bin flume-1.6.0
  3. 别忘了给flume目录权限,我这里直接给主目录全部权限
    1
    chmod -R 777 /export
  4. 进入flumeconf
    1
    cd /export/servers/flume-1.6.0/conf
  5. 修改flume-env.sh文件,修改之前先把模板flume-env.sh.template复制并更名
    1
    cp flume-env.sh.template flume-env.sh
    image-20220702120419154
  6. vim flume-env.sh进去将export JAVA_OPTS那行的注释去掉
    1
    vim flume-env.sh
    image-20220702120450228

测试采集日志信息上传HDFS上

  1. 进到flume目录下
    1
    sudo vi dir-hdfs.conf
  2. 插入以下内容,注意格式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    #定义三大组件的名称
    ag1.sources = source1
    ag1.sinks = sink1
    ag1.channels = channel1

    #配置source组件
    ag1.sources.source1.type = spooldir
    ag1.sources.source1.spoolDir = /home/hadoop/log/
    ag1.sources.source1.fileSuffix=.FINISHED
    ag1.sources.source1.inputCharset=utf-8
    ag1.sources.source1.deserializer.maxLineLength=5120

    #配置sink组件
    ag1.sinks.sink1.type = hdfs
    ag1.sinks.sink1.hdfs.path = hdfs://192.168.100.128:9000/access_log/%y-%m-%d/%H-%M
    ag1.sinks.sink1.hdfs.filePrefix = app_log
    ag1.sinks.sink1.hdfs.fileSuffix = .log
    ag1.sinks.sink1.hdfs.batchSize= 100
    ag1.sinks.sink1.hdfs.fileType = DataStream
    ag1.sinks.sink1.hdfs.writeFormat = Text

    ## roll:滚动切换:控制写文件的切换规则
    ##按文件体积(字节)来切
    ag1.sinks.sink1.hdfs.rollSize = 512000
    ##按event条数切
    ag1.sinks.sink1.hdfs.rollCount = 1000000
    ##按时间间隔切换文件
    ag1.sinks.sink1.hdfs.rolllnterval = 60

    ##控制生成目录的规则(round回滚)
    #启用回滚
    ag1.sinks.sink1.hdfs.round = true
    #回滚值
    ag1.sinks.sink1.hdfs.roundValue = 10
    #单位
    ag1.sinks.sink1.hdfs.roundUnit = minute
    #是否使用本地时间戳
    ag1.sinks.sink1.hdfs.useLocalTimeStamp = true

    # channel组件配置
    ag1.channels.channel1.type = memory
    ## event条数
    ag1.channels.channel1.capacity = 500000
    ##flume事务控制所需要的缓存容量600条event
    ag1.channels.channel1.transactionCapacity = 600

    #绑定source、channel和sink之间的连接
    ag1.sources.source1.channels = channel1
    ag1.sinks.sink1.channel = channel1
    注意
    image-20220702120708678
    红框内的一定要和hadoopcore-site.xml写的端口号一样,ip地址填写自己实际的
    image-20220702120802421
  3. 启动hadoop集群
    1
    start-all.sh
    image-20220702124328195
  4. 进入flume根目录输入以下命令
    1
    bin/flume-ng agent -c conf/ -f dir-hdfs.conf -n ag1 -Dflume.root.logger=INFO,console
    image-20220702121020414
    出现这个不是卡,而是启动成功了
    image-20220702124534763
    测试能否采集信息到hdfs
    image-20220702123348363
  5. 进入网页查看,hdfs已经创建了名为的目录
    image-20220702124718046
    采集到的信息成功上传到hdfs
    image-20220702124746580
    点击进入下载查看
    image-20220702124955263
    image-20220702125017522
    观察flume中日志信息,查看采集后的文件是否加了后缀 .FINISHED
    image-20220701163522074
flume实例
  1. 创建项目
    image-20220726195552504
    项目结构
    创建类包com.mapreduce(类包名可自定义)然后再里面新建Java类
    image-20220726204721543
  2. 导入相关jar包
    image-20220726200515456
  3. 编程代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
    throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while(itr.hasMoreTokens()) {
    this.word.set(itr.nextToken());
    context.write(this.word, one);
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values,
    Reducer<Text, IntWritable, Text, IntWritable>.Context context)
    throws IOException, InterruptedException {
    int sum = 0;
    IntWritable val;
    for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
    val = (IntWritable)i$.next();
    }
    this.result.set(sum);
    context.write(key, this.result);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    package com.mapreduce;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;

    public class WordCountAPP {

    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
    throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while(itr.hasMoreTokens()) {
    this.word.set(itr.nextToken());
    context.write(this.word, one);
    }
    }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values,
    Reducer<Text, IntWritable, Text, IntWritable>.Context context)
    throws IOException, InterruptedException {
    int sum = 0;
    IntWritable val;
    for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
    val = (IntWritable)i$.next();
    }
    this.result.set(sum);
    context.write(key, this.result);
    }
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package com.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.examples.WordCount;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;


    public class WordCountSubmitter {

    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
    if(otherArgs.length < 2) {
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
    System.exit(2);
    }

    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountAPP.class);
    job.setMapperClass(WordCountAPP.WordCountMapper.class);
    job.setCombinerClass(WordCountAPP.WordCountReducer.class);
    job.setReducerClass(WordCountAPP.WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    for(int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    System.exit(job.waitForCompletion(true)?0:1);
    }
    }
  4. 将项目打包成jar包
    image-20220727130708110
    image-20220726212129871
  5. 导出jar包
    image-20220726211223859
    当进度条走完之后,在项目的目录中会多出一个out文件夹,里面存放的就是使用IDEA导出的jar包
    image-20220726212415784
    image-20220726212605906
    上传jar包,这里我用最“暴力省时”的方法,直接用xftp工具拖过去
    image-20220726213440942
    image-20220726213609942
  6. 编辑测试数据并移动到log目录下
    测试前请自行检查是否已经成功开启flume
    1
    vim testFile
    1
    mv testFile /home/hadoop/log
    此时flume提示上传成功
    image-20220726214610340
    image-20220726214637844
    在data目录下输入jar命令:
    1
    hadoop jar flume_test.jar com.mapreduce.WordCountSubmitter /access_log/22-07-26/21-40/app_log.1658843108283.log /flume/output
    flume_test.jar: 所执行的jar包
    com.mapreduce.WordCountSubmitter: main类
    /access_log/22-07-26/21-40/app_log.1658843108283.log: 所分析文件的路径
    /flume/output: 分析结果输出的路径
    执行成功图:
    image-20220727130834150
    image-20220727130901363
    查看结果
    image-20220727123456943