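[Details] A question for the experts: does Flink SQL support loading MAP-type data into SR (StarRocks)? In my tests, data in JSON key-value form fails to load. According to the docs, INSERT INTO and Broker Load are supported for MAP; is Stream Load not supported yet?
This is supported. Which version are you on?
3.1, with JSON data. How do I load it into a MAP column? Is there a demo?
Could someone help with this? When I load data in the form "map_col": {"a":"ss","b":"bb"}, I get this error: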
responseBody: {
"Status": "INTERNAL_ERROR",
"Message": "Not support cast VARCHAR(1048576) to MAP<VARCHAR(65533), VARCHAR(65533)>."
}
The Flink SQL job, simplified, is as follows:
CREATE TABLE source_table (
  map_col STRING
) COMMENT '' WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json',
  'debezium-json.ignore-parse-errors' = 'true'
);
CREATE TABLE sink_table (
  map_col STRING
) COMMENT 'generated by OLAP' WITH (
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true',
  'sink.properties.ignore_json_size' = 'true',
  'sink.properties.strict_mode' = 'true'
);
INSERT INTO sink_table
SELECT
  STR_TO_MAP(map_col) AS map_col
FROM source_table;
https://docs.starrocks.io/zh/docs/sql-reference/sql-statements/data-types/Map/ You can refer to this manual and load MAP data with INSERT or Broker Load.
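For reference, here is a minimal sketch of the INSERT route that manual describes, assuming StarRocks v3.1 or later. The table name map_demo and its columns are made up for illustration, and the map{...} literal follows the syntax in the Map manual as I understand it, so please check it against your version before relying on it.

```sql
-- Hypothetical demo table; the MAP column type matches the one in the error message above.
CREATE TABLE map_demo (
  id INT,
  map_col MAP<VARCHAR(65533), VARCHAR(65533)>
)
DUPLICATE KEY(id);

-- Write a MAP value with a map literal instead of a JSON string.
INSERT INTO map_demo VALUES (1, map{'a':'ss','b':'bb'});

-- Read a single value back by key.
SELECT id, map_col['a'] FROM map_demo;
```

The point of the sketch is that the value reaches StarRocks already typed as a MAP, so no VARCHAR-to-MAP cast is needed on the server side.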
Is Stream Load still unable to load MAP columns?
I tried "fields":{"name":"Alice"}
"fields":"{"name":"Alice"}"
columns tmp_fields=fields,fields=json_parse(tmp_fields)
columns fields=json_parse(fields)
str_to_map did not work either.
Errors:
"Message": "Not support cast VARCHAR(1048576) to MAP<VARCHAR(65533), VARCHAR(65533)>."
"Message": "Mapping column is not in table. column: tmp_fields"
"Message": "unknown reference column, column=fields, reference=fields"
I am using the Spark connector, version starrocks-spark-connector-3.5_2.12-1.1.2.jar.
I am loading from Hive into StarRocks, and the Hive table has a MAP-type column. This is the schema:
DataFrame schema: root
|-- event_time: timestamp (nullable = true)
|-- id: integer (nullable = true)
|-- map_varchar_varchar: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Error:
StarRocksWriter: Failed to write to StarRocks table: test_db.test_map
java.lang.UnsupportedOperationException: Unsupported starrocks type, column name: map_varchar_varchar, data type: map
at com.starrocks.connector.spark.sql.schema.InferSchema.inferDataType(InferSchema.java:131)
at com.starrocks.connector.spark.sql.schema.InferSchema.inferStructField(InferSchema.java:96)
at com.starrocks.connector.spark.sql.schema.InferSchema.inferSchema(InferSchema.java:75)
at com.starrocks.connector.spark.sql.StarRocksTableProvider.inferSchema(StarRocksTableProvider.java:65)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:90)
at org.apache.spark.sql.DataFrameWriter.getTable$1(DataFrameWriter.scala:284)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:300)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)