【详述】flink cdc 同步mysql数据至SR中flink任务内存占用异常
【背景】无
【业务影响】
【StarRocks版本】例如:2.2.4
【集群规模】例如:3fe+3be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:16C/64G/万兆
【附件】
flink cdc 从mysql向 sr同步数据;sr使用主键模型未使用分区,整库约500w数据,几张大表占70%数据量,发现flink任务需要十几G内存,而且Managed Memory 一直100%,mysql更改数据多的时段还容易oom,这种有处理思路吗?
是直接使用flinksql同步mysql到sr吗
data stream api
方便脱敏帖下代码么?怀疑是state使用姿势问题
代码如下 :
checkpoint 20s
// 加载数据源
val cdcSource = FlinkCDCProperty.apply(dataSyncConf, dateFormat = true, StartupOptions.initial())
val resultDS: DataStream[String] = env.fromSource[CDCWithSchema](cdcSource, WatermarkStrategy.noWatermarks(), sourceDbName)
.uid(s"$sourceDbName-uid")
.name(s"$sourceDbName-name")
.startNewChain()
.keyBy(x => x.table)
// 这里是处理表结构变更 自己实现的 跟数据同步无关
.map(AbstractTableSchemaChange.electStrategy[CDCWithSchema](dataSyncConf, tables).processFunctionWithSchemaChange)
.setParallelism(if (dataSyncConf.getSchemaChange) 1 else dataSyncConf.getThread)
.uid("SchemaChange-uid")
.name("SchemaChange-name")
.startNewChain()
//处理新增 修改删除代码
.process(new TableSplitProcessFunction(outputTags))
.uid("Process:TableSplit-uid")
.name("Process:TableSplit-name")
.filter(x => StringUtils.isNotBlank(x))
.uid("NotBlank-uid")
.name("NotBlank-name")
// 将所有表数据sink出去
outputTags.foreach({ case (tableName: String, tag: OutputTag[String]) =>
resultDS.getSideOutput(tag)
.map(x => x.replaceAll("[\\u0001-\\u0007\\u000b-\\u000c\\u000e-\\u001f]", ""))
.uid(s"ASCII-$tableName-uid")
.name(s"ASCII-$tableName-name")
.startNewChain()
// json格式
.addSink(new StarRocksSinkFuncV3(dataSyncConf, tableName).createSinkFun)
.name(s"$tableName-name")
.uid(s"$tableName-id")
})
env.execute(params.get("app.name"))
//-----------------------------TableSplitProcessFunction------------------------------
class TableSplitProcessFunction(tags: mutable.HashMap[String, OutputTag[String]]) extends ProcessFunction[CDCWithSchema, String] {
override def processElement(in: CDCWithSchema, context: ProcessFunction[CDCWithSchema, String]#Context, collector: Collector[String]): Unit = {
Option.apply(tags.get(in.table)).get match {
case Some(x) =>
val after = in.after
val before = in.before
in.operation match {
case READ | CREATE | UPDATE => after.put("__op", 0)
context.output(x, after.toJSONString)
case DELETE => before.put("__op", 1)
context.output(x, before.toJSONString)
case _ =>
}
case None =>
}
}
遇到相同的问题了
直接使用 file system state backend
使用 file system state backend还会有内存占用多的问题么?