走过的最长的套路,就是面试官的套路。

FlumeCloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

Flume 的主主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到 HDFS

组成

这是官网给出的Flume的架构图:
Archieve

现在我们根据这个架构图说下Flume里面的组成。
Archieve
Source

  • Spooling Directory
  • Exec
  • Syslog
  • Avro
  • Netcat

Channel

  • Channel 是位于 SourceSink 之间的缓冲区;
  • Flume 自带两种Channel: Memory Channel , File Channel.
  • Memory Channel: 基于内存缓存,在不需要关心数据丢失的情景下适用;
  • File ChannelFlume 的持久化 Channel , 系统宕机不会丢失数据。

Sink

  • HDFS
  • Kafka
  • Logger
  • Avro
  • File
  • 自定义

Put 事物

  • doPut: 将批数据写入到临时缓冲去putList
  • doCommit: 检查 Channel 内存队列是都足够合并
  • doRollBack: Channel 内存队列空间不足,回滚数据

Take 事物

  • doTake: 先将数据读取到缓冲去takeList
  • doCommit: 如果数据全部发送成功,则清除缓冲区takeList
  • doRollBack: 数据发送过程中如果出现异常,rollBack将临时缓冲区 takeList 中的数据归还给内存队列。

面试中需要答道以下几点:

  • Taildir Source:断点续传、多目录。Flume1.6以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。

  • File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。

  • Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。

  • Kafka Channel:减少了 FlumeSink 阶段,提高了传输效率。

  • SourceChannelPut 事务

  • ChannelSinkTake 事务

Flume 拦截器

拦截器可以分为两种,ETL拦截器(做清洗数据) 和 类型拦截器(区分类型)
使用两个拦截器的目的在于解耦,便于模块话开发和可移植性高,当然,两个拦截器会对性能造成影响。

自定义拦截器的步骤

  • 实现 Interceptor
  • 重写 4 个方法:
    • initialize: 初始化
    • public Event intercept(Event event) 处理单个 Event
    • public List<Event> intercept(List<Event> events) 处理多个 Event
    • close
  • 静态内部类,实现 Interceptor.Builder

Flume 的Channel 的选择器

所谓的 Channel Selectors 其实就是让不同项目日志通过不同的 Channel 到达不同的 Sink 中去。官方提供了两种 Channel Selectors:

  • Replicating Channel Selector: 将 Source 过来的所有的 Events 发往所有的 Sink;
  • Multiplexing Channel Selector: 将 Source 过来的 Events 选择性的发往不同的 Sink.

Flume 的监控器

  • Ganglia
  • 自定义监控器

Flume的数据丢失

看使用的Channel是什么类型的Channel.

  • Memory Channel: 速度块,但是是基于内存的,容易造成数据丢失;
  • File Channel: 数据可以保存在 File 里面,数据传输本身就具备事物性,不会造成数据的丢失。

Flume 内存

  • 开发中在flume-env.sh中设置JVM heap为4G或更高,部署在单独的服务器上(4核8线程16G内存)
  • -Xmx-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc

FileChannel 的优化

  • 通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
    这个官网有说明:
    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
  • checkpointDirbackupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

HDFS Sink 小文件的处理

HDFS存入大量小文件,有什么影响

元数据层

每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命.

计算层面

默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

小文件的处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollIntervalhdfs.rollSizehdfs.rollCount
基于以上hdfs.rollInterval=3600hdfs.rollSize=134217728hdfs.rollCount =0hdfs.roundValue=3600hdfs.roundUnit= second几个参数综合作用,效果如下:
(1)tmp文件在达到128M时会滚动生成正式文件
(2)tmp文件创建超3600秒时会滚动生成正式文件
举例:在2018-01-01 05:23的时侯sink接收到数据,那会产生如下tmp文件:
/flume/20180101/flume.201801010520.tmp
即使文件内容没有达到128M,也会在06:23时滚动生成正式文件