flinkcdc 写入SR报错

【详述】flinkcdc 写入SR报错
【背景】使用flinksql从ob写入数据至SR报错
【业务影响】
【StarRocks版本】2.1
flink版本1.13.2
connector版本 flink-connector-starrocks-1.2.5_flink-1.13_2.11
【集群规模】3fe+3be(fe与be混部)
【表模型】主键模型
【导入或者导出方式】flinksql+Flinkcdc

flink报错简述
2023-01-06 19:32:17,364 ERROR com.starrocks.data.load.stream.DefaultStreamLoader [] - Stream load failed unknown, label : 2153cd15-2117-4754-8365-7d1e5834b0ec
com.starrocks.streamload.shade.com.alibaba.fastjson.JSONException: create asm deserializer error, com.starrocks.data.load.stream.StreamLoadResponse$StreamLoadResponseBody
at com.starrocks.streamload.shade.com.alibaba.fastjson.parser.ParserConfig.createJavaBeanDeserializer(ParserConfig.java:1082)

flink报错.txt (64.4 KB)

be节点相关日志
be相关日志.txt (52.1 KB)

sr建表语句
CREATE TABLE ob_hist_sql_plan_monitor (
ob_tenant_id bigint(20) NOT NULL COMMENT ‘OB的租户Id’,
ob_cluster_id bigint(20) NOT NULL COMMENT ‘OB的集群Id’,
cluster_name varchar(65535) NOT NULL COMMENT ‘OB的集群名称’,
ob_server_id bigint(20) NOT NULL COMMENT ‘OB的服务Id’,
plan_line_id bigint(20) NOT NULL COMMENT ‘算子行的id’,
request_id bigint(20) NOT NULL COMMENT ‘算子行的request id’,
trace_id varchar(65535) NOT NULL COMMENT ‘sql执行的tarce id’,
sample_time bigint(20) NOT NULL COMMENT ‘采样的时间’,
process_id bigint(20) NOT NULL COMMENT ‘算子执行的进程id’,
plan_operation varchar(65535) NOT NULL COMMENT ‘算子名称’,
plan_depth bigint(20) NOT NULL COMMENT ‘算子在计划树中的深度’,
output_rows bigint(20) NOT NULL COMMENT ‘算子吐行’,
first_refresh_time DATETIME NULL DEFAULT NULL COMMENT ‘算子开始监控的时间’,
last_refresh_time DATETIME NULL DEFAULT NULL COMMENT ‘算子结束监控的时间’,
first_change_time varchar(65535) DEFAULT NULL COMMENT ‘算子吐出首行数据时间’,
last_change_time varchar(65535) DEFAULT NULL COMMENT ‘算子吐出最后一行数据时间’,
rescan_count bigint(20) DEFAULT NULL COMMENT ‘算子被 rescan 的次数’,
otherstat_1_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_1的id’,
otherstat_1_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_1的值’,
otherstat_2_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_2的id’,
otherstat_2_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_2的值’,
otherstat_3_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_3的id’,
otherstat_3_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_3的值’,
otherstat_4_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_4的id’,
otherstat_4_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_4的值’,
otherstat_5_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_5的id’,
otherstat_5_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_5的值’,
otherstat_6_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_6的id’,
otherstat_6_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_6的值’,
otherstat_7_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_7的id’,
otherstat_7_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_7的值’,
otherstat_8_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_8的id’,
otherstat_8_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_8的值’,
otherstat_9_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_9的id’,
otherstat_9_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_9的值’,
otherstat_10_id bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_10的id’,
otherstat_10_value bigint(20) DEFAULT NULL COMMENT ‘hash算子的特异化信息otherstat_10的值’
) ENGINE=OLAP
PRIMARY KEY(ob_tenant_id, ob_cluster_id, cluster_name, ob_server_id, plan_line_id, request_id, trace_id,sample_time )
COMMENT “plan monitor采样”
PARTITION BY RANGE (sample_time) (partition DUMMY values less than (‘0’))
DISTRIBUTED BY HASH(ob_tenant_id,ob_cluster_id,cluster_name,ob_server_id) BUCKETS 30
PROPERTIES (
“replication_num” = “3”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”
);

flink语句

CREATE TABLE str_ob_hist_sql_plan_monitor (
ob_tenant_id BIGINT NOT NULL COMMENT ‘OB的租户Id’,
ob_cluster_id BIGINT NOT NULL COMMENT ‘OB的集群Id’,
cluster_name STRING NOT NULL COMMENT ‘OB的集群名称’,
ob_server_id BIGINT NOT NULL COMMENT ‘OB的服务Id’,
plan_line_id BIGINT NOT NULL COMMENT ‘算子行的id’,
request_id BIGINT NOT NULL COMMENT ‘算子行的request id’,
trace_id STRING NOT NULL COMMENT ‘sql执行的tarce id’,
sample_time BIGINT NOT NULL COMMENT ‘采样的时间’,
process_id BIGINT NOT NULL COMMENT ‘算子执行的进程id’,
plan_operation STRING NOT NULL COMMENT ‘算子名称’,
plan_depth BIGINT NOT NULL COMMENT ‘算子在计划树中的深度’,
output_rows BIGINT NOT NULL COMMENT ‘算子吐行’,
first_refresh_time DATETIME NULL COMMENT ‘算子开始监控的时间’,
last_refresh_time DATETIME NULL COMMENT ‘算子结束监控的时间’,
first_change_time STRING COMMENT ‘算子吐出首行数据时间’,
last_change_time STRING COMMENT ‘算子吐出最后一行数据时间’,
rescan_count BIGINT COMMENT ‘算子被 rescan 的次数’,
otherstat_1_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_1的id’,
otherstat_1_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_1的值’,
otherstat_2_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_2的id’,
otherstat_2_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_2的值’,
otherstat_3_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_3的id’,
otherstat_3_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_3的值’,
otherstat_4_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_4的id’,
otherstat_4_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_4的值’,
otherstat_5_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_5的id’,
otherstat_5_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_5的值’,
otherstat_6_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_6的id’,
otherstat_6_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_6的值’,
otherstat_7_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_7的id’,
otherstat_7_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_7的值’,
otherstat_8_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_8的id’,
otherstat_8_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_8的值’,
otherstat_9_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_9的id’,
otherstat_9_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_9的值’,
otherstat_10_id BIGINT COMMENT ‘hash算子的特异化信息otherstat_10的id’,
otherstat_10_value BIGINT COMMENT ‘hash算子的特异化信息otherstat_10的值’,
PRIMARY KEY (ob_tenant_id, ob_cluster_id, cluster_name, ob_server_id, plan_line_id, request_id, trace_id,sample_time) NOT ENFORCED)

Trying to access closed classloader. 
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, 
you can disable this check with the configuration 'classloader.check-leaked-classloader'.

看起来是环境问题

1赞

可以将flink的配置项 classloader.check-leaked-classloader修改为false试下

1赞

麻烦请教下这个参数如何设置