为了实现 count(distinct) over() 的功能，我自定义了一个窗口函数（代码见下文），调用方式如下：
count_loop_distinct(user_id) over ( order by rk rows BETWEEN UNBOUNDED PRECEDING and CURRENT ROW)
在大数据量（百万级）下执行非常缓慢，还请官方的同学帮忙看一下是哪里写得有问题。由于需要去重，代码中定义了一个 Set 集合来存放已出现过的值。
import java.io.*;
import java.util.HashSet;
import java.util.Set;
/**
 * Window function that counts the distinct non-null values seen in a window frame, intended for
 * use as
 * {@code count_loop_distinct(col) over (order by ... rows between unbounded preceding and current row)}.
 *
 * <p>Created by Jo.
 *
 * <p>The method set (create / destroy / update / serialize / merge / finalize / reset /
 * windowUpdate) and the nested {@link State} exposing {@code serializeLength()} follow the host
 * engine's Java UDWF calling convention. NOTE(review): the exact contract (whether
 * {@code windowUpdate} receives only newly added rows or the whole frame, and how the
 * {@code serialize} buffer is sized) is defined by the engine, not by this class. Confirm it
 * against the engine documentation.
 *
 * <p>Memory: the state keeps every distinct value seen so far in a {@link HashSet}, so its
 * footprint grows with the number of distinct values in the window.
 */
public class CountLoopDistinctUDWF {

    /** Charset used for the intermediate (serialize/merge) encoding of values. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Per-window aggregation state: the set of distinct non-null values seen so far. */
    public static class State {
        Set<String> counter = new HashSet<>();

        /**
         * Returns the exact number of bytes {@link CountLoopDistinctUDWF#serialize} writes for this
         * state. The layout is an int element count followed, for each value, by an int byte
         * length and the value's UTF-8 bytes.
         *
         * <p>NOTE(review): presumably the engine uses this to size the buffer passed to
         * {@code serialize}. That is why it must match the written size exactly rather than being a
         * constant.
         *
         * @return serialized size in bytes
         * @throws ArithmeticException if the encoded state would exceed {@code Integer.MAX_VALUE}
         */
        public int serializeLength() {
            long length = Integer.BYTES; // element count
            for (String value : counter) {
                length += Integer.BYTES + value.getBytes(UTF_8).length;
            }
            return Math.toIntExact(length);
        }

        @Override
        public String toString() {
            return "State{" + "counter length=" + counter.size() + '}';
        }
    }

    /** Creates an empty state. */
    public State create() {
        return new State();
    }

    /** Releases a state. Nothing to do: the set is reclaimed by the garbage collector. */
    public void destroy(State state) {
    }

    /**
     * Writes {@code state} into {@code buff} as a length-prefixed UTF-8 encoding. The number of
     * bytes written equals {@link State#serializeLength()}.
     *
     * @param state state to encode
     * @param buff  destination buffer. Its position is advanced past the written bytes.
     */
    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter.size());
        for (String value : state.counter) {
            byte[] bytes = value.getBytes(UTF_8);
            buff.putInt(bytes.length);
            buff.put(bytes);
        }
    }

    /**
     * Adds one input value to the distinct set. Null values are ignored, as in COUNT(DISTINCT).
     *
     * @param state state to update
     * @param val   input value, may be null
     */
    public void update(State state, String val) {
        if (val != null) {
            state.counter.add(val);
        }
    }

    /**
     * Reads one state previously written by {@link #serialize} from {@code buffer} and unions its
     * values into {@code state}. Exactly the bytes of that one state are consumed.
     *
     * @param state  state to merge into
     * @param buffer source buffer, positioned at the start of a serialized state
     * @throws IllegalStateException if the buffer contents are not a valid encoding
     */
    public void merge(State state, java.nio.ByteBuffer buffer) {
        int count = buffer.getInt();
        if (count < 0) {
            throw new IllegalStateException("merge: negative element count " + count);
        }
        for (int i = 0; i < count; i++) {
            int length = buffer.getInt();
            if (length < 0 || length > buffer.remaining()) {
                throw new IllegalStateException(
                        "merge: invalid value length " + length + ", remaining " + buffer.remaining());
            }
            byte[] bytes = new byte[length];
            buffer.get(bytes);
            state.counter.add(new String(bytes, UTF_8));
        }
    }

    /**
     * Returns the number of distinct non-null values in the state.
     *
     * @param state state to read
     * @return distinct count
     */
    public Integer finalize(State state) {
        return state.counter.size();
    }

    /**
     * Clears the state. A fresh set is allocated rather than calling {@code clear()} so that a
     * large backing table from a previous window is not retained.
     */
    public void reset(State state) {
        state.counter = new HashSet<>();
    }

    /**
     * Adds the inputs at indices {@code [frame_start, frame_end)} to the distinct set, skipping
     * nulls (the same rule as {@link #update}).
     *
     * <p>Work per call is proportional to {@code frame_end - frame_start}. NOTE(review): if the
     * engine invokes this with the whole frame (from the partition start) for every output row,
     * instead of only the newly added rows, a cumulative frame costs O(n^2) over a partition of n
     * rows. Profile to see whether the time is spent in this loop or outside it (for example in
     * building {@code inputs}).
     *
     * @param state            state to update
     * @param peer_group_start unused here
     * @param peer_group_end   unused here
     * @param frame_start      first input index to add (inclusive)
     * @param frame_end        last input index to add (exclusive)
     * @param inputs           input column values. Elements may be null.
     */
    public void windowUpdate(State state,
                             int peer_group_start, int peer_group_end,
                             int frame_start, int frame_end,
                             String[] inputs) {
        for (int i = frame_start; i < frame_end; ++i) {
            update(state, inputs[i]);
        }
    }
}