stream load工具 使用文档
利用StarRocks的stream load导入数据一直是一个比较方便快捷的方式,但是在Windows环境下,还无法像在Linux环境下直接使用curl命令方便的导入数据到对应的库表中。该工具 **streamLoad **提供了这一功能,能够让用户像在Linux的环境下使用curl进行数据导入一样导入数据。
该工具支持Linux和Windows
win_streamLoad.exe (8.5 MB)
linux的只需要将最后的代码在Linux环境下编译即可
使用示例
准备工作
这里我的对应的StarRocks的连接为:
参数名 | 参数值 |
---|---|
host | 192.168.110.11 |
fe query port | 9030 |
be http port | 18040 |
database | example_db |
table | test001 |
user | root |
password |
host:192.168.110.11
fe query port: 9030
be http port : 18040
user: root
password:
database: example_db
table: test001
创建表
CREATE TABLE `test001` (
`recruit_date` date NOT NULL COMMENT "yyyy-mm-dd",
`region_num` tinyint(4) NULL COMMENT "range [-128, 127]",
`id` bigint(20) NULL COMMENT "range [-2^63 + 1 ~ 2^63 - 1]",
`income` double NULL COMMENT "8 bytes"
) ENGINE=OLAP
DUPLICATE KEY(`recruit_date`, `region_num`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`recruit_date`, `region_num`) BUCKETS 8
PROPERTIES (
"replication_num" = "3",
"bloom_filter_columns" = "id",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
准备数据文件
-
csv 文件
2022-10-11 11 88 44 2022-10-12 88 77 66
文件名为:test.txt
-
json 文件
{ "recruit_date":"2022-10-11","region_num":"66","id":"77","income":"88.0" }
文件名为:test.json
下载工具
配置导入参数
-
配置文件参数解释
配置文件格式要求为JSON文件
名称要求为:streamLoad.json
配置参数力求和stream load的保持一致
{ "label": "test012", "url": "http://192.168.110.11:18040/api/example_db/test001/_stream_load", "user": "root", "password": "", "file_path": "./test.txt", "format": "csv", "column_separator": "\t", "row_delimiter": "", "partitions": "", "columns": "", "max_filter_ratio": "0", "timeout": "600", "strict_mode": "false", "timezone": "Asia/Shanghai", "load_mem_limit": "", "strip_outer_array": "false", "json_root": "", "ignore_json_size": "true", "jsonpaths": "" }
配置文件使用准则:
对于必填项,需要根据实际情况调整。
对于非必填项,参数保留,值需要根据实际情况调整即可,如没有特殊需求,使用配置文件中默认的配置即可,只需要修改必填项即可。
参数名称 是否必选 参数解释 label 否 当前导入任务的label,默认会使用随机的bigint充当本次当如的label,用户指定时,使用用户指定的值 url 必填 当前导入的http的url,填写时需要注意port使用be的http port,直接使用fe的会有重定向问题 user 必填 导入任务的用户名 password 必填 导入任务使用的用户对应的密码 file_path 必填 导入数据时,数据文件所在的路径 format 必填 指定待导入数据的格式。取值包括 CSV
和JSON
。column_separator 否 用于指定待导入数据文件中的列分隔符,如果填写为空字符串,则默认使用\t row_delimiter 否 用于指定待导入数据文件中的行分隔符,如果填写为空字符串,则默认使用\n partitions 否 指定要把数据导入哪些分区,如果填写为空字符串,则默认导入到 StarRocks 表所在的所有分区中 columns 否 指定待导入数据文件和 StarRocks 表之间的列对应关系,如果填写为空字符串,则代表导入数据文件中的列与 StarRocks 表中的列按顺序一一对应 max_filter_ratio 否 用于指定导入作业的最大容错率,取值范围: 0
~1
,如果填写为**“0”**,当导入的数据行中有错误时,导入作业会失败,如果希望忽略错误的数据行,可以设置该参数的取值大于0
。这样的话,即使导入的数据行中有错误,导入作业也能成功。timeout 否 用于导入作业的超时时间,取值范围:1 ~ 259200。 strict_mode 否 用于指定是否开启严格模式。取值范围: true
和false
。timezone 否 用于指定导入作业所使用的时区。 load_mem_limit 否 导入作业的内存限制,最大不超过 BE 的内存限制。单位:字节。默认内存限制为 2 GB。 strip_outer_array 否 用于指定是否裁剪最外面的 array
含义。取值范围:true
和false
。默认值:false
。true
表示 JSON 格式文件中的数据是以数组形式表示的。如果待导入数据文件中最外层有一对表示 JSON 数组的中括号 ([]
),则一般情况下需要指定该参数取值为true
,这样中括号 ([]
) 中每一个数组元素都作为单独的一行数据行进行导入;否则,StarRocks 会将整个文件数据(即,整个 JSON 数组)作为一行数据导入。例如,JSON 格式的数据为[ {"k1" : 1, "v1" : 2}, {"k1" : 3, "v1" : 4} ]
,如果指定该参数取值为true
,则导入到 StarRocks 表中后会生成两行数据。json_root 否 用于指定待导入 JSON 数据的根节点。该参数仅用于通过匹配模式导入 JSON 格式的数据。 json_root
为合法的 JsonPath 字符串。默认值为空,表示会导入整个导入文件的数据。ignore_json_size 否 用于指定是否检查 HTTP 请求中 JSON Body 的大小。说明
HTTP 请求中 JSON Body 的大小默认不能超过 100 MB。如果 JSON Body 的大小超过 100 MB,会提示 “The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead huge memory consuming.” 错误。为避免该报错,可以在 HTTP 请求头中添加"ignore_json_size:true"
设置,忽略对 JSON Body 大小的检查。jsonpaths 否 用于指定待导入的字段的名称。参数值应为 JSON 格式。Stream Load 支持通过如下模式之一来导入 JSON 格式的数据:简单模式和匹配模式。 该参数仅用于通过匹配模式导入 JSON 格式的数据。 *简单模式:不需要设置 jsonpaths
参数。这种模式下,要求 JSON 数据是对象类型,例如{"k1": 1, "k2": 2, "k3": "hello"}
中,k1
、k2
、k3
是字段的名称,按名称直接对应目标 StarRocks 表中的col1
、col2
、col3
三列。 匹配模式:用于 JSON 数据相对复杂、需要通过jsonpaths
参数匹配待导入字段的场景。
创建一个名为streamLoad.json的配置文件,内容如下:
注:目前版本要求配置文件名称固定位streamLoad.json
- JSON数据导入
{
"label": "test012",
"url": "http://192.168.110.11:18040/api/example_db/test001/_stream_load",
"user": "root",
"password": "",
"file_path": "./test.json",
"format": "json",
"column_separator": "\t",
"row_delimiter": "",
"partitions": "",
"columns": "",
"max_filter_ratio": "0",
"timeout": "600",
"strict_mode": "false",
"timezone": "Asia/Shanghai",
"load_mem_limit": "2147483648",
"strip_outer_array": "false",
"json_root": "",
"ignore_json_size": "true",
"jsonpaths": ""
}
-
CSV数据导入
{ "label": "test012", "url": "http://192.168.110.11:18040/api/example_db/test001/_stream_load", "user": "root", "password": "", "file_path": "./test.txt", "format": "csv", "column_separator": "\t", "row_delimiter": "", "partitions": "", "columns": "", "max_filter_ratio": "0", "timeout": "600", "strict_mode": "false", "timezone": "Asia/Shanghai", "load_mem_limit": "2147483648", "strip_outer_array": "false", "json_root": "", "ignore_json_size": "true", "jsonpaths": "" }
配置文件中,csv和json数据导入参数冲突的用户不用处理,程序内部会根据format选择对应的参数。
执行导入
-
Linux
Linux环境下,是一个可执行文件,使用时,需要指定 配置文件所在的文件夹的目录。
例如我的配置文件位于:
/home/go-study/gotest/main/streamLoad.json
则执行命令为:
./streamLoad /home/go-study/gotest/main
执行后会有对应结果返回:
[root@motest main]# ../streamLoad /home/go-study/gotest/main 2022/10/03 14:31:38 { "TxnId": 1058, "Label": "test013", "Status": "Success", "Message": "OK", "NumberTotalRows": 2, "NumberLoadedRows": 2, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 40, "LoadTimeMs": 33, "BeginTxnTimeMs": 0, "StreamLoadPutTimeMs": 0, "ReadDataTimeMs": 0, "WriteDataTimeMs": 15, "CommitAndPublishTimeMs": 16 }
-
Windows
Windows环境下是一个.exe结尾的可执行文件,执行方式也是和Linux保持一致:
例如我的配置文件位于:
D:\代码目录\gotest\main\streamLoad.json
则执行命令为:
PS D:\代码目录\gotest\main>.\streamLoad.exe D:\代码目录\gotest\main 2022/10/12 12:16:45 { "TxnId": 1059, "Label": "test015", "Status": "Success", "Message": "OK", "NumberTotalRows": 1, "NumberLoadedRows": 1, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 80, "LoadTimeMs": 51, "BeginTxnTimeMs": 0, "StreamLoadPutTimeMs": 0, "ReadDataTimeMs": 0, "WriteDataTimeMs": 27, "CommitAndPublishTimeMs": 22 }
实现代码(自己学习go时写的,大佬勿喷)
streamLoad.go (4.5 KB)
package main
import (
"github.com/spf13/viper"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"strings"
)
func main() {
config := viper.New()
config.AddConfigPath(os.Args[1])
config.SetConfigName("streamLoad")
config.SetConfigType("json")
if err := config.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
log.Fatalln("No related configuration file was found")
} else {
log.Fatalln("Configuration file error")
}
}
//init config
configMap := initConfig(config)
request := prepare(configMap)
// add Header
request.Header.Add("expect", "100-continue")
format := configMap["format"]
label := configMap["label"]
if label == "" {
label = string(rand.Int63())
}
request.Header.Add("EXPECT", "100-continue")
request.SetBasicAuth(configMap["user"], "")
request.Header.Add("label", label)
//request.Header.Add("where", configMap["where"])
//request.Header.Add("partitions", configMap["partitions"])
request.Header.Add("max_filter_ratio", configMap["max_filter_ratio"])
request.Header.Add("timeout", configMap["timeout"])
request.Header.Add("strict_mode", configMap["strict_mode"])
request.Header.Add("timezone", configMap["timezone"])
request.Header.Add("load_mem_limit", configMap["load_mem_limit"])
if len(configMap["partitions"]) > 0 {
request.Header.Add("partitions", configMap["partitions"])
}
if len(configMap["columns"]) > 0 {
request.Header.Add("columns", configMap["columns"])
}
switch format {
case "csv":
{
request.Header.Add("column_separator", configMap["column_separator"])
if len(configMap["row_delimiter"]) > 0 {
request.Header.Add("row_delimiter", configMap["row_delimiter"])
}
request.Header.Add("format", format)
}
case "json":
{
request.Header.Add("format", format)
if len(configMap["jsonpaths"]) > 0 {
request.Header.Add("jsonpaths", configMap["jsonpaths"])
}
request.Header.Add("strip_outer_array", configMap["strip_outer_array"])
request.Header.Add("json_root", configMap["json_root"])
request.Header.Add("ignore_json_size", configMap["ignore_json_size"])
}
default:
log.Fatalln("The data file type is not supported ! ")
}
// do
client := http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
response, err := client.Do(request)
if err != nil {
log.Fatalf("The http request fails. The cause is as follows: %s n", err.Error())
}
statusCode := response.StatusCode
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if statusCode != 200 {
log.Fatalf("Stream load failed, statusCode=%d load result=%s", statusCode, body)
}
if err == nil {
log.Println(string(body))
}
}
func prepare(configMap map[string]string) *http.Request {
url := configMap["url"]
filePath := configMap["file_path"]
// var sb string
file, err := os.Open(filePath)
if err != nil {
log.Fatalf("open file failed: %s n", err.Error())
}
defer file.Close()
readAll, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("some error")
}
payload := strings.NewReader(string(readAll))
request, _ := http.NewRequest("PUT", url, payload)
return request
}
func initConfig(config *viper.Viper) map[string]string {
configMap := make(map[string]string)
configMap["label"] = config.GetString("label")
configMap["url"] = config.GetString("url")
configMap["user"] = config.GetString("user")
configMap["pwd"] = config.GetString("password")
configMap["file_path"] = config.GetString("file_path")
configMap["format"] = config.GetString("format")
configMap["column_separator"] = config.GetString("column_separator")
configMap["partitions"] = config.GetString("partitions")
configMap["columns"] = config.GetString("columns")
configMap["row_delimiter"] = config.GetString("row_delimiter")
configMap["partitions"] = config.GetString("partitions")
configMap["jsonpaths"] = config.GetString("jsonpaths")
configMap["strip_outer_array"] = config.GetString("strip_outer_array")
configMap["json_root"] = config.GetString("json_root")
configMap["ignore_json_size"] = config.GetString("ignore_json_size")
configMap["format"] = config.GetString("format")
configMap["where"] = config.GetString("where")
configMap["max_filter_ratio"] = config.GetString("max_filter_ratio")
configMap["timeout"] = config.GetString("timeout")
configMap["strict_mode"] = config.GetString("strict_mode")
configMap["timezone"] = config.GetString("timezone")
configMap["load_mem_limit"] = config.GetString("load_mem_limit")
return configMap
}