About Flume-ng (Part 3)


agentA (10.1.124.197), agentB (10.1.124.196), agentC (10.1.124.198)

Test 1

agentA => agentB: data is pushed from the client to agentB, and the channel is changed to jdbc to make event delivery reliable.

## weblog agent config (agentA configuration)

#List sources, sinks and channels in the agent
weblog-agent.sources = avro-sour
weblog-agent.sinks = avro-forward-sink
weblog-agent.channels = jdbc-channel
#define the flow
#weblog-agent sources config
weblog-agent.sources.avro-sour.channels = jdbc-channel
weblog-agent.sources.avro-sour.type = avro
weblog-agent.sources.avro-sour.bind = 0.0.0.0
weblog-agent.sources.avro-sour.port = 41414
#avro sink properties
weblog-agent.sinks.avro-forward-sink.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink.type = avro
weblog-agent.sinks.avro-forward-sink.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink.port = 41414
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel.type = jdbc

agentB configuration:

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

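Run on agentA (the -n value must match the property prefix used in that agent's config): flume-ng agent -n weblog-agent -c /opt/apps/flume-ng/conf/ -f flume.conf &

Run on agentB: flume-ng agent -n agent1 -c /opt/apps/flume-ng/conf/ -f flume.conf &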

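Test 2

Building on Test 1, continue testing event reliability: stop the flume agent service on agentB, then write data on agentA.
Start the flume agent service on agentB again, and the events held on agentA are delivered to agentB.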
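
The behaviour observed here relies on the jdbc channel persisting events on agentA until the downstream agent is reachable again. As a minimal sketch, the channel's storage can be spelled out explicitly rather than left to defaults. The property names follow the JDBC channel section of the Flume User Guide; the /tmp/flumedb path and the capacity value are illustrative assumptions, not settings confirmed by this test.

```properties
# Sketch only: explicit JDBC channel settings for agentA (values are assumptions).
weblog-agent.channels.jdbc-channel.type = jdbc
# The JDBC channel stores events in an embedded Derby database.
weblog-agent.channels.jdbc-channel.db.type = DERBY
# Home path under which the embedded database is created (assumed path).
weblog-agent.channels.jdbc-channel.sysprop.user.home = /tmp/flumedb
# Maximum number of events held in the channel; 0 means unlimited (assumed value).
weblog-agent.channels.jdbc-channel.maximum.capacity = 0
```

Pointing sysprop.user.home at a location that survives reboots, rather than /tmp, is probably what a production setup would want.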

Test 3

agentA uses an exec source running tail, with agentB and agentC as the collection points.

|---------------agentB
agentA
|---------------agentC

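Test content: agentA's source uses exec to run the tail command and read the nginx log, and two sinks forward the events to agentB and agentC respectively, to test load balancing and reliability.
agentB and agentC use the same configuration file as agentB in Test 1. The agentA configuration file is as follows: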

#List sources, sinks and channels in the agent
weblog-agent.sources = tail
weblog-agent.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.channels = jdbc-channel
#define the flow
#weblog-agent sources config
weblog-agent.sources.tail.channels = jdbc-channel
weblog-agent.sources.tail.type = exec
weblog-agent.sources.tail.command = tail -f /opt/apps/nginx/logs/access.log
#avro sink properties
weblog-agent.sinks.avro-forward-sink01.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink01.type = avro
weblog-agent.sinks.avro-forward-sink01.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink01.port = 41414
#avro sink 02
weblog-agent.sinks.avro-forward-sink02.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink02.type = avro
weblog-agent.sinks.avro-forward-sink02.hostname = 10.1.124.198
weblog-agent.sinks.avro-forward-sink02.port = 41414
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel.type = jdbc
weblog-agent.channels.jdbc-channel.sysprop.user.home = /tmp/flumedb

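Run on agentA: flume-ng agent -n weblog-agent -c /opt/apps/flume-ng/conf/ -f flume.conf &

Run on agentB and agentC: flume-ng agent -n agent1 -c /opt/apps/flume-ng/conf/ -f flume.conf &
Test result: events from agentA are written to agentB and agentC in turn, similar to LVS rr mode. After the flume agent on agentC is stopped,
events from agentA are written to agentB only. Once agentC is restored, events from agentA are again written to agentB and agentC in turn.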
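
In this test the two sinks are not placed in a sink group, so each one runs independently and pulls from the same channel. If round-robin distribution should be a declared policy rather than an observed effect, Flume's load-balancing sink processor can express it. A hedged sketch that reuses the sink names from this test; the group name group1 and the backoff setting are assumptions added for illustration:

```properties
# Sketch only: explicit load balancing across the two avro sinks.
weblog-agent.sinkgroups = group1
weblog-agent.sinkgroups.group1.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.sinkgroups.group1.processor.type = load_balance
# round_robin is the default selector; random is the other built-in choice.
weblog-agent.sinkgroups.group1.processor.selector = round_robin
# Temporarily skip a sink that has failed instead of retrying it on every pass.
weblog-agent.sinkgroups.group1.processor.backoff = true
```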

Test 4
agentB and agentC provide HA for agentA's sinks. This configuration uses Flume Sink Processors.

Compare this agentA configuration file with the one from Test 3:

#List sources, sinks and channels in the agent
weblog-agent.sources = tail
weblog-agent.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.channels = jdbc-channel
#define the flow
#weblog-agent sources config
weblog-agent.sources.tail.channels = jdbc-channel
weblog-agent.sources.tail.type = exec
weblog-agent.sources.tail.command = tail -f /opt/apps/nginx/logs/access.log
#avro sink properties
weblog-agent.sinks.avro-forward-sink01.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink01.type = avro
weblog-agent.sinks.avro-forward-sink01.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink01.port = 41414
#avro sink 02
weblog-agent.sinks.avro-forward-sink02.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink02.type = avro
weblog-agent.sinks.avro-forward-sink02.hostname = 10.1.124.198
weblog-agent.sinks.avro-forward-sink02.port = 41414
#sink FailoverSink
weblog-agent.sinkgroups=group1
weblog-agent.sinkgroups.group1.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.sinkgroups.group1.processor.type = failover
weblog-agent.sinkgroups.group1.processor.priority.avro-forward-sink01 = 5
weblog-agent.sinkgroups.group1.processor.priority.avro-forward-sink02 = 10
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel.type = jdbc
weblog-agent.channels.jdbc-channel.sysprop.user.home = /tmp/flumedb

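This defines a sink group (sinkgroups). Only one member of the group does the sink work at any time. The larger the priority value, the higher the priority. When priority values are equal, the member that appears last is used.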
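
How quickly a failed sink is retried can also be tuned on the failover processor. The fragment below is a sketch under assumptions: processor.maxpenalty is the property documented in the Flume User Guide, while the 10000 ms value is only an example and was not part of this test.

```properties
# Sketch only: cap the back-off applied to a failed sink in the failover group.
weblog-agent.sinkgroups.group1.processor.type = failover
# Maximum back-off period for a failed sink, in milliseconds
# (assumed value; the documented default is 30000).
weblog-agent.sinkgroups.group1.processor.maxpenalty = 10000
```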

Test result: avro-forward-sink02 does the sink work. After the flume agent on agentC is stopped, events on agentA are sent to agentB through avro-forward-sink01.
Once the flume agent on agentC is restored, events are sent to agentC again.

Test 5

agentC-----|
           |====> agentB
agentA-----|

Test content: aggregation. agentA and agentC use the same configuration file:

## weblog agent config agentA & agentC

#List sources, sinks and channels in the agent
weblog-agent.sources = avro-sour
weblog-agent.sinks = avro-forward-sink
weblog-agent.channels = jdbc-channel
#define the flow
#weblog-agent sources config
weblog-agent.sources.avro-sour.channels = jdbc-channel
weblog-agent.sources.avro-sour.type = avro
weblog-agent.sources.avro-sour.bind = 0.0.0.0
weblog-agent.sources.avro-sour.port = 41414
#weblog-agent.sources.avro-sour.restart = true
#avro sink properties
weblog-agent.sinks.avro-forward-sink.channel = jdbc-channel
weblog-agent.sinks.avro-forward-sink.type = avro
weblog-agent.sinks.avro-forward-sink.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink.port = 41414
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel.type = jdbc

agentB

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

Test result: aggregation works as expected.
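
Because agentB now receives events from two upstream agents through a single memory channel, the channel's limits may be worth setting explicitly. A sketch only: capacity and transactionCapacity are documented memory channel properties, but the numbers below are assumptions and were not measured in this test.

```properties
# Sketch only: give agentB's memory channel more headroom for two upstream agents.
agent1.channels.ch1.type = memory
# Maximum number of events stored in the channel (assumed value).
agent1.channels.ch1.capacity = 10000
# Maximum number of events per transaction (assumed value).
agent1.channels.ch1.transactionCapacity = 1000
```

A memory channel does not persist events across an agent restart, so a durable channel on agentB may be preferable where the aggregation point must not lose data.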

Test 6

Test content: the source selector type replicating, which I call log RAID 1. Events from agentA are written to both agentB and agentC.

The agentA configuration file is as follows:

#List sources, sinks and channels in the agent
weblog-agent.sources = tail
weblog-agent.sinks = avro-forward-sink01 avro-forward-sink02
weblog-agent.channels = jdbc-channel01 jdbc-channel02
#define the flow
#weblog-agent sources config
weblog-agent.sources.tail.channels = jdbc-channel01 jdbc-channel02
weblog-agent.sources.tail.type = exec
weblog-agent.sources.tail.command = tail -f /opt/apps/nginx/logs/access.log
weblog-agent.sources.tail.selector.type = replicating
#avro sink properties
weblog-agent.sinks.avro-forward-sink01.channel = jdbc-channel01
weblog-agent.sinks.avro-forward-sink01.type = avro
weblog-agent.sinks.avro-forward-sink01.hostname = 10.1.124.196
weblog-agent.sinks.avro-forward-sink01.port = 41414
#avro sink 02
weblog-agent.sinks.avro-forward-sink02.channel = jdbc-channel02
weblog-agent.sinks.avro-forward-sink02.type = avro
weblog-agent.sinks.avro-forward-sink02.hostname = 10.1.124.198
weblog-agent.sinks.avro-forward-sink02.port = 41414
#channels config
#weblog-agent.channels.jdbc-channel.type = memory
weblog-agent.channels.jdbc-channel01.type = jdbc
weblog-agent.channels.jdbc-channel01.sysprop.user.home = /tmp/flumedb01
weblog-agent.channels.jdbc-channel02.type = jdbc
weblog-agent.channels.jdbc-channel02.sysprop.user.home = /tmp/flumedb02

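Test result: the events that agentB and agentC receive from agentA are identical. Note the timing: agentC collects its events slightly later than agentB.
Please credit the source when reposting: http://www.ttlsa.com/html/2770.html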
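
With a replicating selector every listed channel is required by default. The Flume User Guide also describes a selector.optional property for marking some channels as optional. The fragment below is a sketch under the assumption that the Flume release in use supports that property, which this test did not confirm:

```properties
# Sketch only: replicate to both channels, but treat the second one as optional.
weblog-agent.sources.tail.channels = jdbc-channel01 jdbc-channel02
weblog-agent.sources.tail.selector.type = replicating
# A failed write to an optional channel is ignored instead of failing the put.
weblog-agent.sources.tail.selector.optional = jdbc-channel02
```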
