# Flink 1.11 与 Iceberg 0.11.1 集成

## MySQL CDC -> Iceberg
-- Flink SQL source table using the MySQL CDC connector; it captures changes
-- from MySQL table user_login.flink_metric_test.
-- NOTE(review): credentials are committed in plain text; redact them or
-- inject them from a secret store before sharing these notes.
CREATE TABLE user_login (
    id    INT       NOT NULL,
    name  CHAR(120) NOT NULL,
    phone CHAR(11)
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = '172.17.3.49',
    'port'          = '13306',
    'username'      = 'quanmin110',
    'password'      = 'quanmin110.com',
    'database-name' = 'user_login',
    'table-name'    = 'flink_metric_test'
);
# Start the Flink SQL client in embedded mode, with the Iceberg Flink runtime
# JAR added as a dependency (-j / --jar).
# NOTE(review): the section heading targets Iceberg 0.11.1 but this JAR is
# 0.12.0; confirm the runtime version matches the Flink version in use.
./bin/sql-client.sh embedded \
    --jar iceberg/iceberg-flink-runtime-0.12.0.jar \
    shell
-- Register an Iceberg catalog whose tables live under an HDFS warehouse
-- directory (catalog-type = hadoop, no metastore).
-- The warehouse path is spelled "warhouse" (sic); the Hive LOCATION paths in
-- these notes reuse the same prefix, so keep them in sync.
CREATE CATALOG hadoop_catalog WITH (
    'type'             = 'iceberg',
    'catalog-type'     = 'hadoop',
    'warehouse'        = 'hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse',
    'property-version' = '1'
);
# Create the parent HDFS directory of the Iceberg warehouse; -p also creates
# missing parents and does not fail if the directory already exists.
hadoop fs -mkdir -p /user/admin/iceberg
-- Create an unpartitioned Iceberg table in the `default` database of
-- hadoop_catalog. `default` is a reserved keyword in Flink SQL, so it must be
-- quoted with backticks.
CREATE TABLE hadoop_catalog.`default`.sample2 (
    id   STRING,
    data STRING
);
-- Append one row to hadoop_catalog.`default`.sample. The target is `sample`,
-- not the `sample2` table defined with id STRING.
-- NOTE(review): the integer literal assumes `sample` has a numeric id column;
-- confirm against that table's schema.
INSERT INTO hadoop_catalog.`default`.sample
VALUES (1, 'a');
-- Batch mode: Iceberg's Flink sink supports INSERT OVERWRITE only in batch
-- jobs, so switch the SQL client's execution type first.
SET execution.type = batch;
INSERT OVERWRITE hadoop_catalog.`default`.sample2
VALUES ('1', 'a');
-- Inspect the table, then update one row. The unqualified name `sample`
-- resolves against the current catalog/database (e.g. after
-- USE CATALOG hadoop_catalog).
-- NOTE(review): Flink SQL 1.11 has no UPDATE statement; this presumably runs
-- in another engine (e.g. Spark SQL with Iceberg extensions). Confirm.
SELECT * FROM sample;
UPDATE sample SET data = 'newSet' WHERE id = 1;
## Hive 2.3.4
- add jar
-- Add the Iceberg Hive runtime (provides HiveIcebergStorageHandler) to the
-- current Hive session.
ADD JAR /home/admin/private/zhoukai/iceberg/jar/iceberg-hive-runtime-0.12.1.jar;
或者将 iceberg-hive-runtime-0.12.1.jar 复制到 $HIVE_HOME/lib 目录下。
- Hadoop Catalog
-- Define a Hive-side Iceberg catalog named "hadoop" (iceberg.catalog.<name>.*)
-- of type hadoop. It is rooted at the same HDFS warehouse path as the Flink
-- hadoop_catalog. Hive tables declared with 'iceberg.catalog'='hadoop' are
-- presumably resolved through these settings.
SET iceberg.catalog.hadoop.type=hadoop;
SET iceberg.catalog.hadoop.warehouse=hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse;
-- Map a Hive external table onto an existing Iceberg table by path.
-- 'location_based_table' makes the storage handler load the table directly
-- from LOCATION instead of through a named catalog.
-- NOTE(review): despite its name, this Hive table points at Iceberg
-- `default/sample`, not `sample2`; confirm the intended target.
CREATE EXTERNAL TABLE hadoop_sample2
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    LOCATION 'hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse/default/sample'
    TBLPROPERTIES (
        'iceberg.catalog' = 'location_based_table'
    );
-- Map a Hive external table onto an Iceberg table resolved through the
-- Hive-side Iceberg catalog named "hadoop" (iceberg.catalog.hadoop.* settings).
-- NOTE(review): a Hive table named hadoop_sample2 is also created from a
-- LOCATION earlier in these notes. Running both statements fails with "table
-- already exists", so drop or rename one.
-- NOTE(review): without a 'name' table property, the Iceberg identifier
-- presumably defaults to <hive db>.<hive table> (default.hadoop_sample2).
-- Confirm that table exists in the catalog.
CREATE EXTERNAL TABLE hadoop_sample2
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    TBLPROPERTIES ('iceberg.catalog' = 'hadoop');
-- Map Hive table hadoop_sample1 onto the Iceberg table stored at
-- warhouse/default/sample. Resolution is path-based ('location_based_table'),
-- with no named catalog involved.
CREATE EXTERNAL TABLE hadoop_sample1
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    LOCATION 'hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse/default/sample'
    TBLPROPERTIES ('iceberg.catalog' = 'location_based_table');
-- Write one row to an Iceberg-backed Hive table through the storage handler.
-- NOTE(review): no Hive table named hadoop_sample is created in these notes
-- (only hadoop_sample1 / hadoop_sample2); confirm the target table.
INSERT INTO hadoop_sample VALUES (1, 'a');
-- Map Hive table config_delete_ck_iceberg onto the Iceberg table stored under
-- the catalog_test/default warehouse path. Resolution is path-based, with no
-- named catalog involved.
CREATE EXTERNAL TABLE config_delete_ck_iceberg
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    LOCATION 'hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse/catalog_test/default/config_delete_ck_iceberg'
    TBLPROPERTIES ('iceberg.catalog' = 'location_based_table');
- 建表
-- MySQL source table for the CDC test. It is created explicitly in test_cdc,
-- the database the seed INSERTs target.
-- `key` and `value` need backticks (`key` is a reserved word in MySQL).
-- Plain INT replaces INT(11): the display width has no effect on storage and
-- is deprecated since MySQL 8.0.17.
CREATE TABLE `test_cdc`.`config` (
  `id`    INT          NOT NULL AUTO_INCREMENT,
  `key`   VARCHAR(10)  NOT NULL,
  `value` VARCHAR(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `key` (`key`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
- 导入数据
-- Seed rows for the CDC pipeline. `id` is INT, so numeric literals are used
-- instead of strings, which relied on implicit string-to-integer conversion.
-- Rows are kept as separate statements so that each one is its own change
-- event for the CDC job (assuming autocommit).
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (1, '1', '北京');
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (2, '2', '天津');
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (3, '3', '河南');
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (4, '4', '河北');
## QA

### 覆盖写 TableMeta 文件异常

#### 触发 checkpoint

`org.apache.iceberg.flink.sink.IcebergFilesCommitter#snapshotState`
#### 完成 checkpoint

调用栈（上方为被调用方，自下而上依次为 notifyCheckpointComplete → commitUpToCheckpoint → commitDeltaTxn → commitOperation）：

```text
IcebergFilesCommitter.commitOperation(SnapshotUpdate<?>, int, int, String, String, long) (org.apache.iceberg.flink.sink)
IcebergFilesCommitter.commitDeltaTxn(NavigableMap<Long, WriteResult>, String, long) (2 usages) (org.apache.iceberg.flink.sink)
IcebergFilesCommitter.commitUpToCheckpoint(NavigableMap<Long, byte[]>, String, long) (org.apache.iceberg.flink.sink)
IcebergFilesCommitter.notifyCheckpointComplete(long) (org.apache.iceberg.flink.sink)
```