请问下,使用java 写 starRocks ,如果starrocks 表的字段有array 类型,使用Java 应该用哪种类型对应?
问题背景,官网自带的 flink-connector-starrocks 不支持 array 类型写 starRocks ,我这边修改 connector 源码部分,发现直接这种事不生效的,后面我又把 record.getArray(pos) 转换为 arraylist 也不行
请问下,使用java 写 starRocks ,如果starrocks 表的字段有array 类型,使用Java 应该用哪种类型对应?
问题背景,官网自带的 flink-connector-starrocks 不支持 array 类型写 starRocks ,我这边修改 connector 源码部分,发现直接这种事不生效的,后面我又把 record.getArray(pos) 转换为 arraylist 也不行
如果没有设置sink.properties.format = json的话,不需要修改 flink-connector-starrocks。
在flink schema 中定义对应字段为string, java 中传入对应字段值 “[1,2,3]” 这样即可
sink.properties.format = json ,这个是 sink 端的设置,kafka 中的 json 有字段本身为 arraylist 和 json 对象;实际如下:
create table t_kafka_source (
_id STRING,
activity ARRAY,
address STRING,
address_city STRING,
app_client_id STRING
) WITH (
‘properties.bootstrap.servers’=’**:9092’,
‘properties.group.id’=‘udtfetlmaicaigroup’,
‘scan.startup.mode’=‘latest-offset’,
‘scan.topic-partition-discovery.interval’ = ‘30s’,
‘topic’='’,
‘connector’=‘kafka’,
‘format’=‘json’,
‘json.fail-on-missing-field’ = ‘false’,
‘json.ignore-parse-errors’ = ‘true’
);
create table t_kafka_sink (
_id STRING,
activity ARRAY,
address STRING,
address_city STRING,
app_client_id STRING,
intime TIMESTAMP
) WITH (
'connector'='starrocks',
'jdbc-url'='jdbc:mysql://***9030?ods_db',
'load-url'='****:8030',
'database-name'='ods_db',
'table-name'='MaicaiOrderDetail',
'username'='***',
'password'='****',
'sink.buffer-flush.max-rows'='1000000',
'sink.buffer-flush.max-bytes'='300000000',
'sink.buffer-flush.interval-ms'='300000',
'sink.properties.column_separator'='\\x01',
'sink.properties.row_delimiter'='\\x02',
'sink.parallelism'='2'
);
INSERT INTO t_kafka_sink
SELECT _id, activity, address, address_city, app_client_id
,CURRENT_TIMESTAMP as intime
FROM t_kafka_source ;
starrocks 建表时 , activity 为 array 类型,这种写法,写入 array 字段就为 null
flink 直接使用 string 处理 没问题,不从 json 里解析出来,当作 string 处理
flink sql array类型这种需要等待connector做下兼容。
看这种使用 ,就是 拼成 “[1,2,3]” 这种字符串处理 ?就像我问题描述的那样,
case array:
return 需要返回拼接的字符串
如果format是json的话不可以这样处理,需要array而不是string
sink 端为format json时,array 是不是就是采用 arraylist 这种对应 ?
是的,
format = json 需要 arraylist
format = csv 需要 string
好的 get 了,如果是 嵌套的 子 json 就是直接当作 string 处理吧 ?kafka 里的某个字段的 value 是 json
已经加入了对 array, map, row 类型的转换适配,可以尝试拉取最新代码,看下是否能够满足需求。
https://github.com/StarRocks/flink-connector-starrocks
请问下,如果flinksql 的 sink 端 数据 输出为。[0.0, , 0.0, , 0.0, 0.00, 2021-11-05T14:52:03.436],中间有的没有值 直接是 ,分开的,这种写入 starrocks为啥报错了,报错的信息是 Reason: column count mismatch, expect=166 real=15. src line:
这个参数配置下为",",默认分隔符是"\t",sink.properties.column_separator
改为
‘sink.properties.format’ = ‘json’,
‘sink.properties.strip_outer_array’ = ‘true’,
试下吧,现在csv格式的array类型导入可能还需要完善。
flinksql 默认的是 逗号分隔, ‘sink.properties.column_separator’=’\x01’,
‘sink.properties.row_delimiter’=’\x02’, 我加上这两个参数,直接打成一列了
{“Status”:“Fail”,“BeginTxnTimeMs”:0,“Message”:“The size of this batch exceed the max size [104857600] of json type data data [ 296503428 ]”,“NumberUnselectedRows”:0,“CommitAndPublishTimeMs”:0,“Label”:“7f7d4830-d46a-4eb3-8081-39da78246c81”,“LoadBytes”:0,“StreamLoadPutTimeMs”:0,“NumberTotalRows”:0,“WriteDataTimeMs”:0,“TxnId”:-1,“LoadTimeMs”:0,“ReadDataTimeMs”:0,“NumberLoadedRows”:0,“NumberFilteredRows”:0}
这个是需要再添加两个大小的限制参数吧
sink.buffer-flush.max-bytes 去掉用默认的就好了,starrocks只支持100m单次json格式的导入
有考虑即将出的2.0版本放开starrocks单次json最大100m的这个限制吗?
2.0会进行优化