IceBerg

Flink 1.11 与 Iceberg 0.11.1 集成 #

MySQL CDC -> Iceberg #

CREATE TABLE user_login ( id INT NOT NULL, name CHAR(120) NOT NULL, phone char(11) ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '172.17.3.49', 'port' = '13306', 'username' = 'quanmin110', 'password' = 'quanmin110.com', 'database-name' = 'user_login', 'table-name' = 'flink_metric_test' );

./bin/sql-client.sh embedded -j iceberg/iceberg-flink-runtime-0.12.0.jar shell

CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse', 'property-version'='1' );

hdfs dfs -mkdir -p /user/admin/iceberg

-- Creates table sample2 in database `default` of hadoop_catalog.
-- `default` is a reserved keyword in Flink SQL, so it must be back-quoted
-- when used as an identifier.
CREATE TABLE hadoop_catalog.`default`.sample2 (
    id   STRING,
    data STRING
);

INSERT INTO hadoop_catalog.`default`.sample VALUES (1, 'a');

// batch SET execution.type = batch; INSERT OVERWRITE hadoop_catalog.`default`.sample2 VALUES ('1', 'a');

select * from sample; update sample set data='newSet' where id = 1;

Hive 2.3.4 #

  1. add jar
add jar /home/admin/private/zhoukai/iceberg/jar/iceberg-hive-runtime-0.12.1.jar;

或者 copy iceberg-hive-runtime-0.12.1.jar 到 HIVE_HOME/lib

  1. Hadoop Catalog
-- Session-level Hive settings for an Iceberg catalog referred to as "hadoop".
-- Presumably this is the catalog name that a table property
-- 'iceberg.catalog'='hadoop' resolves to; confirm against the Iceberg Hive docs.
-- Keep comments on their own lines: text after the '=' becomes part of the value.
SET iceberg.catalog.hadoop.type=hadoop;
-- Warehouse root on HDFS for that catalog (same path as the Flink catalog's 'warehouse').
SET iceberg.catalog.hadoop.warehouse=hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse;

-- Hive external table backed by the Iceberg storage handler.
-- 'location_based_table' tells the handler to load the table directly from
-- LOCATION rather than through a named catalog.
CREATE EXTERNAL TABLE hadoop_sample2
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    LOCATION 'hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse/default/sample'
    TBLPROPERTIES (
        'iceberg.catalog' = 'location_based_table'
    );

CREATE EXTERNAL TABLE hadoop_sample2 STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' TBLPROPERTIES ('iceberg.catalog'='hadoop');

CREATE EXTERNAL TABLE hadoop_sample1 STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse/default/sample' TBLPROPERTIES ('iceberg.catalog'='location_based_table');

INSERT INTO hadoop_sample VALUES (1,'a');

CREATE EXTERNAL TABLE config_delete_ck_iceberg STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://v003098.mc3.lwsite.net:8020/user/admin/iceberg/warhouse/catalog_test/default/config_delete_ck_iceberg' TBLPROPERTIES ('iceberg.catalog'='location_based_table');

  1. 建表
-- MySQL key/value table: auto-increment surrogate `id` as primary key,
-- with a unique index (named `key`) enforcing one row per `key`.
CREATE TABLE `config` (
    `id`    INT(11)      NOT NULL AUTO_INCREMENT,
    `key`   VARCHAR(10)  NOT NULL,
    `value` VARCHAR(255) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `key` (`key`)
)
    ENGINE = InnoDB
    AUTO_INCREMENT = 1
    DEFAULT CHARSET = utf8mb4;

  1. 导入数据
-- Seed rows for `test_cdc`.`config`.
-- `id` is written as an integer literal to match the int(11) column in this
-- file's CREATE TABLE `config`, instead of relying on implicit string-to-int
-- coercion. `key` is varchar(10), so it stays a quoted string.
-- Statements are kept separate so each row is its own autocommit change.
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (1, '1', '北京');
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (2, '2', '天津');
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (3, '3', '河南');
INSERT INTO `test_cdc`.`config` (`id`, `key`, `value`) VALUES (4, '4', '河北');

QA #

覆盖写TableMeta文件异常 #

触发checkpoint #

org.apache.iceberg.flink.sink.IcebergFilesCommitter#snapshotState

完成checkpoint #

  IcebergFilesCommitter.commitOperation(SnapshotUpdate<?>, int, int, String, String, long)  (org.apache.iceberg.flink.sink)
IcebergFilesCommitter.commitDeltaTxn(NavigableMap<Long, WriteResult>, String, long)(2 usages)  (org.apache.iceberg.flink.sink)

IcebergFilesCommitter.commitUpToCheckpoint(NavigableMap<Long, byte[]>, String, long) (org.apache.iceberg.flink.sink) IcebergFilesCommitter.notifyCheckpointComplete(long) (org.apache.iceberg.flink.sink)