用datax导入json类型和数据类型遇到的问题

【详述】
在starrocks中建表字段涉及了json类型和数组类型的字段,且像通过datax实现数据从postgresql导出然后导入到starrocks,但由于datax还不支持 json类型和数组类型的数据,所以需要在从postgresql导出时将 json类型和数组类型数据转为varchar类型。但是在将varchar类型导入到starrocks对应的 json类型和数组类型字段时则表示无法导入。
请问能否在导入时将字符串类型的数据转为json类型和数组类型的数据,来解决这个问题?

datax的json配置文件内容如下:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader", 
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [ "jdbc:postgresql://10.0.0.2:6543/db_name"], 
                                 "querySql": [
                                    "select c_bh,j_ry::varchar,c_zw::varchar from public.t_test ;"
                                ]
                            }
                        ], 
                        "password": "*****", 
                        "username": "user"
                    }
                }, 
                "writer": {
                    "name": "starrockswriter", 
                    "parameter": {
                        "column": ["c_bh","j_ry","c_zw"], 
                        "database": "star", 
                        "jdbcUrl": "jdbc:mysql://10.0.0.1:9230/", 
                        "loadUrl": ["10.0.0.1:8230"], 
                        "password": "root", 
                        "postSql": [], 
                        "preSql": ["truncate table star.t_test; "], 
                        "table": "t_test", 
                        "username": "root",
                        "flushInterval": 20000,
                        "loadProps": { "format": "json",
                                       "ignore_json_size": true,
                                       "strip_outer_array": true,
                                       "strict_mode": true,
                                       "jsonpaths":["$.c_bh","$.j_ry","$.c_zw"] ,
                                       "columns":"c_bh,j_ry,c_zw"}
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "byte":10485760,
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        }
    }
}

starrocks目标表建表语句如下:

CREATE TABLE star.t_test (
  `c_bh` varchar(100) NOT NULL COMMENT "",
  `j_ry` JSON REPLACE NULL COMMENT "",
  `c_zw`  ARRAY<varchar(300)> REPLACE NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`c_bh`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c_bh`) BUCKETS 9 
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

执行导出导入时报错信息如下:

2022-11-16 16:58:18.034 [Thread-1] WARN  StarRocksWriterManager - Failed to flush batch data to StarRocks, retry times = 0
java.io.IOException: Failed to flush data to StarRocks.
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"c2840dfc-e421-4a3d-bb31-47e53d5ae11d","LoadBytes":697173,"StreamLoadPutTimeMs":1,"NumberTotalRows":1000,"WriteDataTimeMs":155,"TxnId":670044,"LoadTimeMs":158,"ErrorURL":"http://10.0.0.1:8240/api/_load_error_log?file=error_log_14497fceed9fb759_dcbc5f5bd2e46d83","ReadDataTimeMs":1,"NumberLoadedRows":576,"NumberFilteredRows":424}
        at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:74) ~[starrockswriter-release.jar:na]
        at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager.asyncFlush(StarRocksWriterManager.java:166) [starrockswriter-release.jar:na]
        at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager.access$000(StarRocksWriterManager.java:21) [starrockswriter-release.jar:na]
        at com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager$1.run(StarRocksWriterManager.java:137) [starrockswriter-release.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_211]

其中 http://10.0.0.1:8240/api/_load_error_log?file=error_log_14497fceed9fb759_dcbc5f5bd2e46d83 的部分内容如下:

Error: Invalid argument: Failed to parse value as array, column=c_zw
/root/starrocks/be/src/exec/vectorized/json_scanner.cpp:567 _construct_column(val, column, _slot_descs[i]->type(), _slot_descs[i]->col_name()). Row: {"c_bh":"77af8f9d641e49a7a6781b6c5313e4e9","j_ry":"{\"dsr\": [{\"bh\": \"e7676d582b94449d9c7deee56a878fa8\", \"mc\": \"几验\", \"lxdh\": \"lxfsnr\", \"ssdw\": [\"1\"], \"tssf\": [\"1\", \"2\"]}, {\"bh\": \"978a954ce7f142d6b706d3904eb1ab5a\", \"mc\": \"很马办\", \"lxdh\": \"lxfsnr2\", \"ssdw\": [\"1\"], \"tssf\": [\"1\", \"17\"]}, {\"bh\": \"8908e0d23ad3484f9fdb8b3c2255f1c3\", \"mc\": \"吃越全风直他两力五花\", \"lxdh\": \"lxfsnr\", \"ssdw\": [\"1\"], \"tssf\": [\"1\", \"2\"]}, {\"bh\": \"508cacf5578c4a1caf5648b2bb27d4e6\", \"mc\": \"下历\", \"lxdh\": \"lxfsnr\", \"ssdw\": [\"1\"], \"tssf\": [\"1\", \"2\"]}], \"dsrmc\": \"叶丞仉气朽谢邵申发危阳\"}","c_zw":"{8}"}

【业务影响】全量数据更新
【StarRocks版本】2.3.3
【集群规模】2fe(1 follower+1observer)+3be(fe与be混部)
【机器信息】CPU虚拟核/内存/网卡,48C/48G/千兆
【表模型】主键模型
【导入或者导出方式】DATAX

我理解这个starrocks端应该要写成为varchar类型?

但是我的目的是要存储为json类型 或 数组类型 的,由于datax不支持这两种类型,所以在导出的时候才转成了varchar。
所以想问问starrocks有什么方式能在导入时将varchar类型转换为 json类型 或 数组类型

这个问题解决了,可以分别通过函数 PARSE_JSON() 和 cast(‘[1]’as ARRAY) 再结合 导入衍生列 的方式将数据转为相应的类型导入进去(不过需要是在版本2.4.1的时候可以,版本2.3.3的 cast(‘[1]’as ARRAY) 会报错
修改后的datax任务json文件内容如下:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader", 
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [ "jdbc:postgresql://10.0.0.2:6543/db_name"], 
                                 "querySql": [
                                    "select c_bh,j_ry::varchar,replace(replace(c_zw::text,'{','['),'}',']') from public.t_test ;"
                                ]
                            }
                        ], 
                        "password": "**********", 
                        "username": "user"
                    }
                }, 
                "writer": {
                    "name": "starrockswriter", 
                    "parameter": {
                        "column": ["c_bh","j_ry_tmp","c_zw_tmp"], 
                        "database": "star", 
                        "jdbcUrl": "jdbc:mysql://10.0.0.1:9330/", 
                        "loadUrl": ["10.0.0.1:8330"], 
                        "password": "root", 
                        "postSql": [], 
                        "preSql": ["truncate table star.t_test; "], 
                        "table": "t_test", 
                        "username": "root",
                        "flushInterval": 20000,
                        "loadProps": { "format": "json",
                                       "ignore_json_size": true,
                                       "strip_outer_array": true,
                                       "strict_mode": true,
                                       "jsonpaths":["$.c_bh","$.j_ry_tmp","$.c_zw_tmp"] ,
                                       "columns":"c_bh,j_ry_tmp,c_zw_tmp, j_ry=PARSE_JSON(j_ry_tmp),c_zw= cast(c_zw_tmp as ARRAY<varchar>)"}
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "byte":10485760,
                "channel": 1
            },
            "errorLimit": {
                "record": 1,
                "percentage": 0.02
            }
        }
    }
}
1赞