Flume安装配置及使用
本文最后更新于46 天前,其中的信息可能已经过时,如有错误请发送邮件到2794998160@qq.com

Flume安装配置及使用

1. 概述

1.1 flume定义

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

image-20240114144609871

1.2 Flume组成架构

架构图:

image-20240114144657928

架构详解:

image-20240114144734040

组件介绍:

:one:Agent

​ Agent是一个JVM进程,它以事件的形式将数据从源头送至目的,是Flume数据传输的基本单元

​ Agent主要有3个部分组成,Source、Channel、Sink。

:two:Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

:three:Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

​ Flume自带两种Channel:Memory Channel和File Channel。

Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

:four:Sink

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。

​ Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

:five:Event

​ 传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。

1.3 Flume拓扑结构

Flume的拓扑结构如图1-3、1-4、1-5和1-6所示:

图1-3 Flume Agent连接:

image-20240114145418597

图1-4 单source,多channel、sink:

image-20240114145425994

图1-5 Flume负载均衡:

image-20240114145522248

图1-6 Flume Agent聚合

image-20240114145705707

1.4 Flume Agent内部原理

image-20240114145814594

2. 快速入门

2.1 Flume安装地址

Flume官网地址: http://flume.apache.org/

文档查看地址: http://flume.apache.org/FlumeUserGuide.html

下载地址: http://archive.apache.org/dist/flume/

2.2 安装部署

2.2.1 将所需包解压重命名

[root@master ~]# tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
[root@master ~]# mv /opt/module/apache-flume-1.9.0-bin/ /opt/module/apache-flume-1.9.0

2.2.2 添加环境变量

[root@master ~]# vi /etc/profile

添加以下内容:

#FLUME_HOME
export FLUME_HOME=/opt/module/apache-flume-1.9.0
export PATH=PATH:FLUME_HOME/bin

2.2.3 修改配置

修改flume-env.sh 指定JDK

[root@master ~]# cp FLUME_HOME/conf/flume-env.sh.templateFLUME_HOME/conf/flume-env.sh
[root@master ~]# vi $FLUME_HOME/conf/flume-env.sh

修改如下:

image-20240114150616554

2.2.4 解决guava冲突问题

将hadoop的guava包替换掉flume的guava包

[root@master ~]# rm -rf FLUME_HOME/lib/guava-11.0.2.jar
[root@master ~]# cpHADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $FLUME_HOME/lib/

查看结果:
[root@master ~]# ll /opt/module/apache-flume-1.9.0/lib/ |grep guava
-rw-r--r-- 1 root root 2747878 Jan 14 07:09 guava-27.0-jre.jar

运行命令(配置文件需自己进行编写):

flume-ng agent -n a1 -c FLUME_HOME/conf -fFLUME_HOME/jobs/namenode-hdfs.conf -Dflume.root.logger=INFO,console

命令解析:

  • -n/–name 指定Agent的名称。(必填)

  • -c/–conf 指定配置文件放在什么目录

  • -f/–conf-file指定配置文件,必须在–c参数定义的目录下。(必填)

  • -Dflume.root.logger=INFO,console 仅为 debug使用,请勿生产环境生搬硬套,否则大量的日志会返回到终端,将日志输入到控制台上

#端口测试 netcat_flume_console.comf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 9999

a1.sinks.k1.type =logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#使用flume将数据导入到hdfs中

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = spooldir

a1.sources.s1.spoolDir  该目录下的文件会传输到HDFS

a1.sources.s1.spoolDir = /opt/module/flume/tmpdata

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume/logs

a1.channels.c1.type = memory

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

传输Hadoop日志(namenode)到hdfs:star:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

#读取位置
#定义sourece类型为exec可执行命令的
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/module/hadoop/logs/hadoop-root-datanode-master.log

#存储位置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume

#使用在内存中缓冲事件的通道

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#将source和sink绑定到channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

查看HDFS中/tmp/flume目录下生成的内容,将查看命令及结果(至少5条结果

hdfs dfs -ls /tmp/flume

image-20240722170111746

报错解决:

image-20230322121147277

原因:guava冲突

解决:将/opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar 拷贝到 flume/lib 中,并删除或把自带的 guava-11.0.2.jar 改名

cp HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jarFLUME_HOME/lib

3. 企业开发案例

3.1 监控端口数据官方案例

3.1.1 案例需求:

​ 首先,Flume监控本机44444端口,然后通过telnet工具向本机44444端口发送消息,最后Flume将监听的数据实时显示在控制台

3.1.2 需求分析

image-20240114151624267

3.1.3 实现步骤:

1.安装telnet工具

[root@master ~]# yum install telnet -y

2.判断44444端口是否被占用

[root@master ~]# ss -tunlp |grep 44444

返回空值则表示没有被占用:

image-20240114152740603

功能描述:ss命令是一个监控TCP/IP网络的非常有用的工具,它可以显示路由表、实际的网络连接以及每一个网络接口设备的状态信息。

3.创建Flume Agent配置文件flume-telnet-logger.conf

(1) 在flume目录下创建job文件夹并进入job文件夹

[root@master ~]# mkdir FLUME_HOME/job
[root@master ~]# cdFLUME_HOME/job

(2) 在job文件夹下创建Flume Agent配置文件flume-telnet-logger.conf

[root@master job]# vim flume-telnet-logger.conf

添加以下内容:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html

本地官方文档:/opt/module/apache-flume-1.9.0/docs/FlumeUserGuide.html

参数解释:

image-20240114153507695

:star:注意:一个sources可以对多个channels,一个sinks只能对接一个channel

4.开启flume监听端口

[root@master job]# flume-ng agent -n a1 -c FLUME_HOME/conf -fFLUME_HOME/job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console

这是用于启动Flume代理(agent)的命令,以下是每个参数的解释:

  • flume-ng:Flume的命令行工具。
  • agent:指定要运行的Flume代理。
  • -n a1:指定代理的名称为a1。这是代理的唯一标识符。
  • -c $FLUME_HOME/conf:指定Flume配置文件的目录。这里使用了环境变量 $FLUME_HOME,它应该设置为 Flume 的主目录。
  • -f $FLUME_HOME/jobs/namenode-hdfs.conf:指定Flume代理使用的配置文件的路径。这个配置文件描述了数据流的配置,例如来源、通道和目的地。
  • -Dflume.root.logger=INFO,console:设置Flume代理的日志级别和日志输出方式。在这个例子中,日志级别设置为INFO,并将日志输出到控制台(console)。

运行结果如下:

image-20240114154810826

5.使用telnet工具向本机的44444端口发送内容

[root@master ~]# telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
OK
this is bigdata
OK

6.在Flume监听页面观察接收数据情况

结果如下:

2024-01-14 07:49:43,343 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }
2024-01-14 07:49:59,039 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 68 69 73 20 69 73 20 62 69 67 64 61 74 61 0D this is bigdata. }

image-20240114155118565

3.2 实时读取本地文件到HDFS案例(缺少jar包待测试)

3.2.1 案例需求:

​ 实时监控Hive日志,并上传到HDFS中

3.2.2 需求分析:

image-20240114155345612

3.2.3 实现步骤:

1.Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包

将commons-configuration-1.6.jar、

hadoop-auth-2.7.2.jar、

hadoop-common-2.7.2.jar、

hadoop-hdfs-2.7.2.jar、

commons-io-2.4.jar、

htrace-core-3.1.0-incubating.jar

拷贝到/opt/module/flume/lib文件夹下

2.创建flume-file-hdfs.conf文件

[root@master job]# vim flume-file-hdfs.conf

添加以下内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://master:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

参数解释:

image-20240114160805326

3.执行监控配置

[root@master apache-flume-1.9.0]# flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

4.开启Hadoop和Hive并操作Hive产生日志

[root@master hadoop]sbin/start-dfs.sh
[root@master hadoop] sbin/start-yarn.sh

[root@master hive]$ bin/hive
hive (default)>

5.在HDFS上查看文件。

image-20240114161137794

3.3 实时读取目录文件到HDFS案例

3.3.1 案例需求:

​ 使用Flume监听整个目录的文件

3.3.2 需求分析

image-20240114161315952

3.3.3 实现步骤

1.创建配置文件flume-dir-hdfs.conf

[root@master job]# mkdir /opt/module/apache-flume-1.9.0/upload
[root@master job]# vim flume-dir-hdfs.conf

添加以下内容:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/apache-flume-1.9.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://master:9000/apache-flume-1.9.0/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

参数详解:

image-20240114161617490

2.启动监控文件夹命令

[root@master job]# flume-ng agent -n a3 -c FLUME_HOME/conf -fFLUME_HOME/job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console

说明: 在使用Spooling Directory Source时

  • 不要在监控目录中创建并持续修改文件
  • 上传完成的文件会以.COMPLETED结尾
  • 被监控文件夹每500毫秒扫描一次文件变动

3.向upload文件夹中添加文件

[root@master upload]# touch hj.txt
[root@master upload]# touch app.txt
[root@master upload]# touch hyp.txt

查看flume日志输出:

image-20240114163056295

4.查看HDFS上的数据

image-20240114163156670

5.等待1s,再次查询upload文件夹

[root@master upload]# ll
total 0
-rw-r--r-- 1 root root 0 Jan 14 08:28 app.txt.COMPLETED
-rw-r--r-- 1 root root 0 Jan 14 08:28 hj.txt.COMPLETED
-rw-r--r-- 1 root root 0 Jan 14 08:28 hyp.txt.COMPLETED

3.4 单数据源多出口案例(选择器)

单Source多Channel、Sink如下图所示:

image-20240114163500795

3.4.1 案例需求:

​ 使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。

3.4.2 需求分析

image-20240114163605599

3.4.3 实现步骤:

1.准备工作

在/opt/module/flume/job目录下创建group1文件夹

[root@master job]# mkdir $FLUME_HOME/job/group1

在/opt/module/datas/目录下创建flume3文件夹

mkdir -p /opt/module/datas/flume3

2.创建flume-file-flume.conf

​ 配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume-flume-hdfs和flume-flume-dir。

[root@master job]# cd group1/
[root@master group1]# vim flume-file-flume.conf

添加以下内容:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2/logs/hiveServer2.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = master
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

:Avro是由Hadoop创始人Doug Cutting创建的一种语言无关的数据序列化和RPC框架。

:RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

3.创建flume-flume-hdfs.conf

配置上级Flume输出的Source,输出是到HDFS的Sink。

[root@master group1]# vim flume-flume-hdfs.conf

添加以下内容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = master
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://master:9000/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

4.创建flume-flume-dir.conf

[root@master group1]# vim flume-flume-dir.conf

添加以下内容:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = master
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

5.执行配置文件

别开启对应配置文件:flume-flume-dirflume-flume-hdfsflume-file-flume

1.
flume-ng agent -c FLUME_HOME/conf -n a3 -fFLUME_HOME/job/group1/flume-flume-hdfs.conf

image-20240114174121978

2.
flume-ng agent -c FLUME_HOME/conf -n a2 -fFLUME_HOME/job/group1/flume-flume-hdfs.conf

image-20240114174151299

3.
flume-ng agent -c FLUME_HOME/conf -n a1 -fFLUME_HOME/job/group1/flume-file-flume.conf

image-20240114174226938

6.启动hive

[root@master ~]# hive
hive> show databases;
OK
default
Time taken: 0.469 seconds, Fetched: 1 row(s)

7.检查HDFS上的数据

image-20240114174659992

8.检查/opt/module/datas/flume3目录中数据

[root@master flume3]$ ll
总用量 8
-rw-rw-r--. 1 root root 5942 5月  22 00:09 1526918887550-3

3.4.4 大数据赛项数据采集

3.4.1 电商数据:
3.4.4.1.1 任务一:
    在主节点使用Flume采集实时数据生成器10050端口的socket数据,将数据存入到Kafka的Topic中(Topic名称为order,分区数为4),使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下;

1.创建bigdata.conf

[root@master job]# vim bigdata.conf

添加以下配置:

# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.1.10
a1.sources.r1.port = 10050
# 这条可忽略
a1.sources.r1.max-line-length = 102400

# 描述和配置kafka sink组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = bigdata
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.kafka.flumeBatchSize = 200
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

:star::star: 配置参考/opt/module/apache-flume-1.9.0/docs/FlumeUserGuide.html

定义agent sources channel参考:

image-20240219145524126

定义kafka sinks:

image-20240219145733401

启动zookeeper 和 kafka服务,并创建名为bigdata的topic

启动方式1(脚本启动):

1.启动zookeeper服务
[root@master ~]# zk.sh start

2.启动kafka服务
[root@master job]# kafka.sh start

启动方式2(命令行启动):

所有节点执行:
1.启动zookeeper服务
    zkServer.sh start

2.启动kafka服务
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

image-20240114223439773

创建topic:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 3 --replication-factor 3 --topic bigdata

查看topic:

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

image-20240114225212291

执行配置文件:

flume-ng agent -n a1 -c FLUME_HOME/conf -fFLUME_HOME/job/bigdata.conf -Dflume.root.logger=INFO,console

执行结果如下:

image-20240219151912630

消费者消费消息:

kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic bigdata --from-beginning

运行结果:

image-20240219152257146

运行模拟数据脚本Python脚本:

目录如下:

image-20240219152423173

编辑Python脚本,修改ip

image-20240219152522206

修改后运行Python脚本:

gxjzy@gxjzy:~/桌面/数据采集$ sudo python3 test2.py

结果如下:

image-20240219152630307

观察kafka消费者:

image-20240219152704419

完成

使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的结果截图:

kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic order --max-messages 2

image-20240722173635683

3.4.4.1.2 任务二:
    采用多路复用模式,Flume接收数据注入kafka 的同时,将数据备份到HDFS目录/user/test/flumebackup下,将查看备份目录下的第一个文件的前2条数据的命令与结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。

多路复用架构图:

image-20240219164419540

1.创建bigdata_Multiplexing.conf多路复用

拷贝前面编写的配置文件,新加一个hdfs的sinks:

[root@master job]# cp bigdata.conf bigdata_Multiplexing.conf

修改后内容如下:

# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.1.10
a1.sources.r1.port = 10050
a1.sources.r1.max-line-length = 102400

# 描述和配置sink组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = bigdata
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.kafka.flumeBatchSize = 200
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# 描述和配置sink组件:k2
## 指定了数据接收组件类型为 HDFS
a1.sinks.k2.type = hdfs
## 指定了写入 HDFS 的路径
a1.sinks.k2.hdfs.path = hdfs://master:9000/user/test/flumebackup
## 指定了每批次写入 HDFS 的记录数为 100
a1.sinks.k2.hdfs.batchSize = 100
##指定了在滚动写入 HDFS 之前等待的最大文件大小为 10000 字节
a1.sinks.k2.hdfs.rollSize = 10000

# 描述和配置channel组件,此处使用是内存缓存的方式 c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# 描述和配置channel组件,此处使用是内存缓存的方式 c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000

# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

主要添加配置如下:

image-20240219192902016

参考:

image-20240229163920112

启动flume服务:

flume-ng agent -n a1 -c FLUME_HOME/conf -fFLUME_HOME/job/bigdata_Multiplexing.conf -Dflume.root.logger=INFO,console

运行结果:

image-20240219192723639

查看备份目录下的第一个文件的前2条数据的命令与结果截图:

image-20240219192431005

hdfs dfs -cat /usr/test/flumebackup/FlumeData.1708341766181 | head -n 2

消费者消费消息:

kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic bigdata --from-beginning

运行结果:

image-20240219193028843

数据采集完成,接下来使用fink进行实时数据处理

3.4.2 工业数据:
3.4.2.1 任务一:
1、在主节点使用Flume采集/data_log目录下实时日志文件中的数据,将数据存入到Kafka的Topic中(Topic名称分别为ChangeRecord、ProduceRecord和EnvironmentData,分区数为4),将Flume采集ChangeRecord主题的配置截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下;

创建topic:

partitions: 分区

replication-factor: 副本

ChangeRecord:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 4 --replication-factor 1 --topic ChangeRecord

ProduceRecord:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 4 --replication-factor 1 --topic ProduceRecord

EnvironmentData:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 4 --replication-factor 1 --topic EnvironmentData

创建后的topic如下:

[root@bigdata1 data_log]# kafka-topics.sh --zookeeper localhost:2181 --list
ChangeRecord
EnvironmentData
ProduceRecord

image-20240228103535104

  创建industry.conf:

[root@bigdata1 job]# vim industry.conf

添加以下内容:

# 定义这个agent中各组件的名字
a1.sources = r1 r2 r3
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

# 定义sources r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile =/opt/module/flume-1.9.0/changerecord/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data_log/.*changerecord.csv
# r2
a1.sources.r2.type = TAILDIR
a1.sources.r2.positionFile = /opt/module/flume-1.9.0/producerecord/taildir_position.json
a1.sources.r2.filegroups = f1
a1.sources.r2.filegroups.f1 = /data_log/.*producerecord.csv
# r3
a1.sources.r3.type = TAILDIR
a1.sources.r3.positionFile =/opt/module/flume-1.9.0/environmentdata/taildir_position.json
a1.sources.r3.filegroups = f1
a1.sources.r3.filegroups.f1 = /data_log/.*environmentdata.csv

# 定义sinks k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ChangeRecord
a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# k2
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = ProduceRecord
a1.sinks.k2.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
# k3
a1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k3.kafka.topic = EnvironmentData
a1.sinks.k3.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k3.kafka.flumeBatchSize = 20
a1.sinks.k3.kafka.producer.acks = 1
a1.sinks.k3.kafka.producer.linger.ms = 1

# 定义channel c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 10000
# c3
a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 10000

# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2

a1.sources.r3.channels = c3
a1.sinks.k3.channel = c3

新增参数:

image-20240228102128393

运行flume脚本:

flume-ng agent -n a1 -c FLUME_HOME/conf -fFLUME_HOME/job/industry.conf -Dflume.root.logger=INFO,console

运行结果如下:

image-20240229202836977

3.4.2.2 任务二:
2、编写新的Flume配置文件,将数据备份到HDFS目录/user/test/flumebackup下,要求所有主题的数据使用同一个Flume配置文件完成,将Flume的配置截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。

flume配置内容

# 定义这个agent中各组件的名字
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a2.sources.r1.filegroups = f1 f2 f3
#f1
a2.sources.r1.filegroups.f1 = /data_log/.*producerecord.csv
a2.sources.r1.headers.f1.headerKey1 = producerecord
#f2
a2.sources.r1.filegroups.f2 = /data_log/.*changerecord.csv
a2.sources.r1.headers.f2.headerKey1 = changerecord
#f3
a2.sources.r1.filegroups.f3 = /data_log/.*environmentdata.csv
a2.sources.r1.headers.f3.headerKey1 = environmentdata
a2.sources.r1.fileHeader = true

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs:///user/test/flumebackup/%Y%m%d/%H/%{headerKey1}
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

运行

 flume-ng agent -n a2 -c FLUME_HOME/conf -fFLUME_HOME/jobs/hdfs.conf -Dflume.root.logger=INFO,console

结果如下:

image-20240229210617647

发现数据已经在备份,到hdfs查看如下:

image-20240229210721984

查看数据详情:

image-20240229210800386

hdfs dfs -cat /user/test/flumebackup/20240723/15/changerecord/upload-.1721718299144 | head -n2

完成

3.4.3 河南2023年省赛电商数据采集(maxwell)
3.4.3.1 使用多路复用同时完成(建议)
1.在主节点使用flume采集实时数据生成器25001端口数据的socket数据,将数据存入kafka的topic中(ods_mall_log,分区为2),使用kafka的自带的消费者消费ods_mall_log中的数据,并查看topic中的前1条数据结果,将查看命令与结果完整截图到答题卡

2.实时脚本启动后,在主节点进入到mwell-1.29.0的解压后目承下《在/opt/module下),配置相关文件并启动,读取主节点MlySQlL数据的binlog日志(MySQL. 的binlog相关配置已完毕,只需要关注ds_realtime_db 数据库的表)到Kafka的Topie中(Topie 名称为 ods_mall_data. 分区数为2 )。使用kafka 自带的消费者消费ods_mall_data (Topie)中的数据,查看Topie 中的前1条数据的结果,将查看命令与结果完整的截图站贴至客户端桌面

1.创建topic

截图要点,指令单独截图,查看创建的结果单独截图

1.任务一
kafka-topics.sh --create --zookeeper bigdata1:2181,bigdata2:2181,bigdata3:2181 --partitions 2 --replication-factor 1 --topic ods_mall_log
2.任务二
kafka-topics.sh --create --zookeeper bigdata1:2181,bigdata2:2181,bigdata3:2181 --partitions 2 --replication-factor 1 --topic ods_mall_data

2.编写flume多路复用

# 定义这个agent中各组件的名字
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 25001

# r2
a1.sources.r2.type = exec
a1.sources.r2.command = /opt/module/maxwell-1.29.0/bin/maxwell --user=root --password=123456 --host=192.168.45.13 --port=3306 

# 描述和配置kafka sink组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ods_mall_log
a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k1.kafka.flumeBatchSize = 200
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

#k2
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = ods_mall_data
a1.sinks.k2.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k2.kafka.flumeBatchSize = 200
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
a1.sinks.k2.kafka.producer.compression.type = snappy

# 描述和配置channel组件,此处使用是内存缓存的方式
# c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000

# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2

运行flume脚本

flume-ng agent -n a1 -c FLUME_HOME/conf -fFLUME_HOME/jobs/test.conf -Dflume.root.logger=INFO,console

image-20240313115746824

运行socket脚本

[root@bigdata1 data_log]# ./gen_ds_data_to_socket 

结果如下:

image-20240313165932601

查看topic ods_mall_log前一条数据:

kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ods_mall_log --max-messages 1

结果如下:

image-20240313170419522

查看topic ods_mall_data 前一条数据

kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ods_mall_data  --max-messages 1

image-20240313170616804

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇