In flink-starrocks-connector, how can I use the Stream Load `where` clause to filter out records in which certain fields are null?
For example, neither the id nor the name field may be null.
.withProperty("sink.properties.where", "id is not null"); — written like this it has no effect.
What error does the load report at the moment?
The StarRocks table has non-nullable columns, and some loaded records contain nulls, so the load throws: streamLoadErrorLog: "Error: NULL value in non-nullable column". Using .withProperty("sink.properties.where", "id is not null"); does not seem to take effect.
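While debugging the `where` property, one workaround is to drop such records inside the Flink job itself, with a filter ahead of addSink. Below is a minimal plain-Java sketch of the predicate logic only (the `Click` record and its field names are hypothetical stand-ins for the actual stream element type):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class NullFilterSketch {
    // Hypothetical record standing in for the Flink stream element type.
    record Click(Long id, String name) {}

    // Same condition as the intended "id is not null and name is not null" filter.
    static final Predicate<Click> NOT_NULL = c -> c.id() != null && c.name() != null;

    public static void main(String[] args) {
        List<Click> in = List.of(new Click(1L, "a"), new Click(null, "b"), new Click(2L, null));
        List<Click> out = in.stream().filter(NOT_NULL).collect(Collectors.toList());
        System.out.println(out.size()); // prints 1
    }
}
```

In a real job the same predicate would go into a `DataStream.filter(...)` call before the StarRocks sink, so rows with null key fields never reach Stream Load at all.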
Please share your sink config and CREATE TABLE statement.
StarRocks version: 2.3
CREATE TABLE:
CREATE TABLE t_web_click
(
    click_date date NOT NULL COMMENT "click time",
    url varchar(512) NOT NULL COMMENT "link",
    type tinyint(4) NOT NULL COMMENT "type",
    user varchar(128) NOT NULL COMMENT "IP",
    click_times int(11) SUM NOT NULL COMMENT "click count",
    process_time datetime MAX NOT NULL COMMENT "processing time"
) ENGINE=OLAP
AGGREGATE KEY(click_date, url, type, user)
COMMENT "mpp link-click info table"
PARTITION BY RANGE(click_date)
(PARTITION p20221211 VALUES [('0000-01-01'), ('2022-12-12')),
PARTITION p20221212 VALUES [('2022-12-12'), ('2022-12-13')),
PARTITION p20221213 VALUES [('2022-12-13'), ('2022-12-14')),
PARTITION p20221214 VALUES [('2022-12-14'), ('2022-12-15')),
PARTITION p20221215 VALUES [('2022-12-15'), ('2022-12-16')),
PARTITION p20221216 VALUES [('2022-12-16'), ('2022-12-17')),
PARTITION p20221217 VALUES [('2022-12-17'), ('2022-12-18')),
PARTITION p20221218 VALUES [('2022-12-18'), ('2022-12-19')),
PARTITION p20221219 VALUES [('2022-12-19'), ('2022-12-20')),
PARTITION p20221220 VALUES [('2022-12-20'), ('2022-12-21')),
PARTITION p20221221 VALUES [('2022-12-21'), ('2022-12-22')),
PARTITION p20221222 VALUES [('2022-12-22'), ('2022-12-23')),
PARTITION p20221223 VALUES [('2022-12-23'), ('2022-12-24')),
PARTITION p20221224 VALUES [('2022-12-24'), ('2022-12-25')),
PARTITION p20221225 VALUES [('2022-12-25'), ('2022-12-26')),
PARTITION p20221226 VALUES [('2022-12-26'), ('2022-12-27')),
PARTITION p20221227 VALUES [('2022-12-27'), ('2022-12-28')),
PARTITION p20221228 VALUES [('2022-12-28'), ('2022-12-29')),
PARTITION p20221229 VALUES [('2022-12-29'), ('2022-12-30')),
PARTITION p20221230 VALUES [('2022-12-30'), ('2022-12-31')))
DISTRIBUTED BY HASH(click_date, url) BUCKETS 144
PROPERTIES (
    "replication_num" = "1",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "America/New_York",
    "dynamic_partition.start" = "-366",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "144",
    "in_memory" = "false",
    "storage_format" = "DEFAULT",
    "enable_persistent_index" = "false"
);
Sink config:
StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", "")
    .withProperty("load-url", "")
    .withProperty("username", "")
    .withProperty("password", "")
    .withProperty("table-name", "")
    .withProperty("database-name", "")
    .withProperty("sink.properties.format", "json")
    .withProperty("sink.properties.strip_outer_array", "true")
    .withProperty("sink.semantic", "exactly-once")
    .withProperty("sink.properties.ignore_json_size", "true")
    .withProperty("sink.properties.where", "url is not null")
    .build(),
Please share the full code and the error; it works fine in my tests.
Exception log:
[]-[2022-12-28 14:18:36.989] INFO runtime.checkpoint.CheckpointCoordinator : Triggering checkpoint 10 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1672208316989 for job 0b05f4be0b45eecbcbee910e9ca1f9de.
[]-[2022-12-28 14:18:45.766] INFO runtime.checkpoint.CheckpointCoordinator : Completed checkpoint 10 for job 0b05f4be0b45eecbcbee910e9ca1f9de (83344 bytes, checkpointDuration=8775 ms, finalizationTime=2 ms).
[]-[2022-12-28 14:18:45.767] INFO ime.source.coordinator.SourceCoordinator : Marking checkpoint 10 as completed for source Source: kafka数据.
[]-[2022-12-28 14:19:05.954] INFO runtime.checkpoint.CheckpointCoordinator : Triggering checkpoint 11 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1672208345953 for job 0b05f4be0b45eecbcbee910e9ca1f9de.
[]-[2022-12-28 14:19:05.957] INFO ector.flink.manager.StarRocksSinkManager : Async stream load: db[dap] table[t_web_click] rows[1] bytes[108] label[c38ed0d2-7992-4871-a3e0-4e3fc5a08e69].
[]-[2022-12-28 14:19:05.987] INFO flink.manager.StarRocksStreamLoadVisitor : Start to join batch data: label[c38ed0d2-7992-4871-a3e0-4e3fc5a08e69].
[]-[2022-12-28 14:19:05.988] INFO flink.manager.StarRocksStreamLoadVisitor : Executing stream load to: 'http://node1:8030/api/dap/t_web_click/_stream_load', size: '110', thread: 330
[]-[2022-12-28 14:19:08.820] WARN ector.flink.manager.StarRocksSinkManager : Failed to flush batch data to StarRocks, retry times = 0
com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":1,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"c38ed0d2-7992-4871-a3e0-4e3fc5a08e69","LoadBytes":110,"StreamLoadPutTimeMs":410,"NumberTotalRows":1,"WriteDataTimeMs":1112,"TxnId":21021,"LoadTimeMs":1525,"ErrorURL":"http://192.168.10.101:8040/api/_load_error_log?file=error_log_934d13cd24535379_73a05c5c45b3a88c","ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":1}
{"streamLoadErrorLog":"Error: NULL value in non-nullable column 'url'. Row: ['2022-12-27', NULL, 2, '221.176.33.136', 1, '20221228141836']\n"}
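A note on the "too many filtered rows" status: Stream Load counts rows skipped by the `where` clause as unselected (NumberUnselectedRows), while rows rejected for data-quality reasons, such as a NULL in a non-nullable column, count as filtered (NumberFilteredRows), and the load fails once the filtered share exceeds max_filter_ratio (0 by default). Here NumberUnselectedRows is 0 and NumberFilteredRows is 1, so the row was rejected for quality rather than skipped. The sketch below is a rough model of that acceptance check; the helper name and the exact formula are illustrative, not the server's code:

```java
public class FilterRatioSketch {
    // Rough model: a load fails with "too many filtered rows" when
    // filteredRows / (totalRows - unselectedRows) exceeds max_filter_ratio.
    // Rows skipped by the "where" clause (unselected) never trip this check.
    static boolean loadWouldSucceed(long totalRows, long unselectedRows,
                                    long filteredRows, double maxFilterRatio) {
        long candidates = totalRows - unselectedRows;
        if (candidates <= 0) return true;
        return (double) filteredRows / candidates <= maxFilterRatio;
    }

    public static void main(String[] args) {
        // Numbers from the error response above: 1 total, 0 unselected, 1 filtered.
        System.out.println(loadWouldSucceed(1, 0, 1, 0.0)); // prints false
    }
}
```

Raising `sink.properties.max_filter_ratio` would make such loads succeed while silently discarding the bad rows, so it is a stopgap, not a fix for the underlying NULL values.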
Sink setup:
StarRocksSink.sink(
    TableSchema.builder()
        .field("click_date", DataTypes.DATE())
        .field("url", DataTypes.VARCHAR(512))
        .field("type", DataTypes.TINYINT())
        .field("user", DataTypes.VARCHAR(64))
        .field("click_times", DataTypes.INT())
        .field("process_time", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
        .build(),
    StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", parameterTool.get("StarRocks.dbJdbcUrl"))
        .withProperty("load-url", parameterTool.get("StarRocks.dbLoadUrl"))
        .withProperty("username", parameterTool.get("StarRocks.dbUserName"))
        .withProperty("password", parameterTool.get("StarRocks.dbPassWord"))
        .withProperty("table-name", parameterTool.get("StarRocks.webClick.tbName", "t_web_click"))
        .withProperty("database-name", parameterTool.get("StarRocks.dbName"))
        .withProperty("sink.properties.format", "json")
        .withProperty("sink.properties.strip_outer_array", "true")
        .withProperty("sink.semantic", "exactly-once")
        .withProperty("sink.properties.ignore_json_size", "true")
        .withProperty("sink.properties.where", "url is not null")
        .build(),
(slots, maapClick) -> {
int i = 0;
slots[i++] = maapClick.getClickDate();
slots[i++] = maapClick.getUrl();
slots[i++] = maapClick.getType();
slots[i++] = maapClick.getUser();
slots[i++] = maapClick.getClickTimes();
slots[i++] = maapClick.getProcessTime();
}
)
Check whether url overflowed, i.e. exceeded the defined varchar(512).
Try defining url as varchar(65533) in the StarRocks table and see.
OK; that is probably it — I have run into overflow being treated as NULL before.
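If the column needs to stay at varchar(512), the overflow can also be prevented on the Flink side by truncating before the sink. StarRocks varchar lengths count bytes, so truncation must be by UTF-8 byte length, not character count. A minimal stdlib sketch (the helper name is made up):

```java
import java.nio.charset.StandardCharsets;

public class Utf8Truncate {
    // Truncate s so its UTF-8 encoding fits in maxBytes, without
    // splitting a multi-byte character.
    static String truncateUtf8(String s, int maxBytes) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length <= maxBytes) return s;
        int end = maxBytes;
        // Back up past UTF-8 continuation bytes (10xxxxxx) so the cut
        // lands on a character boundary.
        while (end > 0 && (bytes[end] & 0xC0) == 0x80) end--;
        return new String(bytes, 0, end, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(truncateUtf8("abcdef", 4)); // prints abcd
        // "日本語" is 9 bytes; only one whole 3-byte character fits in 4 bytes.
        System.out.println(truncateUtf8("日本語", 4)); // prints 日
    }
}
```

Applied as `truncateUtf8(url, 512)` in the row-transformer before the slot assignment, this keeps oversized URLs from being turned into NULLs at load time.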
But testing in my own environment, when I manually set url to null and then addSink, it still fails.
Is the error still that the url field cannot be NULL?
Yes; the same as the exception log I pasted.
Did you manually set it to null or to an empty string?
Manually set to null.
It works fine for me with the setup below; can you upload your code?
CREATE TABLE `users1` (
`user_id` bigint(20) NOT NULL COMMENT "",
`name` varchar(65533) NOT NULL COMMENT "",
`pv` int sum
) ENGINE=OLAP
AGGREGATE KEY(`user_id`,`name`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
Bean.java (5.4 KB)