flink-sink

【详述】
用table的方式将日志数据导入到sr里,字段有日志时间(datetime类型),ip(string类型),info(string类型)等等总共7个字段,但是导入的时候,总是报错说是少字段,看了下日志,应该是把我的数据转换成了\t分隔符的数据,但是日志信息大概率有\t,这样是不是会有问题呢?

改用streaming的方式导入数据要先将数据转换成json格式,导入时会爆出超出65535长度错误,但其实数据量并没有那么大。

官网还有个streaming方式,sink里面传3个参数的方法,第三个参数官网提供的java lambda表达式传参,那么scala应该怎么传呢?希望官方能给一个详细的示例,包括scala的方法谢谢

现在这两种方式导入数据都有问题,请问有什么方法可以解决么?

flink整合这块给的包用着还是有点不是很好用,想过用jdbc的方式,但是看官网说jdbc是要经过fe的可能效率会低很多。sr的查询算法上速度很快,如果数据很难进入的话,再快的算法我也用不上呀~感谢sr,希望能越来越好

【背景】用flink-connector-starrocks导入sr遇到的问题
【业务影响】
【StarRocks版本】例如:2.0.1

列表条目

【集群规模】3fe(1 follower+2observer)+3be(fe与be混部)
【机器信息】8核32G
【附件】
{“streamLoadErrorLog”:"Reason: column count mismatch, expect=7 real=3. src line:。。。。。。。

看了一下,底层用的还是转成了streamload,而streamload的默认分隔符是\t 和 \n,我将日志数据的\t\n都转换了别的,目前来说没什么问题了,但是这样改变了原有数据该有的模样。。。

.withProperty("sink.properties.column_separator", "\\x01")
            .withProperty("sink.properties.row_delimiter", "\\x02")

增加这俩配置指定为其他分隔符就可以了,不用改数据源。

我改了,报错,Reason: column count mismatch, expect=7 real=1. src line:
我看了一下,是把每个字段都变成了一行数据,很难受

sink的配置贴下看看

这个是现在的配置,走的默认
tEnv.executeSql(
“”"
|CREATE TABLE catalina(
| logstime timestamp,
| hostname string,
| info string,
| intime timestamp,
| path string,
| ip string,
| id bigint
|) WITH (
| ‘connector’ = ‘starrocks’,
| ‘jdbc-url’ = ‘jdbc:mysql://192.168.131.59:9030,192.168.131.60:9030,192.168.131.47:9030’,
| ‘load-url’ = ‘192.168.131.59:8030;192.168.131.60:8030;192.168.131.47:8030’,
| ‘database-name’ = ‘logs’,
| ‘table-name’ = ‘catalina’,
| ‘username’ = ‘root’,
| ‘password’ = ‘’,
| ‘sink.buffer-flush.max-rows’ = ‘1000000’,
| ‘sink.buffer-flush.interval-ms’ = ‘15000’
|)
|""".stripMargin
)

这个是加了指定分隔符的,就把每个字段分割成一行了。
tEnv.executeSql(
“”"
|CREATE TABLE catalina(
| logstime timestamp,
| hostname string,
| info string,
| intime timestamp,
| path string,
| ip string,
| id bigint
|) WITH (
| ‘connector’ = ‘starrocks’,
| ‘jdbc-url’ = ‘jdbc:mysql://192.168.131.59:9030,192.168.131.60:9030,192.168.131.47:9030’,
| ‘load-url’ = ‘192.168.131.59:8030;192.168.131.60:8030;192.168.131.47:8030’,
| ‘database-name’ = ‘logs’,
| ‘table-name’ = ‘catalina’,
| ‘username’ = ‘root’,
| ‘password’ = ‘’,
| ‘sink.buffer-flush.max-rows’ = ‘1000000’,
| ‘sink.buffer-flush.interval-ms’ = ‘15000’,
| ‘sink.properties.column_separator’ = ‘\x01’,
| ‘sink.properties.row_delimiter’ = ‘\x02’
|)
|""".stripMargin
)

也有可能是数据中包含\x01或\x02
这种情况就用这个配置吧:

sink.properties.format='json',
sink.properties.strip_outer_array='true',

数据源不需要更改。

如果像你说的会报超长限制,那是因为sr这边的长度是字节长度,而source的长度一般指的是字符长度导致的。

好的,我这边再研究研究,感谢回复

flink-sink运行一段时间会 报错, at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{“Status”:“Fail”,“BeginTxnTimeMs”:0,“Message”:“already stopped, skip waiting for close. cancelled/!eos: : 1/0”,“NumberUnselectedRows”:0,“CommitAndPublishTimeMs”:0,“Label”:“5ed5afc8-b73f-4acf-bb68-0aaae5aba177”,“LoadBytes”:11234,“StreamLoadPutTimeMs”:0,“NumberTotalRows”:0,“WriteDataTimeMs”:261,“TxnId”:7735925,“LoadTimeMs”:262,“ReadDataTimeMs”:0,“NumberLoadedRows”:0,“NumberFilteredRows”:0}
{}
重启就好了请问是什么原因呀

flink-sink运行一段时间会 报错, at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{“Status”:“Fail”,“BeginTxnTimeMs”:0,“Message”:“already stopped, skip waiting for close. cancelled/!eos: : 1/0”,“NumberUnselectedRows”:0,“CommitAndPublishTimeMs”:0,“Label”:“5ed5afc8-b73f-4acf-bb68-0aaae5aba177”,“LoadBytes”:11234,“StreamLoadPutTimeMs”:0,“NumberTotalRows”:0,“WriteDataTimeMs”:261,“TxnId”:7735925,“LoadTimeMs”:262,“ReadDataTimeMs”:0,“NumberLoadedRows”:0,“NumberFilteredRows”:0}
{}
重启就好了请问是什么原因呀

这个是be响应的错误,需要看下be的日志具体是什么问题,大概率是导入频率过高导致的。