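To help us locate your problem faster, please provide the following information. Thank you.
[Details]
Following the com.starrocks.udf.sample.SumInt example in the official documentation (https://docs.starrocks.io/docs/sql-reference/sql-functions/JAVA_UDF/#step-3-compile-a-udf), I wrote the following UDAF, with logging added to serialize and merge:
Code: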
public class SumInt {
    public static class State {
        int counter = 0;
        public int serializeLength() {
            return 4;
        }
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {
    }

    public final void update(State state, Integer val) {
        if (val != null) {
            state.counter += val;
        }
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        int capacity = buff.capacity();
        int limit = buff.limit();
        int remaining = buff.remaining();
        int counter = state.counter;
        System.out.println("serialize,capacity:" + capacity + ",limit:" + limit + ",remain:" + remaining + ",counter:" + counter);
        buff.putInt(counter);
    }

    public void merge(State state, java.nio.ByteBuffer buff) {
        int capacity = buff.capacity();
        int limit = buff.limit();
        int remaining = buff.remaining();
        int val = buff.getInt();
        System.out.println("merge,capacity:" + capacity + ",limit:" + limit + ",remain:" + remaining + ",counter:" + val);
        state.counter += val;
    }

    public Integer finalize(State state) {
        return state.counter;
    }
}
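The log lines in serialize and merge above print capacity, limit, and remaining. As a reading aid, here is a minimal standalone sketch of how those java.nio.ByteBuffer values change around putInt, flip, and getInt. The class name BufferFieldsSketch and the 16-byte buffer size are made up for illustration; the buffer that StarRocks actually passes to the UDAF may be sized and positioned differently.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of the UDAF: shows what the logged fields mean.
class BufferFieldsSketch {
    // Returns {capacity, limit, position, remaining} for the given buffer.
    static int[] snapshot(ByteBuffer buff) {
        return new int[] {buff.capacity(), buff.limit(), buff.position(), buff.remaining()};
    }

    public static void main(String[] args) {
        ByteBuffer buff = ByteBuffer.allocate(16); // assumed size, for illustration only
        System.out.println("fresh:        " + Arrays.toString(snapshot(buff)));

        buff.putInt(42); // relative write: advances position by 4 bytes
        System.out.println("after putInt: " + Arrays.toString(snapshot(buff)));

        buff.flip(); // limit = old position, position = 0: ready for reading
        System.out.println("after flip:   " + Arrays.toString(snapshot(buff)));

        int value = buff.getInt(); // relative read: advances position by 4 bytes
        System.out.println("after getInt: " + Arrays.toString(snapshot(buff)) + ", value=" + value);
    }
}
```

If the remaining value logged in merge is larger than serializeLength(), that suggests the buffer holds more than one serialized state, which is consistent with the shared-buffer observation reported further down.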
Register the UDF:
DROP GLOBAL FUNCTION MY_SUM_INT(INT);
CREATE GLOBAL AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES
(
    "symbol" = "com.xxx.buffetbi.example.SumInt",
    "type" = "StarrocksJar",
    "file" = "http://xxx.jar"
);
CREATE TABLE orders
(
    o_orderkey int(11) NOT NULL COMMENT "",
    o_orderdate date NOT NULL COMMENT "",
    o_custkey int(11) NOT NULL COMMENT "",
    o_orderstatus varchar(1) NOT NULL COMMENT "",
    o_totalprice decimal(15, 2) NOT NULL COMMENT "",
    o_orderpriority varchar(15) NOT NULL COMMENT "",
    o_clerk varchar(15) NOT NULL COMMENT "",
    o_shippriority int(11) NOT NULL COMMENT "",
    o_comment varchar(79) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(o_orderkey, o_orderdate)
COMMENT "OLAP"
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 96
PROPERTIES (
    "replication_num" = "1",
    "colocate_with" = "group_tpch_100",
    "in_memory" = "false",
    "enable_persistent_index" = "false",
    "replicated_storage" = "true",
    "compression" = "LZ4"
);
select MY_SUM_INT(o_orderkey), sum(o_orderkey), o_custkey from orders group by o_custkey limit 100;
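From my own log output, the same java.nio.ByteBuffer buff appears to be shared across merge calls, and the merged data ends up corrupted: MY_SUM_INT does not match sum for the same group.
This is probably triggered by batch merge.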
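To make the suspected failure mode concrete, here is a minimal standalone sketch of several serialized states sharing one ByteBuffer. It does not reproduce StarRocks internals: the back-to-back layout, the class name SharedBufferSketch, and both merge helpers are assumptions for illustration only. It compares a relative read (getInt(), as in the merge method above) with an absolute read at offset 0, which returns the first state for every row.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical simulation of a shared merge buffer; not StarRocks code.
class SharedBufferSketch {
    static final int STATE_BYTES = 4; // matches serializeLength() in the UDAF above

    // Assumed layout: serialized states written back to back into one buffer.
    static ByteBuffer packBatch(int[] counters) {
        ByteBuffer batch = ByteBuffer.allocate(counters.length * STATE_BYTES);
        for (int counter : counters) {
            batch.putInt(counter);
        }
        batch.flip(); // switch from writing to reading
        return batch;
    }

    // Relative reads: each row consumes the next 4 bytes of the shared buffer.
    static int[] mergeRelative(ByteBuffer batch, int rows) {
        int[] merged = new int[rows];
        for (int i = 0; i < rows; i++) {
            merged[i] = batch.getInt();
        }
        return merged;
    }

    // Absolute reads at offset 0: every row sees the first state (wrong for a shared buffer).
    static int[] mergeAbsoluteZero(ByteBuffer batch, int rows) {
        int[] merged = new int[rows];
        for (int i = 0; i < rows; i++) {
            merged[i] = batch.getInt(0);
        }
        return merged;
    }

    public static void main(String[] args) {
        int[] counters = {10, 20, 30};
        System.out.println(Arrays.toString(mergeRelative(packBatch(counters), 3)));     // [10, 20, 30]
        System.out.println(Arrays.toString(mergeAbsoluteZero(packBatch(counters), 3))); // [10, 10, 10]
    }
}
```

Under this assumed layout, a merge that reads exactly serializeLength() bytes with relative reads stays aligned. If the merge log still shows mismatched counter values in that case, the position handed to each merge call would be the next thing to check, for example by also logging buff.position().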
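[Business impact]
[Shared-data (storage-compute separated) deployment?]
[StarRocks version] e.g. 1.18.2
[Cluster size] e.g. 3 FE (1 follower + 2 observers) + 3 BE (FE and BE co-located)
[Machine info] CPU virtual cores / memory / NIC, e.g. 48C/64G/10 GbE
[Contact] So that we can reach you promptly for logs while resolving the issue, please add your contact details, e.g. community group 4, Xiao Li, or an email address. Thank you.
[Attachments]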