【Details】When writing data to StarRocks with Flink CDC, a row that contains too many NULL values fails to load and an error is reported.
Sample data:
{"op":"r","after":{"score":5.00,"create_user_id":515,"create_date_time":1670941297000,"update_user_id":515,"ac_template_detailed_id":7,"ac_score_id":1292,"id":495,"update_date_time":1670941297000},"source":{"server_id":0,"version":"1.5.4.Final","file":"","connector":"mysql","pos":0,"name":"mysql_binlog_source","row":0,"ts_ms":0,"snapshot":"false","db":"redmoonoa9","table":"ac_score_detail"},"ts_ms":1680330521183}
StarRocks sample table:
CREATE TABLE IF NOT EXISTS ods_ac_score_detail_duplicate
(
    op                             varchar,
    id                             bigint(20) NOT NULL COMMENT 'id',
    ac_template_detailed_id        bigint(20) DEFAULT NULL COMMENT 'detail item id',
    score                          decimal(5,2) DEFAULT NULL COMMENT 'score',
    ac_score_id                    bigint(20) DEFAULT NULL COMMENT 'performance score id',
    create_date_time               datetime DEFAULT NULL COMMENT 'creation time',
    create_user_id                 bigint(20) DEFAULT NULL COMMENT 'creator',
    update_date_time               datetime DEFAULT NULL COMMENT 'update time',
    update_user_id                 bigint(20) DEFAULT NULL COMMENT 'updater',
    before_id                      bigint(20) NOT NULL COMMENT 'id',
    before_ac_template_detailed_id bigint(20) DEFAULT NULL COMMENT 'detail item id',
    before_score                   decimal(5,2) DEFAULT NULL COMMENT 'score',
    before_ac_score_id             bigint(20) DEFAULT NULL COMMENT 'performance score id',
    before_create_date_time        datetime DEFAULT NULL COMMENT 'creation time',
    before_create_user_id          bigint(20) DEFAULT NULL COMMENT 'creator',
    before_update_date_time        datetime DEFAULT NULL COMMENT 'update time',
    before_update_user_id          bigint(20) DEFAULT NULL COMMENT 'updater',
    ts_ms                          bigint,
    ts_date                        date
)
DUPLICATE KEY(op)
DISTRIBUTED BY HASH(ts_date) BUCKETS 366
PROPERTIES (
    "replication_num" = "2"
);
【Goal】
Write the values under "after" in the sample data into the op, id, ac_template_detailed_id, score, ac_score_id, create_date_time, create_user_id, update_date_time, update_user_id, and ts_ms columns of ods_ac_score_detail_duplicate. If the record's op type is u (an update), additionally write the "before" values into the columns prefixed with before_.
【Error message】
33215 [Thread-8] ERROR com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor - Stream Load response:
{"Status":"Fail","BeginTxnTimeMs":101,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"061f0edd-7a68-445f-a4d5-079aa79d8d23","LoadBytes":245396,"StreamLoadPlanTimeMs":102,"NumberTotalRows":984,"WriteDataTimeMs":265,"TxnId":13787,"LoadTimeMs":469,"ErrorURL":"http://172.30.16.26:18040/api/_load_error_log?file=error_log_5a4aad49f7900ad7_9dec92739f8027b7","ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":984}
33216 [Thread-8] WARN com.starrocks.connector.flink.manager.StarRocksSinkManager - Failed to flush batch data to StarRocks, retry times = 5
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":101,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"061f0edd-7a68-445f-a4d5-079aa79d8d23","LoadBytes":245396,"StreamLoadPlanTimeMs":102,"NumberTotalRows":984,"WriteDataTimeMs":265,"TxnId":13787,"LoadTimeMs":469,"ErrorURL":"http://172.30.16.26:18040/api/_load_error_log?file=error_log_5a4aad49f7900ad7_9dec92739f8027b7","ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":984}
at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:89)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:258)
at com.starrocks.connector.flink.manager.StarRocksSinkManager.access$000(StarRocksSinkManager.java:52)
at com.starrocks.connector.flink.manager.StarRocksSinkManager$1.run(StarRocksSinkManager.java:120)
at java.lang.Thread.run(Thread.java:748)
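The ErrorURL in the response points at the per-row rejection reasons kept on the BE; fetching it shows exactly why all 984 rows were filtered. A minimal sketch that dumps that log, using the URL copied verbatim from the response above:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class FetchLoadErrorLog {
    public static void main(String[] args) throws Exception {
        // URL taken from the ErrorURL field of the Stream Load response.
        String errorUrl = "http://172.30.16.26:18040/api/_load_error_log"
                + "?file=error_log_5a4aad49f7900ad7_9dec92739f8027b7";
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new URL(errorUrl).openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // one entry per filtered row, with the reason
            }
        }
    }
}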
【Other observations】
Inserting the same sample data into the table via INSERT INTO from a StarRocks client succeeds.
Alternatively, if the Flink CDC job assigns values to the before_-prefixed columns before writing, the load also succeeds. The behavior I actually want, however, is: when the record's op type is r (initial snapshot) or c (insert), the before_-prefixed columns should all be NULL; when the op type is u (update), the other columns should hold the post-update values and the before_-prefixed columns the pre-update values.
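One point worth noting for that desired behavior: fastjson's JSON.toJSONString drops null-valued keys by default, so if the job puts nulls (or simply skips the puts) the sink never sees the before_ keys at all; SerializerFeature.WriteMapNullValue keeps them as explicit JSON nulls. A sketch under that assumption (note also that the DDL above declares before_id as NOT NULL, so a NULL there would still be rejected unless the column is made nullable):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class NullBeforeColumns {
    private static final String[] BEFORE_COLS = {
            "before_id", "before_ac_template_detailed_id", "before_score",
            "before_ac_score_id", "before_create_date_time",
            "before_create_user_id", "before_update_date_time",
            "before_update_user_id"
    };

    // For op = r or c: emit explicit "before_x": null keys instead of
    // back-filling the before_ columns from the after values.
    static String serializeWithNullBefore(JSONObject after) {
        for (String col : BEFORE_COLS) {
            after.put(col, null);
        }
        // WriteMapNullValue keeps null-valued keys in the output;
        // by default fastjson omits them entirely.
        return JSON.toJSONString(after, SerializerFeature.WriteMapNullValue);
    }
}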
【StarRocks version】2.5.2
【Cluster size】3 FE (1 follower + 2 observers) + 3 BE (FE and BE co-located)
【Contact】392388393@qq.com
【Attachment: Flink code】
// Imports assumed for this snippet (Flink DataStream API, Flink CDC, fastjson);
// ParseDateTime, Constant, and FlinkSinkUtil are project-local helpers not shown here.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;
import java.util.HashMap;
import java.util.Map;

public class Synchronize_pro_mysql_ac_score_detail_demo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<JSONObject> detailStream = readdetail(env);
        SingleOutputStreamOperator<JSONObject> detailFilterStream = filterdetail(detailStream);
        writeToStarRocksdetail(detailFilterStream);
        try {
            env.execute("Synchronize_pro_mysql_detail_demo");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void writeToStarRocksdetail(SingleOutputStreamOperator<JSONObject> detailStream) {
        detailStream.map(bean -> {
            JSONObject after = bean.getJSONObject("after");
            // Carry the Debezium envelope fields along with the row payload.
            after.put("op", bean.getString("op"));
            after.put("ts_ms", bean.getLong("ts_ms"));
            after.put("ts_date", ParseDateTime.longToDate(bean.getLong("ts_ms")));
            // Datetimes arrive as epoch millis; subtract the 8-hour offset and format.
            if (after.getLong("create_date_time") != null) {
                after.put("create_date_time", ParseDateTime.longToDateTime(after.getLong("create_date_time") - 8 * 3600 * 1000));
            }
            if (after.getLong("update_date_time") != null) {
                after.put("update_date_time", ParseDateTime.longToDateTime(after.getLong("update_date_time") - 8 * 3600 * 1000));
            }
            if ("u".equals(bean.getString("op"))) {
                // Update: write the pre-update values into the before_ columns.
                JSONObject before = bean.getJSONObject("before");
                if (before.getLong("create_date_time") != null) {
                    after.put("before_create_date_time", ParseDateTime.longToDateTime(before.getLong("create_date_time") - 8 * 3600 * 1000));
                }
                if (before.getLong("update_date_time") != null) {
                    after.put("before_update_date_time", ParseDateTime.longToDateTime(before.getLong("update_date_time") - 8 * 3600 * 1000));
                }
                if (before.getLong("id") != null) {
                    after.put("before_id", before.getLong("id"));
                }
                if (before.getLong("ac_template_detailed_id") != null) {
                    after.put("before_ac_template_detailed_id", before.getLong("ac_template_detailed_id"));
                }
                if (before.getDouble("score") != null) {
                    after.put("before_score", before.getDouble("score"));
                }
                if (before.getLong("ac_score_id") != null) {
                    after.put("before_ac_score_id", before.getLong("ac_score_id"));
                }
                if (before.getLong("create_user_id") != null) {
                    after.put("before_create_user_id", before.getLong("create_user_id"));
                }
                if (before.getLong("update_user_id") != null) {
                    after.put("before_update_user_id", before.getLong("update_user_id"));
                }
            } else {
                // Workaround: back-fill the before_ columns from the after values.
                // If this else branch is commented out, the "too many filtered rows"
                // error shown above occurs.
                // create_date_time/update_date_time were already converted to datetime
                // strings above, so copy the converted values rather than re-reading longs.
                after.put("before_create_date_time", after.get("create_date_time"));
                after.put("before_update_date_time", after.get("update_date_time"));
                after.put("before_id", after.getLong("id"));
                after.put("before_ac_template_detailed_id", after.getLong("ac_template_detailed_id"));
                after.put("before_score", after.getDouble("score"));
                after.put("before_ac_score_id", after.getLong("ac_score_id"));
                after.put("before_create_user_id", after.getLong("create_user_id"));
                after.put("before_update_user_id", after.getLong("update_user_id"));
            }
            return JSON.toJSONString(after);
        }).uid("04_map_detail").name("04_map_detail")
          .addSink(FlinkSinkUtil.getStarRocksSink("test_house", "ods_ac_score_detail_duplicate"));
    }
    private static SingleOutputStreamOperator<JSONObject> filterdetail(SingleOutputStreamOperator<JSONObject> odsData) {
        // Keep only records that carry an "after" payload (drops delete events).
        return odsData.filter(obj -> obj.getJSONObject("after") != null);
    }
    private static SingleOutputStreamOperator<JSONObject> readdetail(StreamExecutionEnvironment env) {
        // Have Debezium emit decimals as plain numbers instead of base64-encoded bytes.
        Map<String, Object> config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_PRO__URL)
                .port(Constant.MYSQL_PRO_PORT)
                .databaseList(Constant.MYSQL_PRO__DATABASE)
                .tableList(Constant.MYSQL_PRO__DATABASE + ".ac_score_detail")
                .username(Constant.MYSQL_PRO_USERNAME)
                .password(Constant.MYSQL_PRO_PASSWORD)
                .startupOptions(StartupOptions.initial())   // snapshot first, then binlog
                .deserializer(jdd)
                .build();
        return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .map(json -> JSON.parseObject(json));
    }
}
// Note: if the else branch in writeToStarRocksdetail above (the one that
// back-fills the before_ columns from the after values) is commented out,
// the "too many filtered rows" error shown earlier occurs.
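FlinkSinkUtil.getStarRocksSink is not shown in the post. For context, here is a hypothetical sketch of what such a helper typically wraps with the StarRocks Flink connector (host names and credentials are placeholders, and the exact import path of StarRocksSinkOptions varies across connector versions). With sink.properties.format set to json, keys that are missing or null in the payload load as NULL into nullable columns, rather than being read positionally as with the default CSV format:

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class FlinkSinkUtil {
    // Hypothetical reconstruction of the helper used above.
    public static SinkFunction<String> getStarRocksSink(String db, String table) {
        return StarRocksSink.sink(
                StarRocksSinkOptions.builder()
                        .withProperty("jdbc-url", "jdbc:mysql://fe-host:9030") // placeholder
                        .withProperty("load-url", "fe-host:8030")              // placeholder
                        .withProperty("database-name", db)
                        .withProperty("table-name", table)
                        .withProperty("username", "user")                      // placeholder
                        .withProperty("password", "password")                  // placeholder
                        // Load the batched records as JSON so missing/null keys
                        // become NULL column values instead of shifting CSV fields.
                        .withProperty("sink.properties.format", "json")
                        .withProperty("sink.properties.strip_outer_array", "true")
                        .build());
    }
}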