在 NDH 商业化的过程中,我们发现许多客户会有一些的批处理任务在 Impala
上执行,但是总体的来说,虽然 Impala
可以正确的运行这些大数据量的批处理任务,但是由于架构和设计时考虑的场景限制,Impala
更加适合来运行数据量适中的查询,运行时间更长的查询和批处理任务更适合交给
Spark 来做。但是种种原因,Impala 和 Spark 在 SQL
语法以及支持的函数都不尽相同,这就造成了在将 Impala 的任务迁移到 Spark
时比较困难,业务迁移时需要对两者的语法和函数差异点非常了解,导致了迁移成本较高。在这样的需求场景下,我们就计划实现一个
SQL 翻译器将 Impala 里的 SQL 翻译到 Spark SQL 当中。
最终实现的 SQL 翻译器会先将 SQL 通过 Impala Parser 转换为 Impala 当中
SQL 的 AST 表达式 StatementBase,随后将所有类型的 StatementBase 都转换到
calcite 的 SqlNode 结构,并处理 Impala 和 Spark SQL
在语法和函数层面的不支持处,最终通过 calcite 框架转换到 Spark SQL。
解析 Impala SQL
基于 Calcite Parser 的尝试
Calcite 是 Apache
社区下的一款开源的动态数据管理框架,提供了很多强大的特性,比如 SQL
解析、查询优化、物化视图等功能,但是本文中我们只需要了解 Calcite SQL
解析和 SqlNode 相关的操作就行了。
在最开始的时候,计划直接使用使用 calcite 的 parser 来解析
SQL,但是实际使用下来发现 calcite 的 parser 在很多情况下与 impala 的
parser 解析结果会不一致,尤其是在解析 DDL 和 DML
的场景下,几乎是无法解析的。
以在批处理场景下比较常见的 INSERT OVERWRITE 语句为例
1 2 3 4 5 6 String sql = "INSERT OVERWRITE TABLE t select * from src where b > 5" ;SqlParser parser = SqlParser.create(sql, SqlParser.config() .withParserFactory(SqlDdlParserImpl.FACTORY) .withConformance(SqlConformanceEnum.BABEL) ); SqlNode sqlNode = parser.parseStmt();
calcite 的解析器就会抛出以下错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Exception in thread "main" org.apache.calcite.sql.parser.SqlParseException: Encountered "OVERWRITE" at line 1, column 8. Was expecting: "INTO" ... at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.convertException(SqlDdlParserImpl.java:415) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.normalizeException(SqlDdlParserImpl.java:161) at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:159) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:174) at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:199) at com.netease.Main.main(Main.java:19) Caused by: org.apache.calcite.sql.parser.ddl.ParseException: Encountered "OVERWRITE" at line 1, column 8. Was expecting: "INTO" ... at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.generateParseException(SqlDdlParserImpl.java:44056) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.jj_consume_token(SqlDdlParserImpl.java:43867) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlInsert(SqlDdlParserImpl.java:9540) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlStmt(SqlDdlParserImpl.java:4115) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlStmtEof(SqlDdlParserImpl.java:4143) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.parseSqlStmtEof(SqlDdlParserImpl.java:209) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:172) ... 2 more
可以看到 calcite 自带的 DDL parser 只支持在 INSERT 后面加 INTO
关键词,不支持 INSERT OVERWRITE 语句。虽然 calcite 也支持使用 JavaCC
作为语法解析器, freemarker 来作为模板,来扩展自定义的 SQL
语法,但是这样要想完美的支持 Impala SQL 的解析,相当于要将 Impala 基于
CUP 和 flex 的解析器实现转换为 calcite 的 JavaCC 和 freemarker
的语法,这就无疑需要比较大的工作量了,而且在实现过程中也难以保证与之前的行为完全一致,需要很大的调试工作量。
使用 Impala parser
所以之后就选择了在语法解析时引入 Impala 原生的 Parser 来进行 SQL
解析。使用 Impala parser 的步骤比较简单,在编译 Impala 后会将 jar
包安装到本地 maven 仓库中,之后在 pom.xml 中引入相应版本即可。
1 2 3 4 5 <dependency > <groupId > org.apache.impala</groupId > <artifactId > impala-frontend</artifactId > <version > ${impala.version}</version > </dependency >
注意由于在 impala 的 pom.xml
中需要很多的环境变量,在编译新项目时也需要设置相应的环境变量。
由于 Impala 有 C++ 和 Java 两部分组成,在解析 SQL 时我们希望只调用
Java 部分就够了,不用依赖 C++ 部分,不然就无法控制 C++
部分日志输出位置了。因此这里我们需要修改下 impala 部分的代码,去除掉在
SQL 解析时可能存在调用 C++ 部分的代码。
我们首先在 impala 代码里的 RuntimeEnv 当中定义一个变量
isTranslatorEnv_ ,来判断是在当前代码是在 SQL 翻译器环境当中还是 Impala
当中。
1 2 3 4 5 6 7 8 9 10 11 12 public class RuntimeEnv { private boolean isTestEnv_; public boolean isTranslatorEnv () { return isTranslatorEnv_; } public void setTranslatorEnv (boolean translatorEnv) { isTranslatorEnv_ = translatorEnv; } }
在原生的 SQL 解析逻辑当中调用 C++ 部分不多,只有两处,首先是 Function
类的 lookupSymbol 函数,如果在翻译器环境中,让他直接返回 null
就可以了。
1 2 3 4 5 6 7 8 public String lookupSymbol (String symbol, TSymbolType symbolType, Type retArgType, boolean hasVarArgs, Type... argTypes) throws AnalysisException { if (RuntimeEnv.INSTANCE.isTranslatorEnv()) { return null ; } }
另外一处在解析日期时,在 DateLiteral
构造函数中,加上判断如果在翻译器环境当中,就选择用 java
来重新实现日期解析,不再需要去调用 C++ 部分了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public DateLiteral (String strDate) throws AnalysisException { type_ = Type.DATE; if (RuntimeEnv.INSTANCE.isTranslatorEnv()) { LocalDate epochDay = LocalDate.ofEpochDay(0 ); LocalDate localDate = LocalDate.parse(strDate, DateTimeFormatter.ofPattern("yyyy-M-d" )); daysSinceEpoch_ = (int ) ChronoUnit.DAYS.between(localDate, epochDay); strDate_ = localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd" )); } else { TParseDateStringResult result; try { result = FeSupport.parseDateString(strDate); } catch (InternalException e) { throw new AnalysisException ("Error parsing date literal: " + e.getMessage(), e); } if (!result.valid || !result.isSetDays_since_epoch()) { throw new AnalysisException ("Invalid date literal: '" + strDate + "'" ); } daysSinceEpoch_ = result.getDays_since_epoch(); if (result.isSetCanonical_date_string()) { strDate_ = result.getCanonical_date_string(); } else { strDate_ = strDate; } } }
改完 Impala 代码重新编译后,我们需要先用如下代码来初始化 SqlScanner
之后就可以在代码里使用 Impala 的 Parser 了。
1 2 3 4 5 6 7 8 9 10 RuntimeEnv.INSTANCE.setTranslatorEnv(true ); try { Field field = FeSupport.class.getDeclaredField("loaded_" ); field.setAccessible(true ); field.set(null , true ); } catch (Exception e) { throw new RuntimeException (e); } BackendConfig.create(new TBackendGflags ().setUnlock_zorder_sort(true ), true );
使用 Parser.parse 方法,将需要解析的 impala sql
作为参数就可以得到这个 SQL
解析之后的详细信息了,在解析后就可以得到这个查询具体的
select、where、from 等等的参数。
转换到 Calcite SqlNode
Impala 解析完成后的数据结构是 StatementBase 对象,为了最终使用
Calcite 的toSqlString 方法来转换为 Spark SQL,这里还需要转换到 Calcite
里 SqlNode 的结构。StatementBase 和 SqlNode 这两个对象表达的都是 SQL
当中的 AST
结构,只是在两个系统之间的表达方式不同,基本都是可以相互转换的,例如
Impala 的 SelectStmt 类根据不同的参数可以在在 calcite 中可以用
SqlSelect、SqlWith 和 SqlOrderBy 来表示。
自定义 SQL 语法
当然 calcite 自带的 SqlNode 并不能完整的表示 Impala SQL
的所有可能性,例如之前提到的 Insert overwrite 语句,现在的 calcite
是不支持表示这样的 SQL 的,向 calcite 添加自定义的 SqlNode
比较简单,只需要继承 SqlNode,然后重写将 SqlNode 改写为 SQL 的 unparse
方法即可,同样以 Insert overwrite 语句为例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class SqlInsertOverwriteTable extends SqlDdl { private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator ("INSERT OVERWRITE TABLE" , SqlKind.OTHER_DDL); private final SqlNode target; private final SqlNode source; private final SqlNodeList partitionList; private final boolean ifNotExists; private final SqlNodeList columnList; @Override public void unparse (SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("INSERT OVERWRITE TABLE" ); getTarget().unparse(writer, leftPrec, rightPrec); writer.newlineAndIndent(); if (columnList != null && !columnList.isEmpty()) { SqlWriter.Frame frame = writer.startList("(" , ")" ); columnList.unparse(writer, leftPrec, rightPrec); writer.endList(frame); writer.newlineAndIndent(); } if (partitionList != null && !partitionList.isEmpty()) { writer.keyword("PARTITION" ); SqlWriter.Frame frame = writer.startList("(" , ")" ); partitionList.unparse(writer, leftPrec, rightPrec); writer.endList(frame); if (ifNotExists) { writer.keyword("IF NOT EXISTS" ); } writer.newlineAndIndent(); } getSource().unparse(writer, leftPrec, rightPrec); } }
这里的关键逻辑就是如何将 SqlNode 对应结构来重新转换为 SQL。
AST 改写
之后还需要转换不支持语法和函数的转换,当然并不是所有的 SQL
不兼容处都需要在这里处理了。例如 Impala 的字段别名支持使用 '',但是
Spark 的字段别名则必须使用 `` 这个不支持的地方。
1 2 3 4 select a as 'x' from fooselect a as `x` from foo
虽然写法不同,但是这两个 SQL 的 AST 是一致的,在 impala sql 转换到
AST 的过程当中就把这两者的差异给处理了,所以无需在这里处理这样的不兼容
SQL 了。
在这里需要转换的 SQL 指的是 AST 层面进行需要转换,举例就是 Spark 3.1
版本不支持在 insert 的来源部分有直接的 with 语句
1 insert into t1 with t as (select 1 ) select * from t;
需要改写成为将查询部分的直接 with 语句提取到 insert 外层。
1 with t as (select 1 ) insert into t1 select * from t;
这里我们就需要对 AST 部分进行如下的转换。
同理,不支持函数的转换也是 AST 层面的转换,例如 Spark
不支持中位数函数 appx_median,需要使用 approx_percentile
函数进行改写
1 2 3 select appx_median(distinct a) from tselect approx_percentile(distinct a, 0.5 ) from t
我们可以在 impala 的 StatementBase 转换到 calcite 的 SqlNode
结构之前通过对 StatementBase 进行翻译,也可以在之后对 SqlNode
进行翻译,本质上这两个结构是同构的,无论选择在之前还是之后都行。在实际使用时根据需要选择转换时机即可。
将 SqlNode 转换为 Spark SQL
将 Calcite 的 SqlNode 转换为 Spark SQL 可以直接使用 SqlNode 的
toSqlString 方法,该函数会调用 SqlNode 的 unparse 方法来将 SqlNode
转换到 SQL,该方法的参数会影响最终 SQL
的格式,例如是否需要换行,缩进几格等。在测试中发现 calcite 默认的 spark
dialect 实现在转换有 unicode
字符的字符串是有问题的,需要重写下对应的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 SparkSqlDialect dialect = new SparkSqlDialect(SparkSqlDialect.DEFAULT_CONTEXT.withIdentifierQuoteString("`")) { @Override public void quoteStringLiteralUnicode(StringBuilder buf, String val) { buf.append(literalQuoteString); buf.append(val.replace(literalEndQuoteString, literalEscapedQuote)); buf.append(literalEndQuoteString); } }; return sql.toSqlString(c - > c.withDialect(dialect) .withAlwaysUseParentheses(false ) .withSelectListItemsOnSeparateLines(false ) .withUpdateSetListNewline(false ) .withIndentation(2 )).getSql();
使用 Spark Parser 进行验证
最后在测试代码时引入了 Spark parser 来可以来快速验证转换后的 SQL
是可以被 Spark 正确解析的。
在 pom.xml 当中引入对应版本的 spark ql依赖
1 2 3 4 5 6 <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.12</artifactId > <version > ${spark.version}</version > <scope > test</scope > </dependency >
然后使用 spark 当中的 sql 解析器来解析转换后的 Sql。
1 2 3 4 SqlNode sqlNode = new ImpalaToSpark31SqlNodeConvertor ().parse(impalaSql);String sparkSql = new SparkNodeToSqlConvertor ().toSql(sqlNode);SparkSqlParser sparkSqlParser = new SparkSqlParser ();sparkSqlParser.parsePlan(sql);
验证的测试用例可以参考
impala 的单元测试 ,这样就可以初步快速验证我们改写的 SQL
是否正确了,当然要想完全验证改写是否正确需要将 SQL 提交到 Impala 和
spark 当中,验证结果集是否一致。
验证结果时会发现少数转换后的 SQL
仍然无法解析或者运行时出错,排查后发现是由于以下两个 Spark BUG
引起的。
[SPARK-42552]
Get ParseException when run sql: "SELECT 1 UNION SELECT 1;" - ASF
JIRA
[SPARK-28386]
Cannot resolve ORDER BY columns with GROUP BY and HAVING - ASF
JIRA