When writing data to StarRocks with Flink CDC, a row with too many null values fails to load with an error

【Description】When writing data to StarRocks with Flink CDC, a row containing too many null values fails to load and an error is reported.
Sample data:
{"op":"r","after":{"score":5.00,"create_user_id":515,"create_date_time":1670941297000,"update_user_id":515,"ac_template_detailed_id":7,"ac_score_id":1292,"id":495,"update_date_time":1670941297000},"source":{"server_id":0,"version":"1.5.4.Final","file":"","connector":"mysql","pos":0,"name":"mysql_binlog_source","row":0,"ts_ms":0,"snapshot":"false","db":"redmoonoa9","table":"ac_score_detail"},"ts_ms":1680330521183}

Sample StarRocks table:
CREATE TABLE if not exists ods_ac_score_detail_duplicate (
op varchar,
id bigint(20) NOT NULL COMMENT 'id',
ac_template_detailed_id bigint(20) DEFAULT NULL COMMENT 'detail item id',
score decimal(5,2) DEFAULT NULL COMMENT 'score',
ac_score_id bigint(20) DEFAULT NULL COMMENT 'performance score id',
create_date_time datetime DEFAULT NULL COMMENT 'creation time',
create_user_id bigint(20) DEFAULT NULL COMMENT 'creator',
update_date_time datetime DEFAULT NULL COMMENT 'update time',
update_user_id bigint(20) DEFAULT NULL COMMENT 'updater',
before_id bigint(20) NOT NULL COMMENT 'id',
before_ac_template_detailed_id bigint(20) DEFAULT NULL COMMENT 'detail item id',
before_score decimal(5,2) DEFAULT NULL COMMENT 'score',
before_ac_score_id bigint(20) DEFAULT NULL COMMENT 'performance score id',
before_create_date_time datetime DEFAULT NULL COMMENT 'creation time',
before_create_user_id bigint(20) DEFAULT NULL COMMENT 'creator',
before_update_date_time datetime DEFAULT NULL COMMENT 'update time',
before_update_user_id bigint(20) DEFAULT NULL COMMENT 'updater',
ts_ms bigint,
ts_date date
)
DUPLICATE KEY(op)
DISTRIBUTED BY HASH(ts_date) BUCKETS 366
PROPERTIES (
"replication_num" = "2"
);

【Goal】
Write the values from the sample record's after field into the op, id, ac_template_detailed_id, score, ac_score_id, create_date_time, create_user_id, update_date_time, update_user_id and ts_ms columns of ods_ac_score_detail_duplicate. If the record's op type is u (update), additionally write the before values into the columns with the before_ prefix.
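For the sample record above (op is "r"), the intended flattened row looks roughly like this, with every before_-prefixed column null (illustrative only; the exact datetime and date strings depend on the ParseDateTime helper, so they are elided):

{"op":"r","id":495,"ac_template_detailed_id":7,"score":5.00,"ac_score_id":1292,"create_date_time":"...","create_user_id":515,"update_date_time":"...","update_user_id":515,"before_id":null,"before_ac_template_detailed_id":null,"before_score":null,"before_ac_score_id":null,"before_create_date_time":null,"before_create_user_id":null,"before_update_date_time":null,"before_update_user_id":null,"ts_ms":1680330521183,"ts_date":"..."}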
【Error message】
33215 [Thread-8] ERROR com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor - Stream Load response:
{"Status":"Fail","BeginTxnTimeMs":101,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"061f0edd-7a68-445f-a4d5-079aa79d8d23","LoadBytes":245396,"StreamLoadPlanTimeMs":102,"NumberTotalRows":984,"WriteDataTimeMs":265,"TxnId":13787,"LoadTimeMs":469,"ErrorURL":"http://172.30.16.26:18040/api/_load_error_log?file=error_log_5a4aad49f7900ad7_9dec92739f8027b7","ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":984}
33216 [Thread-8] WARN com.starrocks.connector.flink.manager.StarRocksSinkManager - Failed to flush batch data to StarRocks, retry times = 5
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":101,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"061f0edd-7a68-445f-a4d5-079aa79d8d23","LoadBytes":245396,"StreamLoadPlanTimeMs":102,"NumberTotalRows":984,"WriteDataTimeMs":265,"TxnId":13787,"LoadTimeMs":469,"ErrorURL":"http://172.30.16.26:18040/api/_load_error_log?file=error_log_5a4aad49f7900ad7_9dec92739f8027b7","ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":984}
at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:89)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:258)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.access$000(StarRocksSinkManager.java:52)
at com.starrocks.connector.flink.manager.StarRocksSinkManager$1.run(StarRocksSinkManager.java:120)
at java.lang.Thread.run(Thread.java:748)

【Other operations】
Inserting the sample data into the table with INSERT INTO from the StarRocks client does not raise any error.
Alternatively:
Assigning values to the before_-prefixed columns in the Flink CDC program before writing also succeeds. However, the behavior I actually want is: when a record's op type is r (snapshot sync) or c (insert), the before_-prefixed columns should all be null; when the op type is u (update), the other columns should hold the post-update values and the before_-prefixed columns should hold the pre-update values.
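One detail not covered in the original post but relevant to the desired null behavior: fastjson's JSON.toJSONString drops map entries whose value is null by default, so a row whose before_-prefixed keys are absent or null is serialized without those keys at all; whether the missing fields then load as NULL or cause the row to be filtered depends on how the FlinkSinkUtil Stream Load sink is configured, which isn't shown here. A minimal sketch of the serialization difference, using fastjson's standard WriteMapNullValue feature:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class NullSerializationCheck {
    public static void main(String[] args) {
        JSONObject row = new JSONObject(true); // keep insertion order
        row.put("op", "r");
        row.put("id", 495L);
        row.put("before_id", null); // desired: an explicit NULL in StarRocks

        // Default: null-valued keys are dropped from the output entirely.
        System.out.println(JSON.toJSONString(row));
        // {"op":"r","id":495}

        // WriteMapNullValue keeps them as explicit JSON nulls.
        System.out.println(JSON.toJSONString(row, SerializerFeature.WriteMapNullValue));
        // {"op":"r","id":495,"before_id":null}
    }
}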
【StarRocks version】2.5.2
【Cluster size】3 FE (1 follower + 2 observers) + 3 BE (FE and BE co-located)
【Contact】392388393@qq.com
【Attachment: Flink code】
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.HashMap;
import java.util.Map;

// Constant, ParseDateTime and FlinkSinkUtil are in-house helper classes.
public class Synchronize_pro_mysql_ac_score_detail_demo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<JSONObject> detailStream = readdetail(env);
        SingleOutputStreamOperator<JSONObject> detailFilterStream = filterdetail(detailStream);
        writeToStarRocksdetail(detailFilterStream);

        try {
            env.execute("Synchronize_pro_mysql_detail_demo");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Flatten each Debezium record into one JSON row and sink it to StarRocks.
    private static void writeToStarRocksdetail(SingleOutputStreamOperator<JSONObject> detailStream) {
        detailStream.map(bean -> {
            JSONObject after = bean.getJSONObject("after");
            after.put("op", bean.getString("op"));
            after.put("ts_ms", bean.getLong("ts_ms"));
            after.put("ts_date", ParseDateTime.longToDate(bean.getLong("ts_ms")));

            // Debezium emits MySQL datetimes as epoch milliseconds; shift by -8h
            // (UTC+8) and format them as datetime strings.
            if (after.getLong("create_date_time") != null) {
                after.put("create_date_time", ParseDateTime.longToDateTime(after.getLong("create_date_time") - 8 * 3600 * 1000));
            }
            if (after.getLong("update_date_time") != null) {
                after.put("update_date_time", ParseDateTime.longToDateTime(after.getLong("update_date_time") - 8 * 3600 * 1000));
            }

            if ("u".equals(bean.getString("op"))) {
                // Update: copy the pre-update values into the before_-prefixed columns.
                JSONObject before = bean.getJSONObject("before");
                if (before.getLong("create_date_time") != null) {
                    after.put("before_create_date_time", ParseDateTime.longToDateTime(before.getLong("create_date_time") - 8 * 3600 * 1000));
                }
                if (before.getLong("update_date_time") != null) {
                    after.put("before_update_date_time", ParseDateTime.longToDateTime(before.getLong("update_date_time") - 8 * 3600 * 1000));
                }
                if (before.getLong("id") != null) {
                    after.put("before_id", before.getLong("id"));
                }
                if (before.getLong("ac_template_detailed_id") != null) {
                    after.put("before_ac_template_detailed_id", before.getLong("ac_template_detailed_id"));
                }
                if (before.getDouble("score") != null) {
                    after.put("before_score", before.getDouble("score"));
                }
                if (before.getLong("ac_score_id") != null) {
                    after.put("before_ac_score_id", before.getLong("ac_score_id"));
                }
                if (before.getLong("create_user_id") != null) {
                    after.put("before_create_user_id", before.getLong("create_user_id"));
                }
                if (before.getLong("update_user_id") != null) {
                    after.put("before_update_user_id", before.getLong("update_user_id"));
                }
            } else {
                // Snapshot read (r) / insert (c): duplicate the after values into the
                // before_ columns as a workaround. The datetime fields were already
                // converted to strings above, so copy them directly instead of
                // re-parsing and re-formatting them.
                after.put("before_create_date_time", after.get("create_date_time"));
                after.put("before_update_date_time", after.get("update_date_time"));
                after.put("before_id", after.getLong("id"));
                after.put("before_ac_template_detailed_id", after.getLong("ac_template_detailed_id"));
                after.put("before_score", after.getDouble("score"));
                after.put("before_ac_score_id", after.getLong("ac_score_id"));
                after.put("before_create_user_id", after.getLong("create_user_id"));
                after.put("before_update_user_id", after.getLong("update_user_id"));
            }
            return JSON.toJSONString(after);
        }).uid("04_map_detail").name("04_map_detail")
                .addSink(FlinkSinkUtil.getStarRocksSink("test_house", "ods_ac_score_detail_duplicate"));
    }

    // Drop records that carry no after image (e.g. deletes).
    private static SingleOutputStreamOperator<JSONObject> filterdetail(SingleOutputStreamOperator<JSONObject> odsData) {
        return odsData.filter(obj -> obj.getJSONObject("after") != null);
    }

    // Build the MySQL CDC source: full snapshot first, then binlog.
    private static SingleOutputStreamOperator<JSONObject> readdetail(StreamExecutionEnvironment env) {
        // Emit decimals as plain numbers instead of base64-encoded bytes.
        Map<String, Object> config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_PRO__URL)
                .port(Constant.MYSQL_PRO_PORT)
                .databaseList(Constant.MYSQL_PRO__DATABASE)
                .tableList(Constant.MYSQL_PRO__DATABASE + ".ac_score_detail")
                .username(Constant.MYSQL_PRO_USERNAME)
                .password(Constant.MYSQL_PRO_PASSWORD)
                .startupOptions(StartupOptions.initial())
                .deserializer(jdd)
                .build();

        return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .map(json -> JSON.parseObject(json));
    }
}

// If this else block is commented out, the error above is reported:
else {
    after.put("before_create_date_time", after.get("create_date_time"));
    after.put("before_update_date_time", after.get("update_date_time"));
    after.put("before_id", after.getLong("id"));
    after.put("before_ac_template_detailed_id", after.getLong("ac_template_detailed_id"));
    after.put("before_score", after.getDouble("score"));
    after.put("before_ac_score_id", after.getLong("ac_score_id"));
    after.put("before_create_user_id", after.getLong("create_user_id"));
    after.put("before_update_user_id", after.getLong("update_user_id"));
}

When the error occurs, please run curl against the ErrorURL and check the detailed error.

I don't follow what you mean — could you be more specific?

When this error occurs, the error output contains an ErrorURL; open that URL to see the detailed rejection information.
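For the log in this question that would be (the URL is taken verbatim from the error message above):

curl "http://172.30.16.26:18040/api/_load_error_log?file=error_log_5a4aad49f7900ad7_9dec92739f8027b7"

The response lists each filtered row along with the reason it was rejected.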

The only error information I have is what's already shown in the question.

I solved this by handling it in the StarRocks table definition at creation time.
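The post doesn't show the final DDL, but a likely culprit is visible in the original table definition: before_id is declared NOT NULL, so every row where before_id is null (op r or c without the else workaround) would be rejected by Stream Load, which matches NumberFilteredRows equaling NumberTotalRows (984) in the error above. A minimal sketch of the table-side change, assuming that was the fix:

-- In the CREATE TABLE statement above, change
before_id bigint(20) NOT NULL COMMENT 'id',
-- to
before_id bigint(20) DEFAULT NULL COMMENT 'id',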