How do I make View's asList() sortable in Google Dataflow SDK?


We have a problem making the asList() method sortable.

We thought about extending the View class and overriding the asList method, but realized that the View class has a private constructor, so we could not do this.

Our other attempt was to fork the Google Dataflow code on GitHub and modify the PCollectionViews class to return a sorted list by using the Collections.sort method, as shown in the code snippet below:

    @Override
    protected List<T> fromElements(Iterable<WindowedValue<T>> contents) {
      // Unwrap each windowed value to get at the element itself
      Iterable<T> itr = Iterables.transform(
          contents,
          new Function<WindowedValue<T>, T>() {
            @SuppressWarnings("unchecked")
            @Override
            public T apply(WindowedValue<T> input) {
              return input.getValue();
            }
          });

      LOG.info("#### start sorting list !");
      // Copy into a mutable list so that it can be sorted
      List<T> tempList = new ArrayList<T>();
      for (T element : itr) {
        tempList.add(element);
      }
      Collections.sort((List<? extends Comparable>) tempList);
      LOG.info("##### list should be sorted !");
      return ImmutableList.copyOf(tempList);
    }

Note that we are now sorting the list.

This seemed to work when run with the DirectPipelineRunner, but when we tried the BlockingDataflowPipelineRunner, it didn't seem that the code change was being executed.

Note: we recompiled Dataflow and used it in our project, but this did not work.

How can we achieve this (a sorted list from the asList method call)?

The classes in PCollectionViews are not intended for extension. Only the primitive view types provided by View.asSingleton, View.asIterable, View.asMap, and View.asMultimap are supported.

To obtain a sorted list from a PCollectionView, you'll need to sort it after you have read it. The following code demonstrates the pattern.

    // Assume you have some PCollection
    PCollection<MyComparable> myPC = ...;

    // Prepare it for side input as a list
    final PCollectionView<List<MyComparable>> myView = myPC.apply(View.asList());

    // Side input the list and sort it
    someOtherValue.apply(
        ParDo.withSideInputs(myView).of(
            new DoFn<A, B>() {
              @Override
              public void processElement(ProcessContext ctx) {
                // Copy the side input into a mutable list before sorting
                List<MyComparable> tempList =
                    Lists.newArrayList(ctx.sideInput(myView));
                Collections.sort(tempList);
                // do whatever you want with the sorted list
              }
            }));
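The copy step matters: the question's own snippet shows the view being built with ImmutableList.copyOf, so the list read from the side input should be treated as read-only. Below is a minimal plain-Java sketch of the same copy-then-sort step, with no Dataflow dependency. The class name SortedCopy and the method sortedCopy are hypothetical, Collections.unmodifiableList merely stands in for the side input list, and Java 8 or later is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class SortedCopy {
    // Hypothetical helper: copies any Iterable into a fresh mutable list
    // and sorts the copy, leaving the source untouched.
    public static <T> List<T> sortedCopy(Iterable<? extends T> source,
                                         Comparator<? super T> order) {
        List<T> copy = new ArrayList<>();
        for (T element : source) {
            copy.add(element);
        }
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        // Assumption: stands in for ctx.sideInput(myView), a read-only list
        // whose element order is not guaranteed.
        List<Integer> sideInput =
            Collections.unmodifiableList(Arrays.asList(3, 1, 2));

        List<Integer> sorted =
            sortedCopy(sideInput, Comparator.<Integer>naturalOrder());

        System.out.println(sorted);
        System.out.println(sideInput);
    }
}
```

Passing a Comparator instead of casting to List<? extends Comparable> avoids the unchecked cast used in the question's snippet.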

Of course, you may not want to sort it repeatedly. Depending on the cost of sorting vs. the cost of materializing it as a new PCollection, you can output this value and read it as a new side input without difficulty:

    // Side input the list, sort it, and put it in a PCollection
    PCollection<List<MyComparable>> sortedSingleton = Create.<Void>of(null).apply(
        ParDo.withSideInputs(myView).of(
            // The output type is the sorted list itself
            new DoFn<Void, List<MyComparable>>() {
              @Override
              public void processElement(ProcessContext ctx) {
                List<MyComparable> tempList =
                    Lists.newArrayList(ctx.sideInput(myView));
                Collections.sort(tempList);
                ctx.output(tempList);
              }
            }));

    // Prepare it for side input as a list
    final PCollectionView<List<MyComparable>> sortedView =
        sortedSingleton.apply(View.asSingleton());

    someOtherValue.apply(
        ParDo.withSideInputs(sortedView).of(
            new DoFn<A, B>() {
              @Override
              public void processElement(ProcessContext ctx) {
                ... ctx.sideInput(sortedView) ...
                // do whatever you want with the sorted list
              }
            }));
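As an illustration of why sorting once can pay off, here is a small plain-Java sketch of a per-element lookup that relies on the list already being sorted. It does not use the Dataflow SDK; the class name SortedLookup and the method contains are hypothetical, and the hard-coded list only stands in for what ctx.sideInput(sortedView) would return.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SortedLookup {
    // Hypothetical helper: membership test via binary search.
    // Only correct if the list is already sorted in natural order.
    public static <T extends Comparable<? super T>> boolean contains(
            List<T> sorted, T key) {
        return Collections.binarySearch(sorted, key) >= 0;
    }

    public static void main(String[] args) {
        // Assumption: stands in for the pre-sorted side input list.
        List<String> sorted = Arrays.asList("apple", "kiwi", "pear");

        System.out.println(contains(sorted, "kiwi"));
        System.out.println(contains(sorted, "fig"));
    }
}
```

Each lookup is logarithmic in the list size, whereas re-sorting inside every processElement call would repeat the full sort for every element.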

You may also be interested in the unsupported sorter contrib module for doing larger sorts using both memory and local disk.

