庆祝社区一周年!第三连!支持读写的Spark Connector

Spark Connector(测试版)

Spark Connector 可以支持通过 Spark 读取 StarRocks 中存储的数据,也支持通过Spark写入数据到StarRocks。

  • 支持从StarRocks中读取数据
  • 支持Spark DataFrame批量/流式 写入StarRocks
  • 可以将StarRocks表映射为DataFrame或者RDD,推荐使用DataFrame
  • 支持在StarRocks端完成数据过滤,减少数据传输量。

版本兼容

Connector Spark StarRocks Java Scala
starrocks-spark2_2.11-1.0.1 2.x 8 2.11

特别说明:本工具是个人基于starrocks-spark-connector-1.0的简易改造版本。
下载地址:starrocks-spark2_2.11-1.0.1.jar (9.9 MB)

Maven信息

<dependency>
	<groupId>com.starrocks</groupId>
	<artifactId>starrocks-spark2_2.11</artifactId>
	<version>1.0.1</version>
</dependency>

JAR加载

目前该jar包未上传至maven 中央仓库,需要手动加载到自己的项目工程中,命令如下:

mvn install:install-file
-DgroupId=包名
-DartifactId=项目名
-Dversion=版本号
-Dpackaging=jar
-Dfile=jar文件所在路径

例如,假设本地jar在D:\starrocks-spark2_2.11-1.0.1.jar,则加载命令为:

mvn install:install-file -DgroupId=com.starrocks -DartifactId=starrocks-spark2_2.11 -Dversion=1.0.1 -Dpackaging=jar -Dfile=D:\starrocks-spark2_2.11-1.0.1.jar

使用示例

读取

SQL

CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS(
  "table.identifier"="$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME",
  "fenodes"="$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
  "user"="$YOUR_STARROCKS_USERNAME",
  "password"="$YOUR_STARROCKS_PASSWORD"
);

SELECT * FROM spark_starrocks;

DataFrame

val starrocksSparkDF = spark.read.format("starrocks")
  .option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
  .option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
  .option("user", "$YOUR_STARROCKS_USERNAME")
  .option("password", "$YOUR_STARROCKS_PASSWORD")
  .load()

starrocksSparkDF.show(5)

RDD

import org.apache.starrocks.spark._
val starrocksSparkRDD = sc.starrocksRDD(
  tableIdentifier = Some("$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME"),
  cfg = Some(Map(
    "starrocks.fenodes" -> "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
    "starrocks.request.auth.user" -> "$YOUR_STARROCKS_USERNAME",
    "starrocks.request.auth.password" -> "$YOUR_STARROCKS_PASSWORD"
  ))
)

starrocksSparkRDD.collect()

pySpark

starrocksSparkDF = spark.read.format("starrocks")
.option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
.option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
.option("user", "$YOUR_STARROCKS_USERNAME")
.option("password", "$YOUR_STARROCKS_PASSWORD")
.load()
# show 5 lines data 
starrocksSparkDF.show(5)

写入

SQL

CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS(
  "table.identifier"="$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME",
  "fenodes"="$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
  "benodes"="$YOUR_STARROCKS_BE_HOSTNAME:$YOUR_STARROCKS_BE_HTTP_PORT",
  "user"="$YOUR_STARROCKS_USERNAME",
  "password"="$YOUR_STARROCKS_PASSWORD"
);

INSERT INTO spark_starrocks VALUES ("VALUE1","VALUE2",...);
# or
INSERT INTO spark_starrocks SELECT * FROM YOUR_TABLE;

DataFrame(batch/stream)

## batch sink
val mockDataDF = List(
  (3, "440403001005", "starrocks.com"),
  (1, "4404030013005", "starrocks.cn"),
  (2, null, "23333.com")
).toDF("id", "sr_code", "sr_name")
mockDataDF.show(5)

mockDataDF.write.format("starrocks")
  .option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
  .option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
  .option("starrocks.benodes", "$YOUR_STARROCKS_BE_HOSTNAME:$YOUR_STARROCKS_BE_HTTP_PORT")
  .option("user", "$YOUR_STARROCKS_USERNAME")
  .option("password", "$YOUR_STARROCKS_PASSWORD")
  //其它选项
  //指定你要写入的字段
  .option("starrocks.write.fields","$YOUR_FIELDS_TO_WRITE")
  .save()

## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("startingOffsets", "latest")
  .option("subscribe", "$YOUR_KAFKA_TOPICS")
  .format("kafka")
  .load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
  .writeStream
  .format("starrocks")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
  .option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
  .option("starrocks.benodes", "$YOUR_STARROCKS_BE_HOSTNAME:$YOUR_STARROCKS_BE_HTTP_PORT")
  .option("user", "$YOUR_STARROCKS_USERNAME")
  .option("password", "$YOUR_STARROCKS_PASSWORD")
  //其它选项
  //指定你要写入的字段
  .option("starrocks.write.fields","$YOUR_FIELDS_TO_WRITE")
  .start()
  .awaitTermination()

配置

通用配置项

Key Default Value Comment
starrocks.fenodes starrocks FE http 地址,支持多个地址,使用逗号分隔
starrocks.benodes starrocks BE http 地址,支持多个地址,使用逗号分隔
starrocks.table.identifier starrocks 表名,如:db1.tbl1
starrocks.request.retries 3 向starrocks发送请求的重试次数
starrocks.request.connect.timeout.ms 30000 向starrocks发送请求的连接超时时间
starrocks.request.read.timeout.ms 30000 向starrocks发送请求的读取超时时间
starrocks.request.query.timeout.s 3600 查询starrocks的超时时间,默认值为1小时,-1表示无超时限制
starrocks.request.tablet.size Integer.MAX_VALUE 一个RDD Partition对应的starrocks Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对starrocks造成更大的压力。
starrocks.batch.size 1024 一次从BE读取数据的最大行数。增大此数值可减少Spark与starrocks之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。
starrocks.exec.mem.limit 2147483648 单个查询的内存限制。默认为 2GB,单位为字节
starrocks.deserialize.arrow.async false 是否支持异步转换Arrow格式到spark-starrocks-connector迭代所需的RowBatch
starrocks.deserialize.queue.size 64 异步转换Arrow格式的内部处理队列,当starrocks.deserialize.arrow.async为true时生效
starrocks.write.fields 指定写入starrocks表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照starrocks表字段顺序写入全部字段。
sink.batch.size 10000 单次写BE的最大行数
sink.max-retries 1 写BE失败之后的重试次数

SQL 和 Dataframe 专有配置

Key Default Value Comment
user 访问starrocks的用户名
password 访问starrocks的密码
starrocks.filter.query.in.max.count 100 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。

RDD 专有配置

Key Default Value Comment
starrocks.request.auth.user 访问starrocks的用户名
starrocks.request.auth.password 访问starrocks的密码
starrocks.read.field 读取starrocks表的列名列表,多列之间使用逗号分隔
starrocks.filter.query 过滤读取数据的表达式,此表达式透传给starrocks。starrocks使用此表达式完成源端数据过滤。

starrocks 和 Spark 列类型映射关系

starrocks Type Spark Type
NULL_TYPE DataTypes.NullType
BOOLEAN DataTypes.BooleanType
TINYINT DataTypes.ByteType
SMALLINT DataTypes.ShortType
INT DataTypes.IntegerType
BIGINT DataTypes.LongType
FLOAT DataTypes.FloatType
DOUBLE DataTypes.DoubleType
DATE DataTypes.StringType1
DATETIME DataTypes.StringType1
BINARY DataTypes.BinaryType
DECIMAL DecimalType
CHAR DataTypes.StringType
LARGEINT DataTypes.StringType
VARCHAR DataTypes.StringType
DECIMAL DecimalType
HLL Unsupported datatype
  • 注:Connector中,暂将DATEDATETIME映射为String
2赞

爱了爱了,这个标题甚好! :clap:

这个项目能开源么?能否给我们一个学习的机会? :grinning:

已让同事放到Github:
https://github.com/gaoy121/starrocks-connector-for-apache-spark

1赞

谢谢,非常感谢,我们认真学习一下

大神,你们编译的时候,thrift用的哪个版本啊?

请问我在用工具类写入StarRocks 表的时候程序一直卡死,不报错但是也没有数据写进去可能是什么原因呀?

可是不支持spark3啊 利用jdbc写9030端口一直超时。求解决

spark3 支持好多bug。必须要cache、show之后才能写

1赞

方便上传一下spark3的jar包吗?我的电脑安装thrift有点费劲

请问你编译时也是运行:sh build.sh 3时报错如下是吗?
`[INFO] — maven-thrift-plugin:0.1.11:compile (thrift-sources) @ starrocks-spark2_2.11 —
[ERROR] thrift failed output:
[ERROR] thrift failed error: /bin/sh: thrift: command not found

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 8.023 s
[INFO] Finished at: 2023-02-14T15:51:33+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.thrift.tools:maven-thrift-plugin:0.1.11:compile (thrift-sources) on project starrocks-spark2_2.11: thrift did not exit cleanly. Review output for more information. -> [Help 1]`
请问你最终解决了吗?我参考https://zhuanlan.zhihu.com/p/119404869未能解决。

请问我编译时报错(见楼下评论),你spark3解决了吗?

已解决,详见github上issue:https://github.com/gaoy121/starrocks-connector-for-apache-spark/issues/1

写入一段时间数据后,就会报
Caused by: com.starrocks.connector.spark.exception.StreamLoadException: stream load error: Too many versions. tablet_id: 339236, version_count: 1018, limit: 1000:
轻微这个versions是怎么判断的?

请问解决了 我也遇到这个问题 如何解决

提交到集群上报java.lang.ClassNotFoundException: Failed to find data source: starrocks. Please find packages at http://spark.apache.org/third-party-projects.html,spark connector也打进去包里了,求助

之前的代码没有再继续维护,基于官网git上的代码重新打了一个connector jar,整理了一个小demo,辛苦测试看看:
starrocks-spark-connector-3.2_2.12-1.1.1-SNAPSHOT.jar (1.9 MB)
starrocks-spark-demo.zip (4.8 KB)

23/07/13 15:11:28 WARN BackendClient: Get next from StarRocks BE{host=‘xxxx’, port=9060} failed.
com.starrocks.shaded.org.apache.thrift.transport.TTransportException: MaxMessageSize reached

com.starrocks.shaded.org.apache.thrift.protocol.TProtocolException: Bad version in readMessageBegin

请问用 [starrocks-spark-connector-3.2_2.12-1.1.1-SNAPSHOT.jar] 从SR读取数据,有时候能成功,有时候会失败,这是报错信息。

将打包后的 starrocks-spark-connector-3.2_2.12-1.1.1-SNAPSHOT.jar 上传到Spark集群 jars 目录下 ,否则会报错

Exception in thread “main” java.lang.ClassNotFoundException:
Failed to find data source: starrocks. Please find packages at
http://spark.apache.org/third-party-projects.html

您好,这块您这边有解决吗?我们也遇到了一样的问题