Blink #
StreamTableEnvironment in Detail #
sqlQuery #
Table result =
        tEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM OrderB WHERE amount < 2");
SQL -> SqlNode #
SqlNode -> RelNode -> Operation #
QueryOperation #
QueryOperationVisitor #
The visitor for QueryOperation.
Initialization #
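The visitor idea can be illustrated with a minimal sketch. All types here (`MiniOperation`, `MiniVisitor`, `Scan`, `Filter`, `Union`, `SummaryVisitor`) are hypothetical, simplified stand-ins invented for illustration; they are not Flink's actual QueryOperation or QueryOperationVisitor classes, which cover many more operation kinds. The sketch only shows the double-dispatch shape: each node calls back the visitor method matching its own type.

```java
// Hypothetical, simplified stand-ins; these are NOT Flink's real classes.
interface MiniVisitor<T> {
    T visitScan(Scan scan);
    T visitFilter(Filter filter);
    T visitUnion(Union union);
}

interface MiniOperation {
    // Double dispatch: each node forwards to the visitor method for its own type.
    <T> T accept(MiniVisitor<T> visitor);
}

final class Scan implements MiniOperation {
    final String table;
    Scan(String table) { this.table = table; }
    @Override public <T> T accept(MiniVisitor<T> visitor) { return visitor.visitScan(this); }
}

final class Filter implements MiniOperation {
    final String condition;
    final MiniOperation input;
    Filter(String condition, MiniOperation input) {
        this.condition = condition;
        this.input = input;
    }
    @Override public <T> T accept(MiniVisitor<T> visitor) { return visitor.visitFilter(this); }
}

final class Union implements MiniOperation {
    final MiniOperation left;
    final MiniOperation right;
    Union(MiniOperation left, MiniOperation right) {
        this.left = left;
        this.right = right;
    }
    @Override public <T> T accept(MiniVisitor<T> visitor) { return visitor.visitUnion(this); }
}

// A visitor that renders the operation tree as a one-line summary string.
final class SummaryVisitor implements MiniVisitor<String> {
    @Override public String visitScan(Scan s) {
        return "Scan(" + s.table + ")";
    }
    @Override public String visitFilter(Filter f) {
        return "Filter(" + f.condition + ", " + f.input.accept(this) + ")";
    }
    @Override public String visitUnion(Union u) {
        return "UnionAll(" + u.left.accept(this) + ", " + u.right.accept(this) + ")";
    }
}

final class VisitorDemo {
    // Mirrors the shape of the example query: two filtered scans combined by UNION ALL.
    static MiniOperation exampleQuery() {
        return new Union(
                new Filter("amount > 2", new Scan("tableA")),
                new Filter("amount < 2", new Scan("OrderB")));
    }

    public static void main(String[] args) {
        System.out.println(exampleQuery().accept(new SummaryVisitor()));
    }
}
```

A converter that turns operations into relational expressions could be written as another `MiniVisitor` implementation returning plan nodes instead of strings, which is the main reason for routing tree traversal through a visitor.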
SqlToOperationConverter #
Converting a Table to a DataStream #
tEnv.toAppendStream(result, Order.class).print();
Flink SQL Optimization (Operation -> RelNode -> Transformation) #
Converting ModifyOperation to RelNode #
Converts the relational tree of a ModifyOperation into a Calcite relational expression.
doOptimize #
Table result =
        tEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM OrderB WHERE amount < 2");
Call Chain #
Flink Optimization #
FlinkOptimizeProgram in Detail
FlinkChainedProgram #
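A chained program can be thought of as an ordered list of named optimization stages, each taking the plan produced by the previous one. The sketch below is a simplified illustration under that assumption; `ChainedProgramSketch` and `ChainedProgramDemo` are hypothetical names, and the stages only record their own name rather than rewriting a real plan. The stage names and their order are taken from the headings that follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of an ordered chain of named stages; not Flink's real class.
final class ChainedProgramSketch<P> {
    // LinkedHashMap keeps insertion order, so stages run in the order they were added.
    private final Map<String, UnaryOperator<P>> programs = new LinkedHashMap<>();

    ChainedProgramSketch<P> addLast(String name, UnaryOperator<P> program) {
        if (programs.containsKey(name)) {
            throw new IllegalArgumentException("duplicate program: " + name);
        }
        programs.put(name, program);
        return this;
    }

    List<String> programNames() {
        return new ArrayList<>(programs.keySet());
    }

    // Feeds the output of each stage into the next one.
    P optimize(P root) {
        P current = root;
        for (UnaryOperator<P> program : programs.values()) {
            current = program.apply(current);
        }
        return current;
    }
}

final class ChainedProgramDemo {
    static final List<String> STAGES = List.of(
            "subquery_rewrite", "temporal_join_rewrite", "decorrelate", "time_indicator",
            "default_rewrite", "predicate_pushdown", "logical", "logical_rewrite",
            "physical", "physical_rewrite");

    // Each stand-in stage appends its name to a trace instead of transforming a plan.
    static ChainedProgramSketch<List<String>> build() {
        ChainedProgramSketch<List<String>> chain = new ChainedProgramSketch<>();
        for (String stage : STAGES) {
            chain.addLast(stage, trace -> {
                List<String> next = new ArrayList<>(trace);
                next.add(stage);
                return next;
            });
        }
        return chain;
    }

    public static void main(String[] args) {
        // Prints the stage names in execution order.
        System.out.println(build().optimize(new ArrayList<>()));
    }
}
```

Keeping the stages in a named, ordered map makes it straightforward to look up, replace, or remove a single stage without touching the others.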
subquery_rewrite #
temporal_join_rewrite #
decorrelate #
time_indicator #
FlinkRelTimeIndicatorProgram
default_rewrite #
predicate_pushdown (pushing predicates toward the source) #
- How to determine whether a predicate is pushed down
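As a rough illustration of what pushdown means, the sketch below moves a filter that sits above a UNION ALL into both inputs, similar in spirit to Calcite's FilterSetOpTransposeRule. The node classes (`Rel`, `TableScanRel`, `FilterRel`, `UnionAllRel`) and the `PushdownSketch` helper are hypothetical and far simpler than real RelNode classes; conditions are plain strings, so no column remapping is modeled. One simple way to check the outcome, assumed here, is to see whether the filter ends up directly above a scan.

```java
// Hypothetical mini plan nodes; NOT Calcite's or Flink's real RelNode classes.
abstract class Rel {
    abstract String explain();
}

final class TableScanRel extends Rel {
    final String table;
    TableScanRel(String table) { this.table = table; }
    @Override String explain() { return "Scan(" + table + ")"; }
}

final class FilterRel extends Rel {
    final String condition;
    final Rel input;
    FilterRel(String condition, Rel input) {
        this.condition = condition;
        this.input = input;
    }
    @Override String explain() { return "Filter[" + condition + "](" + input.explain() + ")"; }
}

final class UnionAllRel extends Rel {
    final Rel left;
    final Rel right;
    UnionAllRel(Rel left, Rel right) {
        this.left = left;
        this.right = right;
    }
    @Override String explain() {
        return "UnionAll(" + left.explain() + ", " + right.explain() + ")";
    }
}

final class PushdownSketch {
    // Rewrites Filter(UnionAll(a, b)) into UnionAll(Filter(a), Filter(b)), recursively.
    static Rel pushDown(Rel rel) {
        if (rel instanceof FilterRel) {
            FilterRel filter = (FilterRel) rel;
            Rel input = pushDown(filter.input);
            if (input instanceof UnionAllRel) {
                UnionAllRel union = (UnionAllRel) input;
                return new UnionAllRel(
                        pushDown(new FilterRel(filter.condition, union.left)),
                        pushDown(new FilterRel(filter.condition, union.right)));
            }
            return new FilterRel(filter.condition, input);
        }
        if (rel instanceof UnionAllRel) {
            UnionAllRel union = (UnionAllRel) rel;
            return new UnionAllRel(pushDown(union.left), pushDown(union.right));
        }
        return rel; // scans are leaves, nothing to push
    }

    // Assumed check: the filter counts as pushed down when it sits directly on a scan.
    static boolean filterSitsOnScan(Rel rel) {
        return rel instanceof FilterRel && ((FilterRel) rel).input instanceof TableScanRel;
    }

    public static void main(String[] args) {
        Rel plan = new FilterRel("amount > 2",
                new UnionAllRel(new TableScanRel("tableA"), new TableScanRel("OrderB")));
        System.out.println("before: " + plan.explain());
        System.out.println("after:  " + pushDown(plan).explain());
    }
}
```

In practice the same comparison can be done by reading the plan before and after optimization: if the condition appears in the node immediately above the table scan (or inside the scan itself), the predicate has been pushed down.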
logical #
logical_rewrite #
physical #
physical_rewrite #
Rules (Optimization Rules) #
Converting the FlinkPhysicalRel DAG to the ExecNode DAG #
Call chain
Class structure
Converting ExecNode to Transformation #
Call chain
LogicalLegacySink(name=[DataStreamTableSink], fields=[user, product, amount])
  LogicalUnion(all=[true])
    LogicalProject(user=[$0], product=[$1], amount=[$2])
      LogicalFilter(condition=[>($2, 2)])
        LogicalTableScan(table=[[default_catalog, default_database, UnnamedTable$0]])
    LogicalProject(user=[$0], product=[$1], amount=[$2])
      LogicalFilter(condition=[<($2, 2)])
        LogicalTableScan(table=[[default_catalog, default_database, OrderB]])

StreamExecLegacySink(name=[DataStreamTableSink], fields=[user, product, amount])
  StreamExecUnion(all=[true], union=[user, product, amount])
    StreamExecCalc(select=[user, product, amount], where=[>(amount, 2)])
      StreamExecDataStreamScan(table=[[default_catalog, default_database, UnnamedTable$0]], fields=[user, product, amount])
    StreamExecCalc(select=[user, product, amount], where=[<(amount, 2)])
      StreamExecDataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])