Iceberg介绍及集成Hive、Flink、Spark(上篇)

1. Iceberg 介绍

1.1 什么是Iceberg

Iceberg是一个面向海量数据分析场景的开放 表格式(Table Format)。 表格式可以理解为元数据以及数据文件的一种组织方式,处于计算框架(Flink,Spark…)之下,数据文件之上。

可以跟一些主流的框架做集成

1.2 Iceberg的特性

1.2.1 Iceberg支持隐藏分区

隐藏分区 :可以指定分区字段做计算,计算的结果不用体现在字段定义中

1.2.2 Iceberg支持表演化

Iceberg可以通过SQL的方式进行表级别模式演进。进行这些操作的时候,代价极低。 不存在读出数据重新写入或者迁移数据这种费时费力的操作。

1.2.3 Iceberg支持Scheme 演化

Iceberg支持下面几种模式演化:

ADD:向表或者嵌套结构增加新列

Drop:从表中或者嵌套结构中移除一列

Rename:重命名表中或者嵌套结构中的一列

Update:将复杂结构(struct, map<key, value>, list)中的基本类型扩展类型长度, 比如tinyint修改成int.

Iceberg保证模式演化(Schema Evolution)是没有副作用的独立操作流程, 一个元数据操作, 不会涉及到重写数据文件的过程。具体的如下:

增加列时候,不会从另外一个列中读取已存在的的数据

删除列或者嵌套结构中字段的时候,不会改变任何其他列的值

更新列或者嵌套结构中字段的时候,不会改变任何其他列的值

改变列列或者嵌套结构中字段顺序的时候,不会改变相关联的值

在表中Iceberg 使用唯一ID来定位每一列的信息。新增一个列的时候,会新分配给它一个唯一ID, 并且绝对不会使用已经被使用的ID。

使用名称或者位置信息来定位列的, 都会存在一些问题, 比如使用名称的话,名称可能会重复, 使用位置的话, 不能修改顺序并且废弃的字段也不能删除。

1.2.4 Iceberg支持分区演化

Iceberg可以在一个已存在的表上直接修改,因为Iceberg的查询流程并不和分区信息直接关联。

改变一个表的分区策略时,对应修改分区之前的数据不会改变, 依然会采用老的分区策略,新的数据会采用新的分区策略,也就是说同一个表会有两种分区策略,旧数据采用旧分区策略,新数据采用新新分区策略, 在元数据里两个分区策略相互独立,不重合。

asynccode

上图中booking_table表2008年按月分区,进入2009年后改为按天分区,这种中分区策略共存于该表中。

借助Iceberg的隐藏分区(Hidden Partition),在写SQL 查询的时候,不需要在SQL中特别指定分区过滤条件,Iceberg会自动分区,过滤掉不需要的数据。

Iceberg分区演化操作同样是一个元数据操作, 不会重写数据文件。

1.2.5 Iceberg支持列顺序演化

Iceberg可以在一个已经存在的表上修改排序策略。修改了排序策略之后, 旧数据依旧采用老排序策略不变。往Iceberg里写数据的计算引擎总是会选择最新的排序策略, 但是当排序的代价极其高昂的时候, 就不进行排序了。

1.3 Iceberg数据类型


详细可以参考官网文档:https://iceberg.apache.org/spec/?h=data+type#primitive-types

1.4 Iceberg存储结构

这个表是在hive底下存储的,上图中data是真正的数据目录,metadata 是元数据

data files : 真实存储数据的文件。一般是在表的数据存储目录的data目录下,是以parquet格式存储的 (什么时候会生成parquet文件?每一次向表里插入数据,就会生成一个parquet( 每一次操作iceberg,都会生成一个parquet文件

Manifest file: 清单文件,每次生成parquet的时候都会生成一个清单文件

每一个manifest file文件都指向parquet的详细描述(表是哪个表,分区是哪个分区,分区列是什么,包含几条数据)

Manifest list: manifest list是一个元数据文件,它列出构建表快照(Snapshot)的清单(Manifest file)。这个元数据文件中存储的是Manifest file列表,每个Manifest file占据一行。

Metadata file: 元数据文件,以json格式存储

snapshot: 就等于 Manifest list 表快照。每一个avro文件对应一个snapshot. 【每操作一次iceberg表,都会生成一个data file,同时也会生成一个snapshot】 快照可以回滚

注意:向上图表中插入一条数据,生成一个parquet文件,parquet生成manifest file ,生成的manifest file 会存储在snapshot 中,

插入第二条一条数据,会再生成一个parquet文件,parquet生成manifest file ,新的manifest file文件和第一个manifest file 存储在第二个snapshot中。

2. Iceberg与Hive集成

这里使用的 hive版本是3.1.2 iceberg版本是1.1.0

2.1 环境准备

1)Hive与Iceberg的版本对应关系如下

Hive 版本 官方推荐Hive版本 Iceberg 版本
2.x 2.3.8 0.8.0-incubating – 1.1.0
3.x 3.1.2 0.10.0 – 1.1.0

Iceberg与Hive 2和Hive 3.1.2/3的集成,支持以下特性:

·创建表

·删除表

·读取表

·插入表(INSERT into)

更多功能需要Hive 4.x(目前alpha版本)才能支持。

2). 上传jar包,拷贝到Hive的auxlib目录中

用到以下两个jar包

mkdir auxlib
cp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlib
cp libfb303-0.9.3.jar /opt/module/hive/auxlib

3)修改hive-site.xml,添加配置项

<property>
    <name>iceberg.engine.hive.enabled</name>
    <value>true</value>
</property>

<property>
    <name>hive.aux.jars.path</name>
    <value>/opt/module/hive/auxlib</value>
</property>

4)使用TEZ引擎注意事项(可选):

(1)使用Hive版本>=3.1.2,需要TEZ版本>=0.10.1

(2)指定tez更新配置:

<property>
    <name>tez.mrreader.config.update.properties</name>
    <value>hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids</value>
</property>

(3)从Iceberg 0.11.0开始,如果Hive使用Tez引擎,需要关闭向量化执行:

<property>
    <name>hive.vectorized.execution.enabled</name>
    <value>false</value>
</property>

5)启动HMS服务

6)启动 Hadoop

2.2 创建 管理catalog

Iceberg支持多种不同的Catalog类型,例如:Hive、Hadoop、亚马逊的AWS Glue和自定义Catalog。

根据不同配置,分为三种情况:

·没有设置iceberg.catalog,默认使用HiveCatalog

配置项 说明
iceberg.catalog.<catalog_name>.type Catalog的类型: hive, hadoop, 如果使用自定义Catalog,则不设置
iceberg.catalog.<catalog_name>.catalog-impl Catalog的实现类, 如果上面的type没有设置,则此参数必须设置
iceberg.catalog.<catalog_name>. Catalog的其他配置项

·设置了 iceberg.catalog的类型,使用指定的Catalog类型,如下表格:

·设置 iceberg.catalog=location_based_table,直接通过指定的根路径来加载Iceberg表

2.2.1 默认使用 HiveCatalog

## 创建表
CREATE TABLE iceberg_test1 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
 
注意:STORED BY 指定类名使iceberg_test1成为iceberg表
如果使用的是hive 4, STORED BY 'Iceberg' (类名只需要写Iceberg即可)

## 插入数据
INSERT INTO iceberg_test1 values(1);
## 查看表:
describe iceberg_test1;
## 查看表的详细信息:
describe formatted iceberg_test1;

创建的表目录在默认的hive仓库路径下。

2.2.2 指定 Catalog 类型

1)使用 HiveCatalog

set iceberg.catalog.iceberg_hive.type=hive;

## iceberg.catalog 前缀是固定的
## iceberg_hive: <catalog_name>是自定义的,但是下边的<catalog_name>需要保持一致
## type:catalog的类型
set iceberg.catalog.iceberg_hive.uri=thrift://192.168.110.120:9083;# 元数据服务的地址
set iceberg.catalog.iceberg_hive.clients=10;# 连接并行数,可设置也可以不设置

#不设置warehouse
set iceberg.catalog.iceberg_hive.warehouse=hdfs://192.168.110.120:8020/warehouse/iceberg-hive; #仓库路径的地址。也是可指定可不指定;不指定默认是hive-site里配置的默认仓库路径地址 (3.1.2版本指定仓库路径地址未生效,实际还是使用的hive-site.xml中的仓库路径地址)

--建表
CREATE TABLE iceberg_test2 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES('iceberg.catalog'='iceberg_hive'); ##用上边创建好的catalog
 
INSERT INTO iceberg_test2 values(1);

2)使用 HadoopCatalog

set iceberg.catalog.iceberg_hadoop.type=hadoop;#iceberg_hadoop:<catalog_name>

set iceberg.catalog.iceberg_hadoop.warehouse=hdfs://192.168.110.120:8020/warehouse/iceberg-hadoop;

--建表
CREATE TABLE iceberg_test3 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://192.168.110.120:8020/warehouse/iceberg-hadoop/default/iceberg_test3'
TBLPROPERTIES('iceberg.catalog'='iceberg_hadoop');
##使用hadoop catalog时必须指定location,location需要跟catalog仓库路径地址保持一致;
##后边需要写上库名表名,默认是default库下
 
INSERT INTO iceberg_test3 values(1);

Hive catalog 和hadoop catalog的区别

区别在于元数据文件的名称,可以在hdfs中查看metadata进行区分


2.2.3 指定路径加载

如果HDFS中已经存在iceberg格式表,我们可以通过在Hive中创建Icerberg格式表指定对应的location路径映射数据。

CREATE EXTERNAL TABLE iceberg_test4 (i int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://192.168.110.120:8020/warehouse/iceberg-hadoop/default/iceberg_test3'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

注意: 表iceberg_test4需要跟原表数据类型保持一致

location: location要指定到具体的表名

如果指定路径加载时创建的是内部表,删除iceberg_test4表的同时,会删除iceberg_test3表的数据(show tables;可以查到表,但是查表数据会报错:Table does not exist)。所以指定路径加载 建议创建外部表。

2.3 DDL操作

2.3.1 创建表

1)创建外部表

CREATE EXTERNAL TABLE iceberg_create1 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
-- 查看表相情
describe formatted iceberg_create1;

2)创建内部表

CREATE TABLE iceberg_create2 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

describe formatted iceberg_create2;

3)创建分区表

CREATE EXTERNAL TABLE iceberg_create3 (id int,name string)
PARTITIONED BY (age int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

describe formatted iceberg_create3;

Hive语法创建分区表,不会在HMS中创建分区(这个指的是hive分隔的分区),而是将分区数据转换为Iceberg标识分区。这种情况下不能使用Iceberg的分区转换,例如:days(timestamp),如果想要使用Iceberg格式表的分区转换标识分区,需要使用Spark或者Flink引擎创建表。

2.3.2 修改表

只支持HiveCatalog表修改表属性(字段调整,分区变化都不支持修改。hive 4支持字段调整等),Iceberg表属性和Hive表属性存储在HMS中是同步的。

ALTER TABLE iceberg_create1 SET TBLPROPERTIES('external.table.purge'='FALSE');

2.3.3 插入表

支持标准单表INSERT INTO操作

INSERT INTO iceberg_create2 VALUES (1);
INSERT INTO iceberg_create1 select * from iceberg_create2;

在HIVE 3.x中,INSERT OVERWRITE虽然能执行,但其实是追加。

2.3.4 删除表

DROP TABLE iceberg_create1;

3. Iceberg与Flink SQL集成

这里使用flink版本是 flink-1.16.0 ,Iceberg版本是 1.1.0

3.1 环境准备

3.1.1 安装 Flink

1)Flink与Iceberg的版本对应关系如下

Flink 版本 Iceberg 版本
1.11 0.9.0 – 0.12.1
1.12 0.12.0 – 0.13.1
1.13 0.13.0 – 1.0.0
1.14 0.13.0 – 1.1.0
1.15 0.14.0 – 1.1.0
1.16 1.1.0 – 1.1.0

2)上传并解压Flink安装包

tar -zxvf flink-1.16.0-bin-scala_2.12.tgz -C /opt/module/

3)配置环境变量

sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath` #这样就不需要在flink lib下放hadoop相关的包

source /etc/profile.d/my_env.sh

4)拷贝iceberg的jar包到Flink的lib目录

cp /opt/software/iceberg/iceberg-flink-runtime-1.16-1.1.0.jar /opt/module/flink-1.16.0/lib

3.1.2 启动 Hadoop

3.1.3 启动 sql-client

1)修改flink-conf.yaml配置

vim /opt/module/flink-1.16.0/conf/flink-conf.yaml


classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true

(2)启动Flink

/opt/module/flink-1.16.0/bin/start-cluster.sh

(3)启动Flink的sql-client

/opt/module/flink-1.16.0/bin/sql-client.sh embedded

3.2 创建和使用 Catalog

3.2.1 语法说明

CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  `<config_key>`=`<config_value>`
); 

注意:

· type: 必须是iceberg。(必填)

· catalog-type: 内置了hive和hadoop两种catalog,也可以使用catalog-impl来自定义catalog。(可选)

· catalog-impl: 自定义catalog实现的全限定类名。如果未设置catalog-type,则必须设置。(可选)

· property-version: 描述属性版本的版本号。此属性可用于向后兼容,以防属性格式更改。当前属性版本为1。(可选)

· cache-enabled: 是否启用目录缓存,默认值为true。(可选)

· cache.expiration-interval-ms: 本地缓存catalog条目的时间(以毫秒为单位);负值,如-1表示没有时间限制,不允许设为0。默认值为-1。(可选)

3.2.2 Hive Catalog

1)上传hive connector到flink的lib中

cp flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar /opt/module/flink-1.16.0/lib/

2)启动hive metastore服务

hive --service metastore

3)创建hive catalog

重启flink集群,重新进入sql-client

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg', 
  'catalog-type'='hive',
  'uri'='thrift://192.168.110.120:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://192.168.110.120:8020/warehouse/iceberg-hive'
);

use catalog hive_catalog;

注意:

· uri: Hive metastore的thrift uri。(必填)

· clients:Hive metastore客户端池大小,默认为2。(可选)

· warehouse: 数仓目录。(必选)

3.2.3 Hadoop Catalog

Iceberg还支持HDFS中基于目录的catalog,可以使用’catalog-type’='hadoop’配置。

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
'warehouse'='hdfs://192.168.110.120:8020/warehouse/iceberg-hadoop',
  'property-version'='1'
);

use catalog hadoop_catalog;

·warehouse:存放元数据文件和数据文件的HDFS目录。(必填)

3.2.4 配置sql-client初始化文件

vim /opt/module/flink-1.16.0/conf/sql-client-init.sql

## hive catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg', 
  'catalog-type'='hive',
  'uri'='thrift://192.168.110.120:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://192.168.110.120:8020/warehouse/iceberg-hive'
);
## hadoop catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
'warehouse'='hdfs://192.168.110.120:8020/warehouse/iceberg-hadoop',
  'property-version'='1'
);

## 默认使用hive_catalog(可选)
USE CATALOG hive_catalog;

后续启动sql-client时,加上 -i sql文件路径 即可完成catalog的初始化。

/opt/module/flink-1.16.0/bin/sql-client.sh embedded -i conf/sql-client-init.sql

3.3 DDL 语句

3.3.1 创建数据库

CREATE DATABASE iceberg_db;
USE iceberg_db;

3.3.2 创建表

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

建表命令现在支持最常用的flink建表语法,包括:

·PARTITION BY (column1, column2, …):配置分区,apache flink还不支持隐藏分区。

·COMMENT ‘table document’:指定表的备注

·WITH (‘key’=‘value’, …):设置表属性

目前,不支持计算列、watermark(支持主键)。

1)创建分区表

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
) PARTITIONED BY (data);

Apache Iceberg支持隐藏分区,但Apache flink不支持在列上通过函数进行分区,现在无法在flink DDL中支持隐藏分区。

2)使用LIKE语法建表

LIKE语法用于创建一个与另一个表具有相同schema、分区和属性的表。

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

CREATE TABLE  `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;

3.3.3 修改表

目前只支持修改表属性,表名

1)修改表属性

ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');

2)修改表名

ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;

3.3.4 删除表

DROP TABLE `hive_catalog`.`default`.`sample`;

3.4 插入语句

3.4.1 INSERT INTO

INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from sample2;

3.4.2 INSERT OVERWRITE

仅支持Flink的Batch模式

SET execution.runtime-mode = batch;

INSERT OVERWRITE sample VALUES (1, 'a');

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;

3.4.3 UPSERT

当将数据写入v2表格式时,Iceberg支持基于主键的UPSERT。有两种方法可以启用upsert。

1)建表时指定

CREATE TABLE `hive_catalog`.`iceberg_db`.`sample` (
  `id`  INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
 PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);

format-version : 格式版本,默认值是1。1 表示只支持追加;2表示upsert

2)插入时指定,使用sql hint指定

INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...

插入的表,format-version需要为2。

OVERWRITE和UPSERT不能同时设置。在UPSERT模式下,如果对表进行分区,则分区字段必须也是主键。

3)读取Kafka流,upsert插入到iceberg表中

create table default_catalog.default_database.kafka(
  id int,
  data string
) with (
  'connector' = 'kafka'
  ,'topic' = 'test111'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'format' = 'json'
  ,'properties.group.id'='iceberg'
  ,'scan.startup.mode'='earliest-offset'
);

scan.startup.mode:扫描模式

INSERT INTO hive_catalog.test1.sample5 SELECT * FROM default_catalog.default_database.kafka;

3.5 查询语句

Iceberg支持Flink的流式和批量读取。

3.5.1 Batch模式

SET execution.runtime-mode = batch;
select * from sample;

3.5.2 Streaming模式

SET execution.runtime-mode = streaming;
SET table.dynamic-table-options.enabled=true;
SET sql-client.execution.result-mode=tableau;

1)从当前快照读取所有记录,然后从该快照读取增量数据

SELECT * FROM sample5 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

2)读取指定快照id(不包含)后的增量数据

SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

·monitor-interval: 连续监控新提交数据文件的时间间隔(默认为10s)。

·start-snapshot-id: 流作业开始的快照id。

注意: 如果是无界数据流式upsert进iceberg表(读kafka,upsert进iceberg表),那么再去流读iceberg表会存在读不出数据的问题。如果无界数据流式append进iceberg表(读kafka,append进iceberg表),那么流读该iceberg表可以正常看到结果。

3.6 与Flink集成的不足

支持的特性 Flink 备注
SQL create catalog
SQL create database
SQL create table
SQL create table like
SQL alter table 只支持修改表属性,不支持更改列和分区
SQL drop_table
SQL select 支持流式和批处理模式
SQL insert into 支持流式和批处理模式
SQL insert overwrite
DataStream read
DataStream append
DataStream overwrite
Metadata tables 支持Java API,不支持Flink SQL
Rewrite files action

·不支持创建隐藏分区的Iceberg表。

·不支持创建带有计算列的Iceberg表。

·不支持创建带watermark的Iceberg表。

·不支持添加列,删除列,重命名列,更改列。

·Iceberg目前不支持Flink SQL 查询表的元数据信息,需要使用Java API 实现。

4. Iceberg与Flink DataStream 集成

4.1 环境准备

4.1.1 配置pom文件

新建Maven工程,pom文件配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.iceberg.demo</groupId>
    <artifactId>flink-iceberg-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.16.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>   <!–不会打包到依赖中,只参与编译,不参与运行 –>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!--idea运行时也有webui-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
<!--            <scope>provided</scope>-->
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.16 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.16</artifactId>
            <version>1.1.0</version>
        </dependency>

    </dependencies>
</project>

4.1.2 配置log4j

resources目录下新建log4j.properties。

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

4.2 读取数据

4.2.1 常规Source写法

1)Batch方式

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadDemo {
    public static void main(String[] args) throws Exception {
        //1.创建flink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://192.168.110.120:8020/warehouse/iceberg-hadoop/iceberg_db/sample");
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false) // false : batch方式读取数据; true:streaming方式读取数据
                //.asOfTimestamp()
                //.startSnapshotId()
                .build();
        batch.map(r -> Tuple2.of(r.getInt(0),r.getString(1).toString()))
                .returns(Types.TUPLE(Types.INT,Types.STRING))
                .print();
        env.execute("Test Iceberg Read");
    }
}

2)Streaming方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a"); 
DataStream<RowData> stream = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(true)
     .startSnapshotId(3821550127947089987L)
     .build();
 
stream.map(r -> Tuple2.of(r.getLong(0),r.getLong(1) ))
      .returns(Types.TUPLE(Types.LONG,Types.LONG))
      .print();
 
env.execute("Test Iceberg Read");

4.2.2 FLIP-27 Source写法

1)Batch方式

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");

        IcebergSource<RowData> source1 = IcebergSource.forRowData()
                .tableLoader(tableLoader)
                .assignerFactory(new SimpleSplitAssignerFactory())
                .build();

        DataStream<RowData> batch = env.fromSource(
                Source1,
                WatermarkStrategy.noWatermarks(),
                "My Iceberg Source",
                TypeInformation.of(RowData.class));

        batch.map(r -> Tuple2.of(r.getLong(0), r.getLong(1)))
                .returns(Types.TUPLE(Types.LONG, Types.LONG))
                .print();

        env.execute("Test Iceberg Read");

2)Streaming方式

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");

        IcebergSource source2 = IcebergSource.forRowData()
                .tableLoader(tableLoader)
                .assignerFactory(new SimpleSplitAssignerFactory())
                .streaming(true)
                .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
                .monitorInterval(Duration.ofSeconds(60))
                .build();

        DataStream<RowData> stream = env.fromSource(
                Source2,
                WatermarkStrategy.noWatermarks(),
                "My Iceberg Source",
                TypeInformation.of(RowData.class));

        stream.map(r -> Tuple2.of(r.getLong(0), r.getLong(1)))
                .returns(Types.TUPLE(Types.LONG, Types.LONG))
                .print();

        env.execute("Test Iceberg Read");