4.3 写入数据
目前支持DataStream和DataStream格式的数据流写入Iceberg表。
1)写入方式支持 append、overwrite、upsert
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<RowData> input = env.fromElements("")
.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String s) throws Exception {
GenericRowData genericRowData = new GenericRowData(2);
genericRowData.setField(0, 99L);
genericRowData.setField(1, 99L);
return genericRowData;
}
});
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.append() // append方式
//.overwrite(true) // overwrite方式
//.upsert(true) // upsert方式
;
env.execute("Test Iceberg DataStream");
写数据报错:
org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode=...
在hdfs-site.xml中添加下方配置,修改后重启hadoop
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
2)写入选项
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.set("write-format", "orc")
.set(FlinkWriteOptions.OVERWRITE_MODE, "true");
可配置选项如下:
td {white-space:nowrap;border:1px solid #dee0e3;font-size:10pt;font-style:normal;font-weight:normal;vertical-align:middle;word-break:normal;word-wrap:normal;}选项 | 默认值 | 说明 |
---|---|---|
write-format | Parquet同write.format.default | 写入操作使用的文件格式:Parquet, avro或orc |
target-file-size-bytes | 536870912(512MB)同write.target-file-size-bytes | 控制生成的文件的大小,目标大约为这么多字节 |
upsert-enabled | 同write.upsert.enabled, | |
overwrite-enabled | FALSE | 覆盖表的数据,不能和UPSERT模式同时开启 |
distribution-mode | None同 write.distribution-mode | 定义写数据的分布方式:Ø none:不打乱行;Ø hash:按分区键散列分布;Ø range:如果表有SortOrder,则通过分区键或排序键分配 |
compression-codec | 同 write.(fileformat).compression-codec | |
compression-level | 同 write.(fileformat).compression-level | |
compression-strategy | 同write.orc.compression-strategy |
4.4 合并小文件
Iceberg现在不支持在flink sql中检查表,需要使用Iceberg提供的Java API来读取元数据来获得表信息。可以通过提交Flink批处理作业将小文件重写为大文件:
import org.apache.iceberg.flink.actions.Actions;
// 1.获取 Table对象
// 1.1 创建 catalog对象
Configuration conf = new Configuration();
HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "hdfs://hadoop1:8020/warehouse/spark-iceberg");
// 1.2 通过 catalog加载 Table对象
Table table = hadoopCatalog.loadTable(TableIdentifier.of("default", "a"));
// 有Table对象,就可以获取元数据、进行维护表的操作
// System.out.println(table.history());
// System.out.println(table.expireSnapshots().expireOlderThan());
// 2.通过 Actions 来操作 合并
Actions.forTable(table)
.rewriteDataFiles()
.targetSizeInBytes(1024L)
.execute();
得到Table对象,就可以获取元数据、进行维护表的操作。
5. Iceberg 与Spark 集成
5.1 环境准备
5.1.1 安装 Spark
spark使用的是3.3.1版本,Iceberg使用的是1.1.0版本
1)Spark与Iceberg的版本对应关系如下
td {white-space:nowrap;border:1px solid #dee0e3;font-size:10pt;font-style:normal;font-weight:normal;vertical-align:middle;word-break:normal;word-wrap:normal;}Spark 版本 | Iceberg 版本 |
---|---|
2.4 | 0.7.0-incubating – 1.1.0 |
3.0 | 0.9.0 – 1.0.0 |
3.1 | 0.12.0 – 1.1.0 |
3.2 | 0.13.0 – 1.1.0 |
3.3 | 0.14.0 – 1.1.0 |
spark与iceberg版本匹配:iceberg 0.12.1支持spark 2.4+ ,但是不完善;建议使用spark 3.x以上版本
2)上传并解压Spark安装包
tar -zxvf spark-3.3.1-bin-hadoop3.tgz
mv spark-3.3.1-bin-hadoop3 spark-3.3.1
3)配置环境变量
sudo vim /etc/profile.d/my_env.sh
export SPARK_HOME=/opt/module/spark-3.3.1
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile.d/my_env.sh
4)拷贝iceberg的jar包到Spark的jars目录
cp /opt/software/iceberg/iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/module/spark-3.3.1/jars
5.1.2 启动 Hadoop
5.2 Spark 配置 Catalog
Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是Iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。
vim spark-defaults.conf
5.2.1 Hive Catalog
##hive_prod hive catalog 名称 ,指定类名为sparkcatalog
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://192.168.110.120:9083
use hive_prod.db;
5.2.2 Hadoop Catalog
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://192.168.110.120:8020/warehouse/spark-iceberg
use hadoop_prod.db;
5.3 启动spark sql
./bin/spark-sql
5.4 SQL 操作
5.4.1 创建表
-- 使用catalog hadoop_prod
spark-sql> use hadoop_prod;
-- 创建数据库,默认没有
spark-sql> create database default;
spark-sql> use default;
-- 使用hive_prod catalog ,默认库是default
spark-sql> use hive_prod;
spark-sql> use default;
spark-sql> CREATE TABLE hadoop_prod.default.sample1 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
hadoop_prod: hadoop_catalog名称
USING iceberg : 创建为iceberg表
·PARTITIONED BY (partition-expressions) :配置分区
·LOCATION ‘(fully-qualified-uri)’ :指定表路径
·COMMENT ‘table documentation’ :配置表备注
·TBLPROPERTIES (‘key’=‘value’, …) :配置表属性
表属性:可以参考iceberg官网文档https://iceberg.apache.org/docs/latest/configuration/
对Iceberg表的每次更改都会生成一个新的元数据文件(json文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。
如果要自动清除元数据文件,在表属性中设置write.metadata.delete-after-commit.enabled=true。这将保留一些元数据文件(直到write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。
1)创建分区表
(1)分区表
CREATE TABLE hadoop_prod.default.sample2 (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category);
(2)创建隐藏分区表
CREATE TABLE hadoop_prod.default.sample3 (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);
支持的转换有:
·years(ts):按年划分
·months(ts):按月划分
·days(ts)或date(ts):等效于dateint分区
·hours(ts)或date_hour(ts):等效于dateint和hour分区
·bucket(N, col):按哈希值划分mod N个桶
·truncate(L, col):按截断为L的值划分
字符串被截断为给定的长度
整型和长型截断为bin: truncate(10, i)生成分区0,10,20,30,…
隐藏分区 :可以指定分区字段做计算,计算的结果不用体现在字段定义中
2)使用 CTAS 语法建表
CREATE TABLE hadoop_prod.default.sample4
USING iceberg
AS SELECT * from hadoop_prod.default.sample3;
不指定分区就是无分区,需要重新指定分区、表属性:
CREATE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3;
3)使用 Replace table 建表
REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
AS SELECT * from hadoop_prod.default.sample3;
REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3;
-- 存在替换,不存在创建
CREATE OR REPLACE TABLE hadoop_prod.default.sample6
USING iceberg
AS SELECT * from hadoop_prod.default.sample3;
5.4.2 删除表
对于HadoopCatalog而言,运行DROP TABLE将从catalog中删除表并删除表内容。
CREATE EXTERNAL TABLE hadoop_prod.default.sample7 (
id bigint COMMENT 'unique id',
data string)
USING iceberg;
INSERT INTO hadoop_prod.default.sample7 values(1,'a');
DROP TABLE hadoop_prod.default.sample7;
对于HiveCatalog而言:
·在0.14之前,运行DROP TABLE将从catalog中删除表并删除表内容。
·从0.14开始,DROP TABLE只会从catalog中删除表,不会删除数据。为了删除表内容,应该使用DROP TABLE PURGE。
CREATE TABLE hive_prod.default.sample7 (
id bigint COMMENT 'unique id',
data string)
USING iceberg;
INSERT INTO hive_prod.default.sample7 values(1,'a');
1)删除表
DROP TABLE hive_prod.default.sample7
2)删除表和数据
DROP TABLE hive_prod.default.sample7 PURGE
5.4.3 修改表
Iceberg在Spark 3(只在spark 3中支持)中完全支持ALTER TABLE,包括:
·重命名表
·设置或删除表属性
·添加、删除和重命名列
·添加、删除和重命名嵌套字段
·重新排序顶级列和嵌套结构字段
·扩大int、float和decimal字段的类型
·将必选列变为可选列
此外,还可以使用SQL扩展来添加对分区演变的支持和设置表的写顺序。
CREATE TABLE hive_prod.default.sample1 (
id bigint COMMENT 'unique id',
data string)
USING iceberg;
1)修改表名( 不支持修改HadoopCatalog的表名 )
ALTER TABLE hive_prod.default.sample1 RENAME TO hive_prod.default.sample2;
2)修改表属性
(1)修改表属性
ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (
'read.split.target-size'='268435456'
);
ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (
'comment' = 'A table comment.'
);
(2)删除表属性
ALTER TABLE hive_prod.default.sample1 UNSET TBLPROPERTIES ('read.split.target-size');
3)添加列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMNS (
category string comment 'new_column'
);
-- 添加struct类型的列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point struct<x: double, y: double>;
-- 往struct类型的列中添加字段
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point.z double;
-- 创建struct的嵌套数组列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points array<struct<x: double, y: double>>;
-- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points.element.z double;
-- 创建一个包含Map类型的列,key和value都为struct类型
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;
-- 在Map类型的value的struct中添加一个字段。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm.value.b int;
-- 在Spark 2.4.4及以后版本中,可以通过添加FIRST或AFTER子句在任何位置添加列:
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column1 bigint AFTER id;
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column2 bigint FIRST;
4)修改列
(1)修改列名
ALTER TABLE hadoop_prod.default.sample1 RENAME COLUMN data TO data1;
(2)Alter Column修改类型(只允许安全的转换)
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMNS (
idd int
);
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN idd TYPE bigint;
(3)Alter Column 修改列的注释
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id TYPE double COMMENT 'a';
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id COMMENT 'b';
(4)Alter Column修改列的顺序
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id FIRST;
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN new_column2 AFTER new_column1;
(5)Alter Column修改列是否允许为null
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id DROP NOT NULL;
ALTER COLUMN不用于更新struct类型。使用ADD COLUMN和DROP COLUMN添加或删除struct类型的字段。
5)删除列
ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN idd;
ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN point.z;
6)添加分区( spark 2.4不支持 ,Spark3,需要配置扩展)
vim spark-default.conf
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
重新进入spark-sql shell:
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD category;
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD truncate(data, 4);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD years(ts);
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id) AS shard;
7)删除分区(Spark3,需要配置扩展)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD category;
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD bucket(16, id);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD truncate(data, 4);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD years(ts);
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD shard;
注意: 尽管删除了分区,但列仍然存在于表结构中。
删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。
当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。
删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。
8)修改分区(Spark3,需要配置扩展)
ALTER TABLE hadoop_prod.default.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id);
9)修改表的写入顺序
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category, id;
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC, id DESC;
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;
表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。
WRITE ORDERED BY设置了一个全局排序,即跨任务的行排序,就像在INSERT命令中使用ORDER BY一样:
INSERT INTO hadoop_prod.default.sample1;
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category;
要在每个任务内排序,而不是跨任务排序,使用local ORDERED BY:
ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id;
10)按分区并行写入
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION;
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id;
5.4.4 插入数据
CREATE TABLE hadoop_prod.default.a (
id bigint,
count bigint)
USING iceberg;
CREATE TABLE hadoop_prod.default.b (
id bigint,
count bigint,
flag string)
USING iceberg;
1)Insert Into
INSERT INTO hadoop_prod.default.a VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO hadoop_prod.default.b VALUES (1, 1, 'a'), (2, 2, 'b'), (4, 4, 'd');
2)MERGE INTO行级更新
MERGE INTO hadoop_prod.default.a t
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count);
5.4.5 查询数据
1)普通查询
SELECT count(1) as count, data
FROM local.db.table
GROUP BY data;
2)查询元数据
--查询表快照
SELECT * FROM hadoop_prod.default.a.snapshots;
-- 查询数据文件信息
SELECT * FROM hadoop_prod.default.a.files;
-- 查询表历史
SELECT * FROM hadoop_prod.default.a.history;
-- 查询 manifest
SELECT * FROM hadoop_prod.default.a.manifests;
5.4.6 存储过程
Procedures可以通过CALL从任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。
- 语法
按照参数名传参
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)
当按位置传递参数时,如果结束参数是可选的,则只有结束参数可以省略。
CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)
2)快照管理
(1)回滚到指定的快照id
CALL hadoop_prod.system.rollback_to_snapshot('default.a', 7601163594701794741);
(2)回滚到指定时间的快照
CALL hadoop_prod.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');
(3)设置表的当前快照ID
CALL hadoop_prod.system.set_current_snapshot('db.sample', 1);
(4)从快照变为当前表状态
CALL hadoop_prod.system.cherrypick_snapshot('default.a', 7629160535368763452);
CALL hadoop_prod.system.cherrypick_snapshot(snapshot_id => 7629160535368763452, table => 'default.a' );
3)元数据管理
(1)删除早于指定日期和时间的快照,但保留最近100个快照:
CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)
(2)删除Iceberg表中任何元数据文件中没有引用的文件
#列出所有需要删除的候选文件
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true)
#删除指定目录中db.sample表不知道的任何文件
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')
(3)合并数据文件(合并小文件)
CALL catalog_name.system.rewrite_data_files('db.sample');
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST');
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)');
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'));
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
(4)重写表清单来优化执行计划
CALL catalog_name.system.rewrite_manifests('db.sample');
#重写表db中的清单。并禁用Spark缓存的使用。这样做可以避免执行程序上的内存问题。
CALL catalog_name.system.rewrite_manifests('db.sample', false);
4)迁移表
(1)快照
CALL catalog_name.system.snapshot('db.sample', 'db.snap');
CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/');
(2)迁移
CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'));
CALL catalog_name.system.migrate('db.sample');
(3)添加数据文件
CALL spark_catalog.system.add_files(
table => 'db.tbl',
source_table => 'db.src_tbl',
partition_filter => map('part_col_1', 'A')
);
CALL spark_catalog.system.add_files(
table => 'db.tbl',
source_table => 'parquet.path/to/table'
);
5)元数据信息
(1)获取指定快照的父快照id
CALL spark_catalog.system.ancestors_of('db.tbl');
(2)获取指定快照的所有祖先快照
CALL spark_catalog.system.ancestors_of('db.tbl', 1);
CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl');
5.5 DataFrame 操作
5.5.1 环境准备
1)创建maven工程,配置pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.iceberg</groupId>
<artifactId>spark-iceberg-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.3.1</spark.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<!-- <scope>provided</scope>-->
<version>${spark.version}</version>
</dependency>
<!--fastjson <= 1.2.80 存在安全漏洞,-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- assembly打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<!--Maven编译scala所需依赖-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2)配置Catalog
val spark: SparkSession = SparkSession.builder().master("local").appName(this.getClass.getSimpleName)
//指定hive catalog, catalog名称为iceberg_hive
.config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hive.type", "hive")
.config("spark.sql.catalog.iceberg_hive.uri", "thrift://hadoop1:9083")
// .config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名称为iceberg_hadoop
.config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
.config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
.getOrCreate()
5.5.2 读取表
1)加载表
spark.read
.format("iceberg")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()
或
// 仅支持Spark3.0以上
spark.table("iceberg_hadoop.default.a")
.show()
2)时间旅行:指定时间查询
spark.read
.option("as-of-timestamp", "499162860000")
.format("iceberg")
.load("hdfs://192.168.110.120:8020/warehouse/spark-iceberg/default/a")
.show()
3)时间旅行:指定快照id查询
spark.read
.option("snapshot-id", 7601163594701794741L)
.format("iceberg")
.load("hdfs://192.168.110.120:8020/warehouse/spark-iceberg/default/a")
.show()
4)增量查询
spark.read
.format("iceberg")
.option("start-snapshot-id", "10963874102873")
.option("end-snapshot-id", "63874143573109")
.load("hdfs://192.168.110.120:8020/warehouse/spark-iceberg/default/a")
.show()
查询的表只能是append的方式写数据,不支持replace, overwrite, delete操作。
5.5.3 写入表
1)插入数据并建表
package org.scala.iceberg.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo1 {
def main(args: Array[String]): Unit = {
//写入表报错:Permssion denied: user=Administrator, access=WRITE,inode="/warehouse/spark-iceberg/default"
//可以在最上方加入下边这行代码设置 HADOOP_PROXY_USER 环境变量 为root用户
System.setProperty("HADOOP_USER_NAME","root")
// 1. 创建catalog
val spark: SparkSession = SparkSession.builder.master("local[*]").appName(this.getClass.getSimpleName)
// 设置hive_catalog ,catalog_name为 iceberg_hive
.config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hive.type", "hive")
.config("spark.sql.catalog.iceberg_hive.uri", "thrift://192.168.110.120:9083")
//支持iceberg
.config("iceberg.engine.hive.enabled", "true")
// 设置hive_catalog ,catalog_name为 iceberg_hive
.config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
.config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://192.168.110.120:8020/warehouse/spark-iceberg")
.getOrCreate
// 3. 写入表
val df: DataFrame = spark.createDataFrame(Seq(Sample(1, "222", "222"), Sample(2, "test", "22")))
df.writeTo("iceberg_hadoop.default.table1").create()
}
case class Sample(id:Int,data:String,category:String)
}
2)插入并修改表结构 指定表属性,分区
//隐式转换
import spark.implicits._
df.writeTo("iceberg_hadoop.default.table1")
.tableProperty("write.format.default", "orc")
.partitionedBy($"category")
.createOrReplace()
3)append追加
df.writeTo("iceberg_hadoop.default.table1").append()
4)动态分区覆盖
val df1: DataFrame = spark.createDataFrame(Seq(Sample(11, "222", "a1"), Sample(22, "test", "b1")))
df1.writeTo("iceberg_hadoop.default.table1").overwritePartitions()
5)静态分区覆盖
//隐式转换
import spark.implicits._
df.writeTo("iceberg_hadoop.default.table1").overwrite($"category" === "b1")
6)插入分区表且分区内排序
df.sortWithinPartitions("category")
.writeTo("iceberg_hadoop.default.table1")
.append()
5.5.4 维护表
1)获取Table对象
(1)HadoopCatalog
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://hadoop1:8020/warehouse/spark-iceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("db","table1"))
(2)HiveCatalog
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new util.HashMapString,String
properties.put("warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
properties.put("uri", "thrift://hadoop1:9083")
catalog.initialize("hive", properties)
val table: Table = catalog.loadTable(TableIdentifier.of("db", "table1"))
2)快照过期清理
每次写入Iceberg表都会创建一个表的新快照或版本。快照可以用于时间旅行查询,或者可以将表回滚到任何有效的快照。建议设置快照过期时间,过期的旧快照将从元数据中删除(不再可用于时间旅行查询)。
// 1天过期时间
val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
table.expireSnapshots()
.expireOlderThan(tsToExpire)
.commit()
或使用SparkActions来设置过期:
//SparkActions可以并行运行大型表的表过期设置
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(tsToExpire)
.execute()
3)删除无效文件
在Spark和其他分布式处理引擎中,任务或作业失败可能会留下未被表元数据引用的文件,在某些情况下,正常的快照过期可能无法确定不再需要并删除该文件。
SparkActions
.get()
.deleteOrphanFiles(table)
.execute()
4)合并小文件
数据文件过多会导致更多的元数据存储在清单文件中,而较小的数据文件会导致不必要的元数据量和更低效率的文件打开成本。
SparkActions
.get()
.rewriteDataFiles(table)
.filter(Expressions.equal("category", "a"))
.option("target-file-size-bytes", 1024L.toString) //1KB
.execute()