8.1 Blink

StreamTableEnvironment in detail

sqlQuery

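// tableA is a Table object: concatenating it into the SQL string calls
// Table.toString(), which registers it under a generated name
// (UnnamedTable$0 in the plans for this query).
// OrderB is a table already registered in the catalog under that name.
Table result =
        tEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM OrderB WHERE amount < 2");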
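Concatenating tableA into a string works because of how Table.toString() behaves in the Flink versions this walkthrough appears to target (around 1.10/1.11): the first call generates a name of the form UnnamedTable$N and registers the table under it, which is why the plans for this query scan UnnamedTable$0. The stdlib-only sketch below imitates that behavior; ToyTableEnv and ToyTable are hypothetical stand-ins, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a table environment's catalog (name -> table).
class ToyTableEnv {
    final Map<String, ToyTable> registered = new HashMap<>();

    void registerTable(String name, ToyTable table) {
        registered.put(name, table);
    }
}

// Hypothetical stand-in for an anonymous Table object such as tableA.
class ToyTable {
    // Shared across instances, so each anonymous table gets a distinct suffix.
    private static final AtomicInteger UNIQUE_ID = new AtomicInteger(0);

    private final ToyTableEnv env;
    private String tableName; // assigned lazily on the first toString() call

    ToyTable(ToyTableEnv env) {
        this.env = env;
    }

    // "SELECT * FROM " + table invokes toString(): the first call generates
    // a name and registers the table under it; later calls reuse that name.
    @Override
    public String toString() {
        if (tableName == null) {
            tableName = "UnnamedTable$" + UNIQUE_ID.getAndIncrement();
            env.registerTable(tableName, this);
        }
        return tableName;
    }
}
```

Calling toString() a second time returns the cached name without registering the table again.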

SQL -> SqlNode

SqlNode -> RelNode -> Operation

QueryOperation

QueryOperationVisitor

The visitor interface for QueryOperation trees.
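Each QueryOperation accepts a QueryOperationVisitor<T>, so a pass over the whole tree (for example, converting operations into Calcite RelNodes, which the Blink planner does with a visitor-based converter) lives in one visitor class instead of being spread across the node classes. A minimal sketch of that double-dispatch shape, using hypothetical ToyOperation/ToyOperationVisitor classes; the real interface declares overloaded visit methods for the concrete operation classes, while this sketch uses distinct method names for readability.

```java
// Hypothetical, heavily simplified analogue of a QueryOperation tree.
interface ToyOperation {
    <T> T accept(ToyOperationVisitor<T> visitor);
}

// One visit method per concrete operation type; accept() picks the right one.
interface ToyOperationVisitor<T> {
    T visitScan(ScanOp scan);

    T visitFilter(FilterOp filter);
}

class ScanOp implements ToyOperation {
    final String table;

    ScanOp(String table) {
        this.table = table;
    }

    @Override
    public <T> T accept(ToyOperationVisitor<T> visitor) {
        return visitor.visitScan(this);
    }
}

class FilterOp implements ToyOperation {
    final String condition;
    final ToyOperation input;

    FilterOp(String condition, ToyOperation input) {
        this.condition = condition;
        this.input = input;
    }

    @Override
    public <T> T accept(ToyOperationVisitor<T> visitor) {
        return visitor.visitFilter(this);
    }
}

// A visitor that renders the tree in the indented style of a RelNode plan dump.
class PlanPrinter implements ToyOperationVisitor<String> {
    private final int depth;

    PlanPrinter(int depth) {
        this.depth = depth;
    }

    private String indent() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < depth; i++) {
            sb.append("  ");
        }
        return sb.toString();
    }

    @Override
    public String visitScan(ScanOp scan) {
        return indent() + "LogicalTableScan(table=[" + scan.table + "])\n";
    }

    @Override
    public String visitFilter(FilterOp filter) {
        // Render this node, then recurse into the input one level deeper.
        return indent() + "LogicalFilter(condition=[" + filter.condition + "])\n"
                + filter.input.accept(new PlanPrinter(depth + 1));
    }
}
```

Adding another pass (say, a converter to a different plan representation) means writing one more visitor class; the operation classes stay untouched.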

Initialization

SqlToOperationConverter

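Converting a Table to a DataStream

// Convert the insert-only result Table into a DataStream<Order> and print each record
tEnv.toAppendStream(result, Order.class).print();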
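toAppendStream only accepts an insert-only result; a query whose plan produces updates (for example, an unbounded GROUP BY aggregation) has to be consumed with toRetractStream instead. The example query, a UNION ALL of two filtered scans, never updates rows it has already emitted, so it qualifies. A simplified, hypothetical sketch of such a check over a plan tree; the node kinds and the assumption that only GroupAggregate emits updates are made up for illustration and are not the planner's actual logic.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plan node: only its kind and inputs matter for this check.
class ChangeNode {
    final String kind; // e.g. "Scan", "Calc", "Union", "GroupAggregate"
    final List<ChangeNode> inputs;

    ChangeNode(String kind, ChangeNode... inputs) {
        this.kind = kind;
        this.inputs = Arrays.asList(inputs);
    }
}

class AppendOnlyCheck {
    // Assumption for illustration: an unbounded GroupAggregate is the only
    // operator that emits updates; every other node is insert-only as long
    // as all of its inputs are.
    static boolean isInsertOnly(ChangeNode node) {
        if (node.kind.equals("GroupAggregate")) {
            return false;
        }
        for (ChangeNode input : node.inputs) {
            if (!isInsertOnly(input)) {
                return false;
            }
        }
        return true;
    }
}
```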

Converting a ModifyOperation to a RelNode

Converts the relational tree of a ModifyOperation into a Calcite relational expression (RelNode).

doOptimize

Call chain

Flink optimization

FlinkOptimizeProgram in detail

FlinkChainedProgram
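The stages listed below run in a fixed order, and each one receives the plan produced by the previous stage. A minimal generic sketch of that chaining, loosely modeled on FlinkChainedProgram's addLast/optimize pair; ToyChainedProgram is hypothetical and uses UnaryOperator in place of a real FlinkOptimizeProgram.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical chained optimizer: named programs run in insertion order,
// each one receiving the plan returned by the previous program.
class ToyChainedProgram<P> {
    // LinkedHashMap keeps insertion order, which is the execution order.
    private final Map<String, UnaryOperator<P>> programs = new LinkedHashMap<>();

    ToyChainedProgram<P> addLast(String name, UnaryOperator<P> program) {
        if (programs.containsKey(name)) {
            throw new IllegalArgumentException("duplicate program name: " + name);
        }
        programs.put(name, program);
        return this;
    }

    List<String> programNames() {
        return new ArrayList<>(programs.keySet());
    }

    P optimize(P root) {
        P current = root;
        for (UnaryOperator<P> program : programs.values()) {
            current = program.apply(current);
        }
        return current;
    }
}
```

Because the map preserves insertion order, the sequence of addLast calls alone determines the order in which the stages rewrite the plan.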

subquery_rewrite

temporal_join_rewrite

decorrelate

time_indicator

FlinkRelTimeIndicatorProgram

default_rewrite

predicate_pushdown (predicate pushdown)

  • How to decide whether a predicate can be pushed down
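One common test, and the idea behind Calcite's FilterJoinRule, is to split the filter into conjuncts and check which input each conjunct references: a conjunct that only touches the left input's fields can move below the join onto the left side, one that only touches the right input's fields can move onto the right side, and a conjunct that mixes both stays above the join (or becomes part of the join condition). The hypothetical sketch below does this classification for an inner join only; outer joins restrict pushdown into the null-generating side, which it ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical conjunct of a filter sitting on top of an inner join:
// its text plus the indices of the join-output fields it references.
class Conjunct {
    final String text;
    final Set<Integer> fields;

    Conjunct(String text, Set<Integer> fields) {
        this.text = text;
        this.fields = fields;
    }
}

// Sorts conjuncts by where they could go. The join's output row is the left
// input's fields [0, leftFieldCount) followed by the right input's fields.
class PushdownClassifier {
    final List<String> toLeft = new ArrayList<>();
    final List<String> toRight = new ArrayList<>();
    final List<String> stayAbove = new ArrayList<>();

    PushdownClassifier(List<Conjunct> conjuncts, int leftFieldCount) {
        for (Conjunct c : conjuncts) {
            boolean onlyLeft = true;
            boolean onlyRight = true;
            for (int field : c.fields) {
                if (field < leftFieldCount) {
                    onlyRight = false;
                } else {
                    onlyLeft = false;
                }
            }
            if (c.fields.isEmpty()) {
                stayAbove.add(c.text); // constant predicate: left alone in this sketch
            } else if (onlyLeft) {
                toLeft.add(c.text);
            } else if (onlyRight) {
                toRight.add(c.text);
            } else {
                stayAbove.add(c.text); // references both sides
            }
        }
    }
}
```

A real rule also has to shift right-side field indices down by leftFieldCount when it moves a conjunct onto the right input; the sketch only classifies.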

logical

logical_rewrite

physical

physical_rewrite

Rules (optimization rules)
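A rule matches a pattern in the plan and replaces it with an equivalent, cheaper one. In the logical and physical plans for the example query, every LogicalProject sitting on a LogicalFilter ends up as a single StreamExecCalc. As far as I know, Flink gets there through several Calc-related rules (converting Filter and Project to Calc, merging adjacent Calcs) followed by physical conversion; the hypothetical sketch below collapses that into one hand-written bottom-up rewrite, so RelSketchNode and CalcFusionRule are illustrations, not planner classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plan node: a kind ("Scan", "Filter", "Project", "Calc", "Union"),
// a payload (table name, condition or projection list) and its inputs.
class RelSketchNode {
    final String kind;
    final String payload;
    final List<RelSketchNode> inputs;

    RelSketchNode(String kind, String payload, List<RelSketchNode> inputs) {
        this.kind = kind;
        this.payload = payload;
        this.inputs = inputs;
    }

    // Compact single-line rendering: Kind(payload)[input1, input2, ...]
    String explain() {
        StringBuilder sb = new StringBuilder(kind).append('(').append(payload).append(')');
        if (!inputs.isEmpty()) {
            sb.append('[');
            for (int i = 0; i < inputs.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(inputs.get(i).explain());
            }
            sb.append(']');
        }
        return sb.toString();
    }
}

class CalcFusionRule {
    // Rewrites bottom-up; wherever a Project sits directly on a Filter,
    // both are replaced by one Calc carrying the projection and the condition.
    static RelSketchNode apply(RelSketchNode node) {
        List<RelSketchNode> newInputs = new ArrayList<>();
        for (RelSketchNode input : node.inputs) {
            newInputs.add(apply(input));
        }
        if (node.kind.equals("Project")
                && newInputs.size() == 1
                && newInputs.get(0).kind.equals("Filter")) {
            RelSketchNode filter = newInputs.get(0);
            String payload = "select=[" + node.payload + "], where=[" + filter.payload + "]";
            return new RelSketchNode("Calc", payload, filter.inputs);
        }
        return new RelSketchNode(node.kind, node.payload, newInputs);
    }
}
```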

Converting the FlinkPhysicalRel DAG to an ExecNode DAG

Call chain
Class structure
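In the Blink planner of roughly Flink 1.11, the physical nodes already implement ExecNode, so this step is largely a cast; before it, if I recall correctly, the planner runs a sub-plan reuse pass (SubplanReuser, governed by the table.optimizer.reuse-sub-plan-enabled option) so that identical subtrees collapse into one shared node and the tree becomes a DAG. A hypothetical, digest-based sketch of that reuse idea; PlanNode and SubplanDeduplicator are illustrations, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plan node whose digest is a canonical string of its subtree.
class PlanNode {
    final String label;
    final List<PlanNode> inputs;

    PlanNode(String label, List<PlanNode> inputs) {
        this.label = label;
        this.inputs = inputs;
    }

    String digest() {
        StringBuilder sb = new StringBuilder(label);
        for (PlanNode input : inputs) {
            sb.append('[').append(input.digest()).append(']');
        }
        return sb.toString();
    }
}

class SubplanDeduplicator {
    private final Map<String, PlanNode> byDigest = new HashMap<>();

    // Rebuilds the plan bottom-up. When a rebuilt subtree has a digest seen
    // before, the earlier instance is returned instead, so identical subtrees
    // become one shared node and the tree turns into a DAG.
    PlanNode reuse(PlanNode node) {
        List<PlanNode> newInputs = new ArrayList<>();
        for (PlanNode input : node.inputs) {
            newInputs.add(reuse(input));
        }
        PlanNode rebuilt = new PlanNode(node.label, newInputs);
        PlanNode existing = byDigest.get(rebuilt.digest());
        if (existing != null) {
            return existing;
        }
        byDigest.put(rebuilt.digest(), rebuilt);
        return rebuilt;
    }
}
```

In the example query the two branches scan different tables (UnnamedTable$0 and OrderB), so nothing would be shared; reuse only applies when branches are identical, for instance if both scanned OrderB.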

Converting ExecNodes to Transformations
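Translation walks the ExecNode DAG from the sinks down and turns each node into a DataStream Transformation. A node shared by several parents must yield a single Transformation, otherwise the shared input would be computed twice; if I recall the 1.11 code correctly, ExecNode.translateToPlan caches its result inside the node for this reason. The sketch models the same effect with an external identity cache; ToyExecNode, ToyTransformation and ToyTranslator are hypothetical.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical ExecNode-like node.
class ToyExecNode {
    final String name;
    final List<ToyExecNode> inputs;

    ToyExecNode(String name, List<ToyExecNode> inputs) {
        this.name = name;
        this.inputs = inputs;
    }
}

// Hypothetical stand-in for a DataStream Transformation.
class ToyTransformation {
    final String name;
    final List<ToyTransformation> inputs;

    ToyTransformation(String name, List<ToyTransformation> inputs) {
        this.name = name;
        this.inputs = inputs;
    }
}

class ToyTranslator {
    // Keyed by object identity: a node reachable from several parents is
    // translated exactly once, so the Transformation graph keeps the DAG shape.
    private final Map<ToyExecNode, ToyTransformation> cache = new IdentityHashMap<>();
    int translatedNodes = 0;

    ToyTransformation translate(ToyExecNode node) {
        ToyTransformation cached = cache.get(node);
        if (cached != null) {
            return cached;
        }
        List<ToyTransformation> inputs = new ArrayList<>();
        for (ToyExecNode input : node.inputs) {
            inputs.add(translate(input)); // inputs first, then this node
        }
        ToyTransformation result = new ToyTransformation(node.name, inputs);
        translatedNodes++;
        cache.put(node, result);
        return result;
    }
}
```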

Call chain

Logical plan of the example query before optimization:
LogicalLegacySink(name=[DataStreamTableSink], fields=[user, product, amount])
  LogicalUnion(all=[true])
    LogicalProject(user=[$0], product=[$1], amount=[$2])
      LogicalFilter(condition=[>($2, 2)])
        LogicalTableScan(table=[[default_catalog, default_database, UnnamedTable$0]])
    LogicalProject(user=[$0], product=[$1], amount=[$2])
      LogicalFilter(condition=[<($2, 2)])
        LogicalTableScan(table=[[default_catalog, default_database, OrderB]])

Physical plan after optimization:
StreamExecLegacySink(name=[DataStreamTableSink], fields=[user, product, amount])
  StreamExecUnion(all=[true], union=[user, product, amount])
    StreamExecCalc(select=[user, product, amount], where=[>(amount, 2)])
      StreamExecDataStreamScan(table=[[default_catalog, default_database, UnnamedTable$0]], fields=[user, product, amount])
    StreamExecCalc(select=[user, product, amount], where=[<(amount, 2)])
      StreamExecDataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])