Spark Streaming and Flume Integration
I am trying to integrate Flume with Spark Streaming.
I have the following workflow defined, and I am seeing an issue where Spark is not able to commit transactions to the Flume channel when data volumes are higher.
I am new to both Flume and Spark Streaming, so if there is a scalability option I have not tried, please let me know.
My workflow:
spool dir -> file-channel-1 -> avro sink -> avro source -> file-channel-2
file-channel-2 is consumed by 2 sinks:
- HDFS sink
- SparkSink
So there are 2 agents:
Agent 1 delivers from the spool directory to an Avro sink running at port 4545.
Agent 2 reads from an Avro source running at port 4545 and delivers to the HDFS and Spark sinks.
I can see events getting delivered to HDFS; no problems there.
The Spark sink, on the other hand, throws the following errors:
org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Spark failed to commit transaction. Will reattempt events.
org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark could not commit transaction, NACK received. Rolling back transaction.
These errors do not occur right away. I see a lot of log lines like the ones below, and only after some time has passed does it start throwing the errors.
(New I/O worker #18) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xab559b05, /127.0.0.1:51736 => /127.0.0.1:11111] INTEREST_CHANGED
(New I/O worker #18) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xab559b05, /127.0.0.1:51736 => /127.0.0.1:11111] INTEREST_CHANGED
(New I/O worker #18) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xab559b05, /127.0.0.1:51736 => /127.0.0.1:11111] INTEREST_CHANGED
I tried the following options:
- I looked at the SparkSink API and assumed it might be a timeout, so I increased the timeout to 300 seconds. It didn't help.
- In FlumeUtils.createPollingStream, I specified a batch size of 10000 and a parallelism of 1 (see the sketch after this list). That didn't help either.
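For reference, the polling call looks roughly like this (a minimal sketch, not my exact code; the StorageLevel and the ssc variable, my StreamingContext, are illustrative):

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Poll the SparkSink that agent 2 exposes on localhost:11111.
val addresses = Seq(new InetSocketAddress("localhost", 11111))
val stream = FlumeUtils.createPollingStream(
  ssc,                                 // StreamingContext
  addresses,                           // SparkSink host/port pairs to poll
  StorageLevel.MEMORY_AND_DISK_SER_2,  // storage level for received events
  10000,                               // maxBatchSize: events pulled per transaction
  1)                                   // parallelism: concurrent connections to the sink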
I know for sure it is a scalability issue, because if I spool a small file of 50 events to the spool directory, I can see the small dataset reaching both HDFS and the SparkSink.
My file channel properties are basic and are mentioned below:
agent_1.channels.file-channel-2.type = file
agent_1.channels.file-channel-2.checkpointDir = <<some dir 1>>
agent_1.channels.file-channel-2.dataDirs = << dir 2 >>
They are the same for file-channel-1 too, pointing to different directories of course. Moreover, there are no issues with agent 1;
it's agent 2, the one that delivers to HDFS and Spark, that is the problematic one.
Please find the SparkSink config below:
agent_2.sinks.sparkpullsink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent_2.sinks.sparkpullsink.hostname = localhost
agent_2.sinks.sparkpullsink.port = 11111
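The timeout increase mentioned earlier was applied here as well; if I am reading the SparkSink source correctly, its transaction timeout property is named timeout (in seconds), so the extra line was:

agent_2.sinks.sparkpullsink.timeout = 300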
I have a Spark Streaming job that runs against port 11111, polling the sink and processing the events it receives. The job doesn't receive any events to process when I try a large dataset.
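In outline, the job looks like this (a minimal sketch; the app name, the 10-second batch interval, and the count-based processing are illustrative stand-ins, not my actual logic):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf().setAppName("FlumePollingJob")  // illustrative name
val ssc = new StreamingContext(conf, Seconds(10))         // illustrative batch interval

// Pull events from the SparkSink listening on localhost:11111.
val stream = FlumeUtils.createPollingStream(ssc, "localhost", 11111)

// Decode each Flume event body and process the batch.
stream.map(e => new String(e.event.getBody.array()))
  .foreachRDD { rdd =>
    println(s"Received ${rdd.count()} events")  // stand-in for the real processing
  }

ssc.start()
ssc.awaitTermination()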
Thanks in advance for any inputs on this.