starrocks array字段写入问题

请问下,使用java 写 starRocks ,如果starrocks 表的字段有array 类型,使用Java 应该用哪种类型对应?

问题背景,官网自带的 flink-connector-starrocks 不支持 array 类型写 starRocks ,我这边修改 connector 源码部分,发现直接这种事不生效的,后面我又把 record.getArray(pos) 转换为 arraylist 也不行
企业微信截图_db707f87-c0b2-463c-9b6e-e6aacfb4247b

如果没有设置sink.properties.format = json的话,不需要修改 flink-connector-starrocks。
在flink schema 中定义对应字段为string, java 中传入对应字段值 “[1,2,3]” 这样即可

sink.properties.format = json ,这个是 sink 端的设置,kafka 中的 json 有字段本身为 arraylist 和 json 对象;实际如下:

create table t_kafka_source (
_id STRING,
activity ARRAY,
address STRING,
address_city STRING,
app_client_id STRING
) WITH (
‘properties.bootstrap.servers’=’**:9092’,
‘properties.group.id’=‘udtfetlmaicaigroup’,
‘scan.startup.mode’=‘latest-offset’,
‘scan.topic-partition-discovery.interval’ = ‘30s’,
‘topic’='
’,
‘connector’=‘kafka’,
‘format’=‘json’,
‘json.fail-on-missing-field’ = ‘false’,
‘json.ignore-parse-errors’ = ‘true’
);

create table t_kafka_sink (
_id STRING,
activity ARRAY,
address STRING,
address_city STRING,
app_client_id STRING,
intime TIMESTAMP
) WITH (

        'connector'='starrocks',
        'jdbc-url'='jdbc:mysql://***9030?ods_db',
        'load-url'='****:8030',
        'database-name'='ods_db',
        'table-name'='MaicaiOrderDetail',
        'username'='***',
        'password'='****',
        'sink.buffer-flush.max-rows'='1000000',
        'sink.buffer-flush.max-bytes'='300000000',
        'sink.buffer-flush.interval-ms'='300000',
        'sink.properties.column_separator'='\\x01',
        'sink.properties.row_delimiter'='\\x02',
        'sink.parallelism'='2'

);

INSERT INTO t_kafka_sink
SELECT _id, activity, address, address_city, app_client_id
,CURRENT_TIMESTAMP as intime
FROM t_kafka_source ;

starrocks 建表时 , activity 为 array 类型,这种写法,写入 array 字段就为 null

flink 直接使用 string 处理 没问题,不从 json 里解析出来,当作 string 处理

flink sql array类型这种需要等待connector做下兼容。

看这种使用 ,就是 拼成 “[1,2,3]” 这种字符串处理 ?就像我问题描述的那样,
case array:
return 需要返回拼接的字符串

如果format是json的话不可以这样处理,需要array而不是string

sink 端为format json时,array 是不是就是采用 arraylist 这种对应 ?

是的,
format = json 需要 arraylist
format = csv 需要 string

1赞

好的 :ok_hand: get 了,如果是 嵌套的 子 json 就是直接当作 string 处理吧 ?kafka 里的某个字段的 value 是 json

已经加入了对 array, map, row 类型的转换适配,可以尝试拉取最新代码,看下是否能够满足需求。
https://github.com/StarRocks/flink-connector-starrocks

请问下,如果flinksql 的 sink 端 数据 输出为。[0.0, , 0.0, , 0.0, 0.00, 2021-11-05T14:52:03.436],中间有的没有值 直接是 ,分开的,这种写入 starrocks为啥报错了,报错的信息是 Reason: column count mismatch, expect=166 real=15. src line:

这个参数配置下为",",默认分隔符是"\t",sink.properties.column_separator

改为

‘sink.properties.format’ = ‘json’,
‘sink.properties.strip_outer_array’ = ‘true’,

试下吧,现在csv格式的array类型导入可能还需要完善。

flinksql 默认的是 逗号分隔, ‘sink.properties.column_separator’=’\x01’,
‘sink.properties.row_delimiter’=’\x02’, 我加上这两个参数,直接打成一列了

{“Status”:“Fail”,“BeginTxnTimeMs”:0,“Message”:“The size of this batch exceed the max size [104857600] of json type data data [ 296503428 ]”,“NumberUnselectedRows”:0,“CommitAndPublishTimeMs”:0,“Label”:“7f7d4830-d46a-4eb3-8081-39da78246c81”,“LoadBytes”:0,“StreamLoadPutTimeMs”:0,“NumberTotalRows”:0,“WriteDataTimeMs”:0,“TxnId”:-1,“LoadTimeMs”:0,“ReadDataTimeMs”:0,“NumberLoadedRows”:0,“NumberFilteredRows”:0}

这个是需要再添加两个大小的限制参数吧

sink.buffer-flush.max-bytes 去掉用默认的就好了,starrocks只支持100m单次json格式的导入

有考虑即将出的2.0版本放开starrocks单次json最大100m的这个限制吗? :persevere:

2.0会进行优化