前面有对用户的行为的数据采集做了一个简单的介绍,这里,可以先将这个采集的通道搭建起来,这样的话,也方便我们后续的继续展开。有能力的可以准备云服务器或者至少三台物理机,不然的话,就可以和我一样去在自己的本地机器上开虚拟机。

准备

机器配置

这里,我们需要准备三台虚拟机,我这里使用的是 Windows 自带的虚拟机 Hyper-V: 先讲下我这边的这个机器的配置:

  • hadoop101: 主节点,也就是我们的 master 节点,开的 4 核,6GB 的内存;
  • hadoop102,hadoop103: 工作节点,开的4核,4GB内存;
  • 至少 30 GB 的空间
  • 虚拟机之间网络互通,且均可以访问外网

Centos基本配置

  • 静态IP设置
  • SSH 的免密登陆
  • 非 Root 用户创建
  • 关闭防火墙
  • Java环境

Hadoop 集群环境

这里的集群环境的搭建,其实也很简单,主要的就是 8 个配置文件要正确:
core-site.xml

1
2
3
4
5
6
7
8
9
10
11
<!-- 指定HDFS中NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop102:9000</value>
</property>

<!-- 指定Hadoop运行时产生文件的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>

hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
<!-- 数据的副本数量 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- 指定Hadoop辅助名称节点主机配置 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop104:50090</value>
</property>

yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!-- Site specific YARN configuration properties -->
<!-- Reducer获取数据的方式 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 指定YARN的ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop103</value>
</property>
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>

mapred-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop104:10020</value>
</property>
<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop104:19888</value>
</property>

在 hadoop-env.sh , mapred-env.sh , yarn-env.sh 要特别注意,我们需要手动的配置下 JAVA_HOME 的路径,不然就容易出问题。最后是 slaves ,里面配上我们的各个节点就行。

然后呢,我们需要将我们的 hadoop 文件目录全部分发到各个节点上去,在保证Java环境没问题的情况下,我们就可以开始进行 namenode 的格式化了。

1
hdfs namenode -format

当我们看到:

1
2
3
4
20/05/26 04:16:48 INFO namenode.FSImage: Allocated new BlockPoolId: BP-1231729618-127.0.0.1-1590481008262
20/05/26 04:16:48 INFO common.Storage: Storage directory /opt/module/hadoop-2.7.2/data/tmp/dfs/name has been successfully formatted.
20/05/26 04:16:48 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
20/05/26 04:16:48 INFO util.ExitUtil: Exiting with status 0

的日志出现的时候,就说明我们的初始化成功了。然后我们开始启动我们的 hadoop 集群:

1
start-dfs.sh

这样的话,我们的 Hadoop 的集群环境就是搭建好了,效果如下:
Hadoop

Zookeeper环境

我这里就直接贴配置好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/module/zookeeper-3.4.10/zkData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.101=hadoop101:2888:3888
server.102=hadoop102:2888:3888
server.103=hadoop103:2888:3888

这里需要注意的是:

  • zkData: 自己创建的文件目录
  • myid: 位于 zkData 目录下,里面记录的值为 当前节点的唯一的 id , 如,我们在hadoop101上记录的值为:101 , 和这里的 server.101 对应。

这里写了一个群起的脚本,可自行编写,脚本的存在的意义就是方便我启动 zk 集群的。
Zookeeper

Kafka 的集群安装

这里也是直接的贴配置了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#broker的全局唯一编号,不能重复
broker.id=101
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

这里需要注意两点:

  • broker.id: 是我们的机器的唯一的 id , 这个不可以重复
  • zookeeper.connect: 这是 Zookeeper的地址

随后,我们需要将 Kafka 的文件目录分发到各个机器上去,修改我们的 broker.id , 这样一个简单的 Kafka 的集群环境就好了。

Flume

到这里为止呢,我们的一个大体的数据通道就算好了,现在就是还需要一个采集的通道,也就是 Flume. Flume这里需要点内容,就是我们需要有拦截器,毕竟我们不想要我们的数据杂乱无章的进入到Kafka 里面去。

前面有讲到,我们的数据流从 Flume -> Kafka -> Flume -> HDFS .所以呢,这里我们规划下,hadoop101 / hadoop102 用来Flume -> Kafka 的通道 , hadoop103 用来 Kafka->Flume->HDFS 的通道。现在开始编写我们的 Flume 的 collect 脚本。

我们现看一个图:

我们可以看到图,这里我们需要注意两点,一个是拦截器,主要用于 ETL 以及类型的区分,我们将日志内容分为两部分,一个称为 启动日志,一个称为事件日志,启动日志走的是 start channel , 进入的是 start topic , 事件日志走的是 event channel , 进入的是 event topic。这样应该就比较清楚了,现在我们要做的就是 拦截器和Channel的选择。

  • 数据选选择的是:TailDir Source,它支持断点续传,它会有一个 json 文件维护当前的位置,一旦挂了,可以从这个里面找到失败的前的位置,并从新开始执行。
  • Channel使用的是 Kafka Channel,别问为什么,问了就是下一级是 Kafka.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/apache-flume-1.7.0/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.sun-iot.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.sun-iot.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

我们现在先来写这个Flume里面的两个拦截器。
这个程序在 数据仓库

模拟数据

数据仓库模拟数据的生成

数据采集通道测试

现在我们的数据模拟,采集通道都搭建成功了,现在我们需要真实的采集下我们的数据,看看我们的采集数据能不能正常被采集到,这里我们需要验证的是: 日志采集的 log 正常产生, Kafka 正常接受到消息 , HDFS 上正常有内容。