Custom window function implementing count(distinct) runs very slowly

The code is shown below. To get the functionality of count(distinct) over(), I wrote a custom window function (UDWF), used like this:
count_loop_distinct(user_id) over ( order by rk rows BETWEEN UNBOUNDED PRECEDING and CURRENT ROW )
On large data volumes (millions of rows) it runs extremely slowly. Could the StarRocks team take a look at what is wrong? Since deduplication is needed, the state holds a Set for dedup.

import java.io.*;
import java.util.HashSet;
import java.util.Set;

/**
 * Created by Jo
 */
public class CountLoopDistinctUDWF {

    public static class State {
        Set<String> counter = new HashSet<>();
        // Serialized form of `counter`, produced in serializeLength()
        // and consumed in serialize().
        byte[] serialized = new byte[0];

        public int serializeLength() {
            // Must return the actual byte length of the serialized state;
            // a fixed 4 would overflow the buffer for any non-trivial set.
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(counter);
                oos.flush();
                serialized = bos.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("serialize failed", e);
            }
            return serialized.length;
        }

        @Override
        public String toString() {
            return "State{" + "counter length=" + counter.size() + '}';
        }
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        // serializeLength() has already produced the bytes.
        buff.put(state.serialized);
    }

    public void update(State state, String val) {
        if (val != null) {
            state.counter.add(val);
        }
    }

    public void merge(State state, java.nio.ByteBuffer buffer) {
        // Size the array to the buffer; `new byte[]{}` would read nothing.
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            @SuppressWarnings("unchecked")
            Set<String> s = (Set<String>) ois.readObject();
            state.counter.addAll(s);
        } catch (IOException | ClassNotFoundException e) {
            System.err.println("merge exception:" + e);
        }
    }

    public Integer finalize(State state) {
        return state.counter.size();
    }

    public void reset(State state) {
        state.counter = new HashSet<>();
    }

    public void windowUpdate(State state,
                             int peer_group_start, int peer_group_end,
                             int frame_start, int frame_end,
                             String[] inputs) {
        for (int i = frame_start; i < frame_end; ++i) {
            if (inputs[i] != null) { // match update(): NULLs don't count
                state.counter.add(inputs[i]);
            }
        }
    }
}

@许秀不许秀 Could you help take a look at this?

Could anyone help take a look?

Please share your StarRocks version and the explain costs output; ideally grab a Profile as well.

I have now reduced the data volume to about 110,000 rows. The SQL I'm running is:

with t2 as (
    select project_name, kol_user_id,
        dm.count_loop_distinct(user_id) over ( order by rk rows BETWEEN UNBOUNDED PRECEDING and CURRENT ROW ) as all_reply_number_dist
    from das.ta_rating_kol_test
)
select project_name, kol_user_id, max(all_reply_number_dist) as all_reply_number_dist
from t2
group by 1, 2
order by 1, 3

The table schema is as follows:

CREATE TABLE das.ta_rating_kol_test (
    project_name varchar(1048576) NULL COMMENT "",
    kol_user_id varchar(1048576) NULL COMMENT "",
    user_id varchar(1048576) NULL COMMENT "",
    rk bigint(20) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY (project_name)
COMMENT "OLAP"
DISTRIBUTED BY HASH(project_name) BUCKETS 10
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "DEFAULT",
    "enable_persistent_index" = "false"
);
It took more than 8 minutes, which is still very long. The explain costs and profile are attached:
窗口函数profile.txt (41.9 KB)
窗口函数sql_explain_costs.txt (3.9 KB)
I have also exported the ~110,000 rows of data:
data.csv (8.7 MB)
The exported data is comma-separated. Please take a look, thanks!

It looks like the slowness is in the call to your custom UDAF. We will try to reproduce it first; alternatively, you could test on a newer version.

I built a StarRocks 3.1.5 environment locally with Docker (2 cores, 8 GB RAM). With the same table schema, the same SQL, and the same data, the local 3.1.5 build also took more than 7 minutes. The local test profile and explain costs are attached:
本机测试profile.txt (37.2 KB)
本机测试sql_explain_costs.txt (4.2 KB)
7 minutes for about 110,000 rows is quite long. Is there a problem with how the UDWF is written? Is serializing the Set the expensive part?

This is the packaged UDWF jar:
count-loop.jar (3.9 MB)
This is the creation statement:
CREATE AGGREGATE FUNCTION count_loop_distinct(string)
RETURNS int
properties (
    "analytic" = "true",
    "symbol" = "com.marcpoint.bigdata.CountLoopDistinctUDWF",
    "type" = "StarrocksJar",
    "file" = "http://172.16.1.xx:80/jar/count-loop.jar"
);

We will test it first and then get back to you.

OK, thanks :pray:

Slowness is expected for this window: with a frame from UNBOUNDED PRECEDING to CURRENT ROW, it is an O(N^2) algorithm.
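To see why, note that with an UNBOUNDED PRECEDING to CURRENT ROW frame, the engine re-runs windowUpdate over the whole frame for every row, so the total number of element visits is 1 + 2 + … + N = N(N+1)/2. A minimal sketch of that cost (the class name and the 110,000-row figure are illustrative, matching the test data size mentioned above):

```java
public class FrameCostDemo {
    // Count how many input elements windowUpdate touches in total when
    // the frame is re-scanned from row 0 for every current row.
    public static long totalVisits(int n) {
        long visits = 0;
        for (int frameEnd = 1; frameEnd <= n; frameEnd++) { // one call per row
            visits += frameEnd; // windowUpdate scans [0, frameEnd)
        }
        return visits;
    }

    public static void main(String[] args) {
        // For a ~110,000-row input this is about 6 billion set insertions.
        System.out.println(totalVisits(110_000));
    }
}
```

At ~6 billion HashSet insertions, multi-minute runtimes are unsurprising even before serialization costs are considered.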

So is that why this window function isn't provided out of the box? :joy:

Hello, just checking in: how did your testing go? Or is it, as @许秀不许秀 said, inherently slow?

It is just slow. For this kind of custom UDAF we cannot do incremental computation: every time the frame advances by one row, all the preceding data has to be recomputed.
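For contrast, if the accumulated state could be carried from one row to the next (which the current UDAF interface does not allow), a cumulative distinct count would be a single O(N) pass: each value enters the set once. A sketch of that incremental approach, outside StarRocks, purely to illustrate the difference (class and method names are hypothetical):

```java
import java.util.HashSet;
import java.util.Set;

public class CumulativeDistinct {
    // One shared set, one pass: out[i] is the distinct count of
    // values[0..i], i.e. the UNBOUNDED PRECEDING..CURRENT ROW result.
    public static int[] counts(String[] values) {
        Set<String> seen = new HashSet<>();
        int[] out = new int[values.length];
        for (int i = 0; i < values.length; i++) {
            if (values[i] != null) {
                seen.add(values[i]); // each value inserted exactly once
            }
            out[i] = seen.size();
        }
        return out;
    }
}
```

This does N set insertions total instead of N(N+1)/2, but the engine would have to know the frame only ever grows at the right edge to apply it.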

OK, understood, thanks!

May I ask where custom functions are supposed to be created? I get an error when running this in the MySQL client:
CREATE FUNCTION dw_dev.my_starrocks_first_udf(string)
RETURNS string
PROPERTIES (
    "symbol" = "com.onion.etl.udf.StarrocksUDFJava",
    "type" = "StarrocksJar",
    "file" = "hdfs://user/hive/jar/xxx.jar"
)
The error message (screenshot attached):
My UDF Java code:
package com.onion.etl.udf;

public class StarrocksUDFJava {

    public final String evaluate(String text) {
        if (text == null || text.isEmpty()) {
            return null;
        }
        String a = "starrocks_udf";
        return a;
    }

}

The `file` URL probably doesn't support HDFS yet; the earlier working example serves the jar over HTTP.