Spark Connector(测试版)
Spark Connector 可以支持通过 Spark 读取 StarRocks 中存储的数据,也支持通过Spark写入数据到StarRocks。
- 支持从
StarRocks
中读取数据 - 支持
Spark DataFrame
批量/流式 写入StarRocks
- 可以将
StarRocks
表映射为DataFrame
或者RDD
,推荐使用DataFrame
。 - 支持在
StarRocks
端完成数据过滤,减少数据传输量。
版本兼容
Connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
starrocks-spark2_2.11-1.0.1 | 2.x | 无 | 8 | 2.11 |
特别说明:本工具是个人基于starrocks-spark-connector-1.0的简易改造版本。
下载地址:starrocks-spark2_2.11-1.0.1.jar (9.9 MB)
Maven信息
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark2_2.11</artifactId>
<version>1.0.1</version>
</dependency>
JAR加载
目前该jar包未上传至maven 中央仓库,需要手动加载到自己的项目工程中,命令如下:
mvn install:install-file
-DgroupId=包名
-DartifactId=项目名
-Dversion=版本号
-Dpackaging=jar
-Dfile=jar文件所在路径
例如,假设本地jar在D:\starrocks-spark2_2.11-1.0.1.jar,则加载命令为:
mvn install:install-file -DgroupId=com.starrocks -DartifactId=starrocks-spark2_2.11 -Dversion=1.0.1 -Dpackaging=jar -Dfile=D:\starrocks-spark2_2.11-1.0.1.jar
使用示例
读取
SQL
CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS(
"table.identifier"="$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME",
"fenodes"="$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
"user"="$YOUR_STARROCKS_USERNAME",
"password"="$YOUR_STARROCKS_PASSWORD"
);
SELECT * FROM spark_starrocks;
DataFrame
val starrocksSparkDF = spark.read.format("starrocks")
.option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
.option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
.option("user", "$YOUR_STARROCKS_USERNAME")
.option("password", "$YOUR_STARROCKS_PASSWORD")
.load()
starrocksSparkDF.show(5)
RDD
import org.apache.starrocks.spark._
val starrocksSparkRDD = sc.starrocksRDD(
tableIdentifier = Some("$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME"),
cfg = Some(Map(
"starrocks.fenodes" -> "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
"starrocks.request.auth.user" -> "$YOUR_STARROCKS_USERNAME",
"starrocks.request.auth.password" -> "$YOUR_STARROCKS_PASSWORD"
))
)
starrocksSparkRDD.collect()
pySpark
starrocksSparkDF = spark.read.format("starrocks")
.option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
.option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
.option("user", "$YOUR_STARROCKS_USERNAME")
.option("password", "$YOUR_STARROCKS_PASSWORD")
.load()
# show 5 lines data
starrocksSparkDF.show(5)
写入
SQL
CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS(
"table.identifier"="$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME",
"fenodes"="$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
"benodes"="$YOUR_STARROCKS_BE_HOSTNAME:$YOUR_STARROCKS_BE_HTTP_PORT",
"user"="$YOUR_STARROCKS_USERNAME",
"password"="$YOUR_STARROCKS_PASSWORD"
);
INSERT INTO spark_starrocks VALUES ("VALUE1","VALUE2",...);
# or
INSERT INTO spark_starrocks SELECT * FROM YOUR_TABLE;
DataFrame(batch/stream)
## batch sink
val mockDataDF = List(
(3, "440403001005", "starrocks.com"),
(1, "4404030013005", "starrocks.cn"),
(2, null, "23333.com")
).toDF("id", "sr_code", "sr_name")
mockDataDF.show(5)
mockDataDF.write.format("starrocks")
.option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
.option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
.option("starrocks.benodes", "$YOUR_STARROCKS_BE_HOSTNAME:$YOUR_STARROCKS_BE_HTTP_PORT")
.option("user", "$YOUR_STARROCKS_USERNAME")
.option("password", "$YOUR_STARROCKS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("starrocks.write.fields","$YOUR_FIELDS_TO_WRITE")
.save()
## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.format("kafka")
.load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.writeStream
.format("starrocks")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
.option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
.option("starrocks.benodes", "$YOUR_STARROCKS_BE_HOSTNAME:$YOUR_STARROCKS_BE_HTTP_PORT")
.option("user", "$YOUR_STARROCKS_USERNAME")
.option("password", "$YOUR_STARROCKS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("starrocks.write.fields","$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()
配置
通用配置项
Key | Default Value | Comment |
---|---|---|
starrocks.fenodes | – | starrocks FE http 地址,支持多个地址,使用逗号分隔 |
starrocks.benodes | – | starrocks BE http 地址,支持多个地址,使用逗号分隔 |
starrocks.table.identifier | – | starrocks 表名,如:db1.tbl1 |
starrocks.request.retries | 3 | 向starrocks发送请求的重试次数 |
starrocks.request.connect.timeout.ms | 30000 | 向starrocks发送请求的连接超时时间 |
starrocks.request.read.timeout.ms | 30000 | 向starrocks发送请求的读取超时时间 |
starrocks.request.query.timeout.s | 3600 | 查询starrocks的超时时间,默认值为1小时,-1表示无超时限制 |
starrocks.request.tablet.size | Integer.MAX_VALUE | 一个RDD Partition对应的starrocks Tablet个数。 此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对starrocks造成更大的压力。 |
starrocks.batch.size | 1024 | 一次从BE读取数据的最大行数。增大此数值可减少Spark与starrocks之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
starrocks.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
starrocks.deserialize.arrow.async | false | 是否支持异步转换Arrow格式到spark-starrocks-connector迭代所需的RowBatch |
starrocks.deserialize.queue.size | 64 | 异步转换Arrow格式的内部处理队列,当starrocks.deserialize.arrow.async为true时生效 |
starrocks.write.fields | – | 指定写入starrocks表的字段或者字段顺序,多列之间使用逗号分隔。 默认写入时要按照starrocks表字段顺序写入全部字段。 |
sink.batch.size | 10000 | 单次写BE的最大行数 |
sink.max-retries | 1 | 写BE失败之后的重试次数 |
SQL 和 Dataframe 专有配置
Key | Default Value | Comment |
---|---|---|
user | – | 访问starrocks的用户名 |
password | – | 访问starrocks的密码 |
starrocks.filter.query.in.max.count | 100 | 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。 |
RDD 专有配置
Key | Default Value | Comment |
---|---|---|
starrocks.request.auth.user | – | 访问starrocks的用户名 |
starrocks.request.auth.password | – | 访问starrocks的密码 |
starrocks.read.field | – | 读取starrocks表的列名列表,多列之间使用逗号分隔 |
starrocks.filter.query | – | 过滤读取数据的表达式,此表达式透传给starrocks。starrocks使用此表达式完成源端数据过滤。 |
starrocks 和 Spark 列类型映射关系
starrocks Type | Spark Type |
---|---|
NULL_TYPE | DataTypes.NullType |
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DATE | DataTypes.StringType1 |
DATETIME | DataTypes.StringType1 |
BINARY | DataTypes.BinaryType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
LARGEINT | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
DECIMAL | DecimalType |
HLL | Unsupported datatype |
- 注:Connector中,暂将
DATE
和DATETIME
映射为String
。