How do I make View's asList() sortable in Google Dataflow SDK?
We are having trouble making the asList() method return a sorted list.
We thought about extending the View class and overriding the asList method, but realized that the View class has a private constructor, so we could not do this.
Our other attempt was to fork the Google Dataflow code on GitHub and modify the PCollectionViews class to return a sorted list using the Collections.sort method, as shown in the code snippet below:
    @Override
    protected List<T> fromElements(Iterable<WindowedValue<T>> contents) {
      // Unwrap each WindowedValue to get at the underlying element.
      Iterable<T> itr = Iterables.transform(
          contents,
          new Function<WindowedValue<T>, T>() {
            @SuppressWarnings("unchecked")
            @Override
            public T apply(WindowedValue<T> input) {
              return input.getValue();
            }
          });
      LOG.info("#### start sorting list !");
      // Copy into a mutable list so that it can be sorted in place.
      List<T> tempList = new ArrayList<T>();
      for (T element : itr) {
        tempList.add(element);
      }
      Collections.sort((List<? extends Comparable>) tempList);
      LOG.info("##### list should be sorted !");
      return ImmutableList.copyOf(tempList);
    }
Note that we are now sorting the list.
This seemed to work when run with the DirectPipelineRunner, but when we tried the BlockingDataflowPipelineRunner, it didn't seem like the code change was being executed.
Note: we recompiled Dataflow and used it in our project, but that did not work.
How can we achieve this (getting a sorted list from the asList method call)?
The classes in PCollectionViews are not intended for extension. Only the primitive view types provided by View.asSingleton, View.asList, View.asIterable, View.asMap, and View.asMultimap are supported.
To obtain a sorted list from a PCollectionView, you'll need to sort it after you have read it. The following code demonstrates the pattern.
    // Assume you have some PCollection
    PCollection<MyComparable> myPC = ...;

    // Prepare it for side input as a list
    final PCollectionView<List<MyComparable>> myView = myPC.apply(View.asList());

    // Side input the list and sort it
    someOtherValue.apply(
        ParDo.withSideInputs(myView).of(
            new DoFn<A, B>() {
              @Override
              public void processElement(ProcessContext ctx) {
                // Copy the side input into a mutable list before sorting
                List<MyComparable> tempList = Lists.newArrayList(ctx.sideInput(myView));
                Collections.sort(tempList);
                // do whatever you want with the sorted list
              }
            }));
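The copy-then-sort step can be tried without any Dataflow dependency. The following is only a minimal plain-JDK sketch: it assumes the side-input list should be treated as read-only (modelled here with Collections.unmodifiableList), and SortCopySketch, sortedCopy, and the MyComparable class body are hypothetical names introduced for illustration, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SortCopySketch {

  // Hypothetical element type standing in for MyComparable from the answer.
  static final class MyComparable implements Comparable<MyComparable> {
    final int key;

    MyComparable(int key) {
      this.key = key;
    }

    @Override
    public int compareTo(MyComparable other) {
      return Integer.compare(key, other.key);
    }

    @Override
    public String toString() {
      return "MyComparable(" + key + ")";
    }
  }

  // Copies the elements of a read-only source into a new mutable list and
  // sorts that copy; the source itself is never modified.
  static <T extends Comparable<? super T>> List<T> sortedCopy(Iterable<T> readOnly) {
    List<T> copy = new ArrayList<>();
    for (T element : readOnly) {
      copy.add(element);
    }
    Collections.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    // Assumption: stands in for the value returned by ctx.sideInput(myView).
    List<MyComparable> sideInput = Collections.unmodifiableList(
        Arrays.asList(new MyComparable(3), new MyComparable(1), new MyComparable(2)));

    List<MyComparable> sorted = sortedCopy(sideInput);
    System.out.println("sorted copy: " + sorted);
    System.out.println("original:    " + sideInput);
  }
}
```

Bounding the type parameter with Comparable<? super T> lets the compiler check that the elements are sortable, so no unchecked cast is needed at the call to Collections.sort.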
Of course, you may not want to sort it repeatedly, depending on the cost of sorting vs. the cost of materializing it as a new PCollection, so you can output this value and read it as a new side input without difficulty:
    // Side input the list, sort it, and put it in a PCollection
    PCollection<List<MyComparable>> sortedSingleton = Create.<Void>of(null).apply(
        ParDo.withSideInputs(myView).of(
            // The output type is the sorted list itself
            new DoFn<Void, List<MyComparable>>() {
              @Override
              public void processElement(ProcessContext ctx) {
                List<MyComparable> tempList = Lists.newArrayList(ctx.sideInput(myView));
                Collections.sort(tempList);
                ctx.output(tempList);
              }
            }));

    // Prepare it for side input as a list
    final PCollectionView<List<MyComparable>> sortedView =
        sortedSingleton.apply(View.asSingleton());

    someOtherValue.apply(
        ParDo.withSideInputs(sortedView).of(
            new DoFn<A, B>() {
              @Override
              public void processElement(ProcessContext ctx) {
                ... ctx.sideInput(sortedView) ...
                // do whatever you want with the sorted list
              }
            }));
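To get a rough feel for the trade-off between the two patterns, the sorting work can be counted in plain Java. This is only an illustrative model under stated assumptions: it counts comparator calls and ignores the cost of materializing the sorted list as a new PCollection, and SortCostSketch and its methods are hypothetical names, not Dataflow APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class SortCostSketch {

  // Sorts a fresh copy of the input and returns how many comparisons were made.
  static long countComparisons(List<Integer> input) {
    AtomicLong comparisons = new AtomicLong();
    Comparator<Integer> counting = (a, b) -> {
      comparisons.incrementAndGet();
      return Integer.compare(a, b);
    };
    List<Integer> copy = new ArrayList<>(input);
    copy.sort(counting);
    return comparisons.get();
  }

  // Models the first pattern: every processed element re-sorts its own copy.
  static long sortPerElement(List<Integer> sideInput, int elements) {
    long total = 0;
    for (int i = 0; i < elements; i++) {
      total += countComparisons(sideInput);
    }
    return total;
  }

  // Models the second pattern: sort once, then every element only reads the
  // result, so the element count does not add any sorting work.
  static long sortOnce(List<Integer> sideInput, int elements) {
    return countComparisons(sideInput);
  }

  public static void main(String[] args) {
    List<Integer> sideInput = Arrays.asList(5, 3, 9, 1, 7, 2);
    int elements = 1000; // assumed number of main-input elements
    System.out.println("comparisons, sort per element: " + sortPerElement(sideInput, elements));
    System.out.println("comparisons, sort once:        " + sortOnce(sideInput, elements));
  }
}
```

In this model the per-element cost grows linearly with the number of main-input elements, while the sort-once cost stays fixed; whether that saving outweighs the extra PCollection depends on the pipeline.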
You may also be interested in the unsupported sorter contrib module for doing larger sorts using both memory and local disk.