java - Running Trident Topology in Storm TrackedTopology Unit Test


How can I run a JUnit test of a Trident topology so that tuples flow through the topology while testing, and verify the output at each stage? I've tried running it within Storm's Testing framework, but it falls short of allowing verification and consistent execution of Trident.

Here's an example topology with in-line comments where I'm having issues.

    import static org.junit.Assert.assertEquals;

    import java.util.Arrays;
    import java.util.List;

    import org.junit.Test;

    import storm.trident.TridentState;
    import storm.trident.TridentTopology;
    import storm.trident.operation.builtin.Count;
    import storm.trident.testing.MemoryMapState;
    import storm.trident.testing.Split;
    import backtype.storm.Config;
    import backtype.storm.ILocalCluster;
    import backtype.storm.Testing;
    import backtype.storm.testing.FeederSpout;
    import backtype.storm.testing.TestJob;
    import backtype.storm.testing.TrackedTopology;
    import backtype.storm.tuple.Fields;
    import backtype.storm.utils.Utils;

    public class WordCountTopologyTest {

        @Test
        public void testWordCountTopology() throws Exception {
            Testing.withTrackedCluster(new WordCountTestJob());
        }

        public class WordCountTestJob implements TestJob {

            @Override
            public void run(ILocalCluster cluster) throws Exception {

                // Create the test topology to submit
                TridentTopology termCountTopology = new TridentTopology();

                FeederSpout feeder = new FeederSpout(new Fields("text", "author"));

                TridentState tridentState = termCountTopology.newStream("inputSpout", feeder)
                        .each(new Fields("text"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        .name("counter-output")
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                        .parallelismHint(6);

                TrackedTopology tracked = Testing.mkTrackedTopology(cluster, termCountTopology.build());

                // Feed some random data into the topology
                feeder.feed(Arrays.asList("Nearly all men can stand adversity, but if you want to test a man's character, give him power.", "Abraham Lincoln"));
                feeder.feed(Arrays.asList("No man has a good enough memory to be a successful liar.", "Abraham Lincoln"));
                feeder.feed(Arrays.asList("Either write something worth reading or do something worth writing.", "Benjamin Franklin"));
                feeder.feed(Arrays.asList("Well done is better than well said.", "Benjamin Franklin"));

                cluster.submitTopology("word-count-testing", new Config(), tracked.getTopology());

                // (!!) Runs, but a fixed sleep is a bad idea: the topology may run
                // faster or slower on other systems
                // Utils.sleep(5000);

                // (!!) Fails with a 5000 ms topology timeout
                // Testing.trackedWait(tracked, 3);

                /*
                 * (!!) Always 0. Trident creates streams and bolts internally
                 * with different names, so how can we read them to verify?
                 */
                List outputTuples = Testing.readTuples(tracked, "counter-output");
                assertEquals(0, outputTuples.size());
            }
        }
    }

Beyond this, I've tried writing my own BaseFilter to tag onto the end of the stream that stores all of the tuples, but it seems there must be a better way. Also, that doesn't solve the issue of running the topology in a controlled manner. Is this something Trident supports?

Use the class FeederBatchSpout (for Trident) instead of FeederSpout. FeederBatchSpout blocks by itself, so there is no need to use Testing.trackedWait() or anything like that.
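To illustrate why a feeder that blocks removes the need for Utils.sleep() or Testing.trackedWait(), here is a minimal sketch in plain Java. It is not Storm code and not how FeederBatchSpout is implemented: BlockingFeeder, its feed() and count() methods, and the word-count worker are hypothetical names made up for this illustration. The only point it models is that feed() returns after the batch has been processed, so assertions can run immediately afterwards.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a blocking feeder; NOT Storm's implementation. */
class BlockingFeeder {

    /** A batch plus a latch that the worker releases once the batch is processed. */
    private static final class Pending {
        final List<String> batch;
        final CountDownLatch done = new CountDownLatch(1);

        Pending(List<String> batch) {
            this.batch = batch;
        }
    }

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<>();
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    BlockingFeeder() {
        // Background worker plays the role of the running topology:
        // it splits each sentence on spaces and counts the words.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Pending p = queue.take();
                    for (String sentence : p.batch) {
                        for (String word : sentence.split(" ")) {
                            counts.merge(word, 1L, Long::sum);
                        }
                    }
                    p.done.countDown(); // signal: this batch is fully processed
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Hands the batch to the worker and returns only after it has been processed. */
    void feed(List<String> batch) {
        Pending p = new Pending(batch);
        try {
            queue.put(p);
            p.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for batch", e);
        }
    }

    /** Returns how many times the word has been seen so far (0 if never). */
    long count(String word) {
        return counts.getOrDefault(word, 0L);
    }
}

class BlockingFeedDemo {
    public static void main(String[] args) {
        BlockingFeeder feeder = new BlockingFeeder();
        feeder.feed(Arrays.asList("well done", "well said"));
        // No sleep needed: feed() has already waited for processing.
        System.out.println(feeder.count("well")); // prints 2
    }
}
```

In a real Trident test the same shape would presumably apply: start the topology first, call feed() on the FeederBatchSpout, and check the resulting state right after the call returns.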

source: https://groups.google.com/forum/#!topic/storm-user/cradqexo5ou

