flink的撤回流写入starrocks

【详述】问题详细描述
【背景】做过哪些操作?
【业务影响】
【StarRocks版本】例如:2.3.4
【集群规模】例如:1fe+3be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,例如:48C/64G/万兆
【表模型】例如:主键模型
【导入或者导出方式】例如:Flink
【联系方式】为了在解决问题过程中能及时联系到您获取一些日志信息,请补充下您的联系方式,例如:2760787474@qq.com,谢谢
【附件】
image

左流 左关联 右流
左流一个id=1数据先来,发现右流没有数据,写入starrock,但是主键增加了一个前缀是/n(换行符),变为/n1
后面右流来数据了,左流关联右流,再写入starrocks,starrocks原先主键是/n1,变为1,同时数据补全了

请问下使用的flink,flink-connector版本分别是多少?另外能发下写入starrocks的代码吗

create table tmp.bu_order_doris (
l_id string ,
l_code string ,
r_code string ,
elt_time timestamp
) with (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://172.26.136.211:9030/tmp’,
‘table-name’ = ‘bu_order’,
‘sink.buffer-flush.max-rows’ = ‘100’,
‘sink.buffer-flush.interval’ = ‘1s’,
‘lookup.cache.max-rows’ = ‘10000’,
‘lookup.cache.ttl’ = ‘300s’,
‘username’ = ‘root’,
‘password’ = ‘root123!m’
)

insert into tmp.bu_order_doris
select
l.in_store_id as l_id ,
l.in_store_code as l_code,
r.in_store_d_id as r_code,
LOCALTIMESTAMP
from dg_ve.t_ve_bu_in_store_order_kafka l
left join dg_ve.t_ve_bu_in_store_order_d_kafka r
on l.in_store_id=r.in_store_id

dg_ve.t_ve_bu_in_store_order_kafka和
dg_ve.t_ve_bu_in_store_order_d_kafka是在flinksql的环境中建立kafka的数据源
tmp.bu_order_doris是flinksql环境中对starocks的主键表tmp.bu_order的映射,建表语句如下

CREATE TABLE bu_order (
l_id varchar(65533) NOT NULL COMMENT “”,
l_code varchar(65533) NULL COMMENT “”,
r_code varchar(65533) NULL COMMENT “”,
elt_time datetime NULL COMMENT “”
) ENGINE=OLAP
PRIMARY KEY(l_id)
COMMENT “OLAP”
DISTRIBUTED BY HASH(l_id) BUCKETS 10
PROPERTIES (
“replication_num” = “3”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”,
“enable_persistent_index” = “false”
)

flink的版本:1.13.5
连接器的版本:flink-connector-starrocks-1.2.4_flink-1.13_2.11.jar

麻烦再发下sink的配置,另外有确认过数据源没有脏数据吧?带/n的这种

create table tmp.bu_order_doris (
l_id string ,
l_code string ,
r_code string ,
elt_time timestamp
) with (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://172.26.136.211:9030/tmp’,
‘table-name’ = ‘bu_order’,
‘sink.buffer-flush.max-rows’ = ‘100’,
‘sink.buffer-flush.interval’ = ‘1s’,
‘lookup.cache.max-rows’ = ‘10000’,
‘lookup.cache.ttl’ = ‘300s’,
‘username’ = ‘root’,
‘password’ = ‘root123!m’
)
这个表就是我的sink表

dg_ve.t_ve_bu_in_store_order_kafka表的测试数据如下
{“in_store_id”:“1”,“in_store_code”:“L1”}
{“in_store_id”:“2”,“in_store_code”:“L2”}
{“in_store_id”:“3”,“in_store_code”:“L3”}
{“in_store_id”:“4”,“in_store_code”:“L4”}
{“in_store_id”:“5”,“in_store_code”:“L5”}

dg_ve.t_ve_bu_in_store_order_d_kafka表的测试数据如下
{“in_store_id”:“1”,“in_store_d_id”:“R1”}
{“in_store_id”:“1”,“in_store_d_id”:“R2”}
{“in_store_id”:“2”,“in_store_d_id”:“R2”}
{“in_store_id”:“2”,“in_store_d_id”:“R2”}
{“in_store_id”:“3”,“in_store_d_id”:“R3”}
{“in_store_id”:“4”,“in_store_d_id”:“R4”}
{“in_store_id”:“5”,“in_store_d_id”:“R5”}

写错了
create table tmp.bu_order_doris (
l_id string ,
l_code string ,
r_code string ,
elt_time timestamp ,

PRIMARY KEY (l_id) NOT ENFORCED
) with (
‘connector’ = ‘starrocks’,
‘jdbc-url’ = ‘jdbc:mysql://172.26.136.211:9030’,
‘load-url’ = ‘172.26.136.211:8030’,
‘database-name’ = ‘tmp’,
‘table-name’ = ‘bu_order’,
‘username’ = ‘root’,
‘password’ = ‘root123!m’,
‘sink.connect.timeout-ms’ = ‘10000’ ,
‘sink.buffer-flush.max-rows’ = ‘64000’ ,
‘sink.buffer-flush.max-bytes’ = ‘300000000’ ,
‘sink.buffer-flush.interval-ms’ = ‘2000’ ,
‘sink.properties.column_separator’ = ‘\x01’ ,
‘sink.properties.row_delimiter’ = ‘\x02’ ,
‘sink.max-retries’ = ‘3’
)
复制错了,这个才是我的sink表,就是在flinksql中两张数据源是kafka的表进行join写入starrocks的主键表的场景

通过mysql client执行select * from bu_order结果也是这样吗?

另外可以把sink改成print确认一下进sr前的数据是什么样的

打印的方式输出如下
左流,kafak主题:a_test_dg_ve__t_ve_bu_in_store_order写入数据{“in_store_id”:“1”,“in_store_code”:“L1”}
idea控制台输出:+I[1, L1, null, 2023-02-13T11:31:03.020]
右流,kafak主题:a_test_dg_ve__t_ve_bu_in_store_order_d写入数据{“in_store_id”:“1”,“in_store_d_id”:“R1”}
idea控制台输出:-D[1, L1, null, 2023-02-13T11:31:56.410],+I[1, L1, R1, 2023-02-13T11:31:56.411]

哪个测试机器现在有点问题,我等会再回复您,这个是我以前将数据导出来的样子

第一次插入l_id=2的数据会有\n,我再插入右流的数据的时候,数据补全了,同时\n2变成2,
但是我再一次左流l_id=2的时候,表中会多出l_id=\n2的数据来

收到,我本地复现下

这是个bug,配置了sink.properties.row_delimiter会有问题,你的场景下不能用换行符作为row_delimiter吗

请问这段代码有遇到过connect processor exception because
java.io.IOException: Connection reset by peer的报错吗,怎么解决呀

2.4.1版本 fe的日志 一直有Connection reset by peer 日志错误是为什么 报错参考

我最近项目也遇到了一样的问题,这个怎么解决的??

https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/186 可以打上这个补丁,或是等flink connector的新版本。