Spark Streaming and Flume Integration


I am trying to integrate Flume with Spark Streaming.

I have the following workflow defined, and I am seeing an issue where Spark is not able to commit transactions back to the Flume channel when data volumes are higher.

I am new to both Flume and Spark Streaming, so if there is a scalability option I have not tried, please let me know.

My workflow:

spool dir -> file-channel-1 -> Avro sink -> Avro source -> file-channel-2

file-channel-2 is consumed by 2 sinks:

  1. HDFS sink
  2. SparkSink

So there are 2 agents:

Agent 1 delivers from the spool directory to the Avro sink running at port 4545.

Agent 2 reads from the Avro source running at port 4545 and delivers to the HDFS and Spark sinks.
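For context, agent 2's wiring looks roughly like the sketch below. The source and HDFS sink names (avro-source, hdfs-sink) and the HDFS path placeholder are just stand-ins I'm using here; the channel name, the SparkSink name, and the ports are from my actual setup, and the full SparkSink config is further below.

agent_2.sources = avro-source
agent_2.channels = file-channel-2
agent_2.sinks = hdfs-sink sparkpullsink

agent_2.sources.avro-source.type = avro
agent_2.sources.avro-source.bind = 0.0.0.0
agent_2.sources.avro-source.port = 4545
agent_2.sources.avro-source.channels = file-channel-2

agent_2.sinks.hdfs-sink.type = hdfs
agent_2.sinks.hdfs-sink.hdfs.path = <<some hdfs dir>>
agent_2.sinks.hdfs-sink.channel = file-channel-2

agent_2.sinks.sparkpullsink.channel = file-channel-2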

I can see events getting delivered to HDFS; no problems there.

The Spark sink, on the other hand, throws the following errors:

org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Spark failed to commit transaction. Will reattempt events.
org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark could not commit transaction, NACK received. Rolling back transaction.

These errors do not occur right away. I see a lot of log lines like the ones below, and after some time has passed it starts throwing the errors.

(New I/O worker #18) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xab559b05, /127.0.0.1:51736 => /127.0.0.1:11111] INTEREST_CHANGED
(New I/O worker #18) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xab559b05, /127.0.0.1:51736 => /127.0.0.1:11111] INTEREST_CHANGED
(New I/O worker #18) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xab559b05, /127.0.0.1:51736 => /127.0.0.1:11111] INTEREST_CHANGED

I tried the following options:

  1. I looked at the SparkSink API and assumed it might be timing out, so I increased the timeout to 300 seconds. It didn't help.
  2. In FlumeUtils.createPollingStream, I specified a batch size of 10000 and a parallelism of 1. That didn't help either (see the sketch after this list).
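For reference, the polling side of my Spark Streaming job looks roughly like the sketch below. The app name, the 10-second batch interval, and the per-batch count are simplified placeholders; the host, port, batch size of 10000, and parallelism of 1 match what I described in option 2.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollingJob {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FlumePollingJob")
    // 10-second batch interval is a placeholder, not the actual value.
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Poll the SparkSink that agent 2 exposes at localhost:11111.
    val addresses = Seq(new InetSocketAddress("localhost", 11111))
    val stream = FlumeUtils.createPollingStream(
      ssc,
      addresses,
      StorageLevel.MEMORY_AND_DISK_SER_2,
      10000, // maxBatchSize, as in option 2 above
      1      // parallelism, as in option 2 above
    )

    // Placeholder processing: decode the event bodies and count them per batch.
    stream.map(event => new String(event.event.getBody.array())).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}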

I know for sure this is a scalability issue, because if I spool a small file of 50 events to the spool directory, I can see the small dataset reaching both HDFS and the SparkSink.

My file channel properties are basic and are listed below.

agent_1.channels.file-channel-2.type = file
agent_1.channels.file-channel-2.checkpointDir = <<some dir 1>>
agent_1.channels.file-channel-2.dataDirs = << dir 2 >>

They are the same for file-channel-1 too, pointing to different directories of course. Moreover, there are no issues with agent 1.

It's agent 2, the one that delivers to HDFS and Spark, that is the problematic one.

Please find the SparkSink config below.

agent_2.sinks.sparkpullsink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent_2.sinks.sparkpullsink.hostname = localhost
agent_2.sinks.sparkpullsink.port = 11111
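(For option 1 above, the increased timeout was set on this same sink; as far as I understand, the SparkSink's timeout property is its transaction timeout in seconds, so the line looked roughly like the following.)

agent_2.sinks.sparkpullsink.timeout = 300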

I have a Spark Streaming job that runs against port 11111, polling the sink and processing the events it receives. The job doesn't receive any events to process when I try a large dataset.

Thanks in advance for any inputs on this.

