Flink-CDC Stream API同步支持表的初始化么

为了更快的定位您的问题,请提供以下信息,谢谢
【详述】Flink-CDC 用Stream API方式同步达梦库数据库到StarRocks上,不在StarRocks上建表会报错,这种方式支持表的初始化么 如果支持 我需要配置什么参数支持初始化操作
【背景】




【业务影响】
【是否存算分离】
【StarRocks版本】例如:1.2.10
【集群规模】例如:3fe(1 follower+2observer)+3be(fe与be混部)

  • 同步报错:
    Stream load completed, label : flink-3ba52f13-fd61-44a2-989e-c37361e7e5c2, database : db_test, table : ACT_EVT_LOG, body : {
    “TxnId”: -1,
    “Label”: “flink-3ba52f13-fd61-44a2-989e-c37361e7e5c2”,
    “Status”: “Fail”,
    “Message”: “unknown table “db_test.ACT_EVT_LOG””,
    “NumberTotalRows”: 0,
    “NumberLoadedRows”: 0,
    “NumberFilteredRows”: 0,
    “NumberUnselectedRows”: 0,
    “LoadBytes”: 0,
    “LoadTimeMs”: 0,
    “BeginTxnTimeMs”: 0,
    “StreamLoadPlanTimeMs”: 0,
    “ReadDataTimeMs”: 0,
    “WriteDataTimeMs”: 0,
    “CommitAndPublishTimeMs”: 0
    }
    14:32:28.496[ERROR]Exception happens when sending data, thread: I/O client dispatch - cae61f15-13b2-490e-8e3a-e8259049943d
    com.starrocks.data.load.stream.exception.StreamLoadFailException: Stream load failed because of error, db: db_test, table: ACT_EVT_LOG, label: flink-3ba52f13-fd61-44a2-989e-c37361e7e5c2,
    responseBody: {
    “TxnId”: -1,
    “Label”: “flink-3ba52f13-fd61-44a2-989e-c37361e7e5c2”,
    “Status”: “Fail”,
    “Message”: “unknown table “db_test.ACT_EVT_LOG””,
    “NumberTotalRows”: 0,
    “NumberLoadedRows”: 0,
    “NumberFilteredRows”: 0,
    “NumberUnselectedRows”: 0,
    “LoadBytes”: 0,
    “LoadTimeMs”: 0,
    “BeginTxnTimeMs”: 0,
    “StreamLoadPlanTimeMs”: 0,
    “ReadDataTimeMs”: 0,
    “WriteDataTimeMs”: 0,
    “CommitAndPublishTimeMs”: 0
    }
    errorLog: null
    at com.starrocks.data.load.stream.DefaultStreamLoader.sendToSR(DefaultStreamLoader.java:349) ~[flink-connector-starrocks-1.2.10_flink-1.17.jar:?]
    at com.starrocks.data.load.stream.DefaultStreamLoader.lambda$send$3(DefaultStreamLoader.java:176) ~[flink-connector-starrocks-1.2.10_flink-1.17.jar:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_212]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]