教育行業(yè)A股IPO第一股(股票代碼 003032)

全國(guó)咨詢/投訴熱線:400-618-4000

Apache Flume timestamp和host攔截器使用

更新時(shí)間:2019年11月08日14時(shí)53分 來(lái)源:傳智播客 瀏覽次數(shù):

一、 Flume攔截器介紹

攔截器是簡(jiǎn)單的插件式組件,設(shè)置在source和channel之間。source接收到的時(shí)間,在寫入channel之前,攔截器都可以進(jìn)行轉(zhuǎn)換或者刪除這些事件。每個(gè)攔截器只處理同一個(gè)source接收到的事件。可以自定義攔截器。

flume內(nèi)置了很多攔截器,并且會(huì)定期的添加一些攔截器,下面我們學(xué)習(xí)flume內(nèi)置的,兩個(gè)經(jīng)常使用的攔截器。

1. Timestamp Interceptor(時(shí)間戳攔截器)

flume中一個(gè)最經(jīng)常使用的攔截器 ,該攔截器的作用是將時(shí)間戳插入到flume的事件報(bào)頭中。如果不使用任何攔截器,flume接受到的只有message。時(shí)間戳攔截器的配置。

參數(shù)默認(rèn)值描述type類型名稱timestamp,也可以使用類名的全路徑。preserveExisting false 如果設(shè)置為true,若事件中報(bào)頭已經(jīng)存在,不會(huì)替換時(shí)間戳報(bào)頭的值。

a1.sources.r1.interceptors = timestamp

a1.sources.r1.interceptors.timestamp.type=timestamp

a1.sources.r1.interceptors.timestamp.preserveExisting=false

2. Host Interceptor(主機(jī)攔截器)

主機(jī)攔截器插入服務(wù)器的ip地址或者主機(jī)名,agent將這些內(nèi)容插入到事件的報(bào)頭中。時(shí)間報(bào)頭中的key使用hostHeader配置,默認(rèn)是host。主機(jī)攔截器的配置參數(shù) 默認(rèn)值 描述 type 類型名稱host hostHeader host 事件投的key useIP true 如果設(shè)置為false,host鍵插入主機(jī)名 preserveExisting false 如果設(shè)置為true,若事件中報(bào)頭已經(jīng)存在,不會(huì)替換host報(bào)頭的值

a1.sources.r1.interceptors = host

a1.sources.r1.interceptors.host.type=host

a1.sources.r1.interceptors.host.useIP=false

a1.sources.r1.interceptors.timestamp.preserveExisting=true

二、 業(yè)務(wù)需求

使用flume內(nèi)置攔截器完成如下需求:

Flume 攔截器1

1. agent配置


# 03 timestamp and host interceptors work before source
a1.sources.r1.interceptors = i1
i2  # 兩個(gè)interceptor串聯(lián),依次作用于event
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i2.type = host
# flume event的頭部將添加 “hostname”:實(shí)際主機(jī)名
a1.sources.r1.interceptors.i2.hostHeader = hostname  # 指定key,value將填充為flume agent所在節(jié)點(diǎn)的主機(jī)名
a1.sources.r1.interceptors.i2.useIP = false  # IP和主機(jī)名,二選一即可

# 04 hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs: // master:9000 / flume / % Y - % m - % d /  
# hdfs sink將根據(jù)event header中的時(shí)間戳進(jìn)行替換
# hostHeader的值保持一致,hdfs sink將提取eventkeyhostnmae的值,基于該值創(chuàng)建文件名前綴
a1.sinks.k1.hdfs.filePrefix = % {hostname}  # hdfs sink將根據(jù)event header中的hostnmae對(duì)應(yīng)的value進(jìn)行替換
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 10
a1.sinks.k1.hdfs.rollSize = 1024000

# channel,memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind source,sink to channel
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

三、 驗(yàn)證攔截器效果

1. 驗(yàn)證思路

1)先將interceptor作用后的event,通過(guò)logger sink打印到console,驗(yàn)證header是否正常添加

2)修改sink為hdfs, 觀察目錄和文件的名稱是否能夠按照預(yù)期創(chuàng)建(時(shí)間戳-目錄,hostname-文件前綴)

2. 驗(yàn)證過(guò)程

1)發(fā)送header為空的http請(qǐng)求,logger sink打印event到終端,觀察event header中是否被添加了timestamp以及hostname


# 01 define agent name, source/sink/channel name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 02 source,http,jsonhandler
a1.sources.r1.type = http
a1.sources.r1.bind = node - 1
a1.sources.r1.port = 8888
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

# 03 timestamp and host interceptors work before source
a1.sources.r1.interceptors = i1
i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false

# 04 hdfs sink
a1.sinks.k1.type = logger

# channel,memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind source,sink to channel 
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

啟動(dòng)flume agent


bin/flume-ng agent -c ./conf -f ./conf/http_sink_logger_source.conf -n a1 -Dflume.root.logger=INFO,console

發(fā)送請(qǐng)求測(cè)試:


curl -X POST -d '[{"header":{},"body":"time-host-interceptor001"}]' http://node-1:8888

可以看到終端輸出的event header中已經(jīng)有了攔截器的信息

Flume 攔截器2

修改sink為hdfs, 觀察HDFS的目錄名(時(shí)間戳)和文件前綴(hostnme)

flume攔截器3


目錄名被正常替換(基于event header中的時(shí)間戳)

flume攔截器4

文件前綴被正常替換(基于event header中的hostname:實(shí)際主機(jī)名)

flume 攔截器5

文件內(nèi)容被寫入為event的body

Flume攔截器6

本文來(lái)自:傳智播客大數(shù)據(jù)培訓(xùn)學(xué)院

0 分享到:
和我們?cè)诰€交談!