Flink 导入 StarRocks【3.2.9】报 No database selected

【详述】利用flink-connector-starrocks,版本:flink-connector-starrocks-1.2.8_flink-1.17.jar
【背景】在 sql-client 窗口通过 Flink SQL 从 MySQL 导入数据到 StarRocks
【业务影响】无法通过flink导入数据
【StarRocks版本】3.2.9
【导入或者导出方式】Flink 1.17
希望大佬指导解决问题,不胜感激!
【在starrocks报错截图如下:】


具体堆栈:
[TransactionLoadAction.executeWithoutPassword():177] com.starrocks.common.UserException: No database selected.
at com.starrocks.http.rest.TransactionLoadAction.executeTransaction(TransactionLoadAction.java:211)
at com.starrocks.http.rest.TransactionLoadAction.executeWithoutPassword(TransactionLoadAction.java:166)
at com.starrocks.http.rest.RestBaseAction.execute(RestBaseAction.java:134)
at com.starrocks.http.rest.RestBaseAction.handleRequest(RestBaseAction.java:80)
at com.starrocks.http.HttpServerHandler.channelRead(HttpServerHandler.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)

【flink sql】
-- Flink-side database that holds the source and sink table definitions.
-- The name contains hyphens, so it must be quoted with backticks; unquoted,
-- the parser does not read `advert-center-biz` as a single identifier.
CREATE DATABASE IF NOT EXISTS default_catalog.`advert-center-biz`;

-- Source table: reads the MySQL table `db_biz_advert_upload_event` in
-- database `advert-center-biz` through the mysql-cdc connector.
-- Fixes relative to the pasted script:
--   * the hyphenated database name is quoted with backticks;
--   * connector options use plain single quotes ('...') instead of the
--     typographic quotes (‘...’), which are not valid string delimiters.
CREATE TABLE IF NOT EXISTS default_catalog.`advert-center-biz`.db_biz_advert_upload_event_src (
    id                   BIGINT    NOT NULL,
    app_id               BIGINT    NOT NULL,
    tenant_id            BIGINT    NOT NULL,
    uid                  BIGINT    NOT NULL,
    business_id          STRING    NOT NULL,
    client_id            STRING    NOT NULL,
    ad_platform          INT       NOT NULL,
    event                STRING    NOT NULL,
    ad_event             STRING    NOT NULL,
    pay_amount           BIGINT    NOT NULL,
    need_static          TINYINT   NOT NULL,
    source               TINYINT   NOT NULL,
    reattribution_id     BIGINT    NOT NULL,
    track_uniq_id        BIGINT    NULL,
    upload_status        TINYINT   NOT NULL,
    upload_failed_reason STRING    NULL,
    created_time         TIMESTAMP NULL,
    -- Flink does not validate the key itself, hence NOT ENFORCED.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'rm-888888.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'root',
    'password' = '888888',
    'database-name' = 'advert-center-biz',
    'table-name' = 'db_biz_advert_upload_event'
);

-- Sink table: writes to StarRocks table `test_adcenter_db`.`advert_upload_event_tbl`
-- through the starrocks connector, sending rows as JSON.
-- Fixes relative to the pasted script:
--   * the hyphenated Flink database name is quoted with backticks;
--   * connector options use plain single quotes ('...') instead of the
--     typographic quotes (‘...’), which are not valid string delimiters.
-- NOTE(review): 9031 / 8031 differ from the usual StarRocks FE defaults
-- (9030 query port, 8030 HTTP port) — confirm they match fe.conf.
CREATE TABLE IF NOT EXISTS default_catalog.`advert-center-biz`.db_biz_advert_upload_event_sink (
    id                   BIGINT    NOT NULL,
    app_id               BIGINT    NOT NULL,
    tenant_id            BIGINT    NOT NULL,
    uid                  BIGINT    NOT NULL,
    business_id          STRING    NOT NULL,
    client_id            STRING    NOT NULL,
    ad_platform          INT       NOT NULL,
    event                STRING    NOT NULL,
    ad_event             STRING    NOT NULL,
    pay_amount           BIGINT    NOT NULL,
    need_static          TINYINT   NOT NULL,
    source               TINYINT   NOT NULL,
    reattribution_id     BIGINT    NOT NULL,
    track_uniq_id        BIGINT    NULL,
    upload_status        TINYINT   NOT NULL,
    upload_failed_reason STRING    NULL,
    -- NOTE(review): created_time is declared NULL yet is part of the primary
    -- key below — confirm this matches the StarRocks table definition.
    created_time         TIMESTAMP NULL,
    PRIMARY KEY (app_id, uid, created_time, id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9031',
    'load-url' = '127.0.0.1:8031',
    'username' = 'root',
    'password' = '888888',
    'database-name' = 'test_adcenter_db',
    'table-name' = 'advert_upload_event_tbl',
    'sink.max-retries' = '10',
    'sink.buffer-flush.interval-ms' = '5000',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true'
);

-- Submit the MySQL -> StarRocks sync as a single Flink job.
-- The hyphenated database name is quoted with backticks, and the columns
-- are listed explicitly (same order as both table declarations) instead
-- of relying on SELECT *.
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO default_catalog.`advert-center-biz`.db_biz_advert_upload_event_sink
    SELECT
        id,
        app_id,
        tenant_id,
        uid,
        business_id,
        client_id,
        ad_platform,
        event,
        ad_event,
        pay_amount,
        need_static,
        source,
        reattribution_id,
        track_uniq_id,
        upload_status,
        upload_failed_reason,
        created_time
    FROM default_catalog.`advert-center-biz`.db_biz_advert_upload_event_src;
END;

解决了吗?我也遇到了