在 NDH 商业化的过程中,我们发现许多客户会有一些的批处理任务在 Impala 上执行,但是总体的来说,虽然 Impala 可以正确的运行这些大数据量的批处理任务,但是由于架构和设计时考虑的场景限制,Impala 更加适合来运行数据量适中的查询,运行时间更长的查询和批处理任务更适合交给 Spark 来做。但是种种原因,Impala 和 Spark 在 SQL 语法以及支持的函数都不尽相同,这就造成了在将 Impala 的任务迁移到 Spark 时比较困难,业务迁移时需要对两者的语法和函数差异点非常了解,导致了迁移成本较高。在这样的需求场景下,我们就计划实现一个 SQL 翻译器将 Impala 里的 SQL 翻译到 Spark SQL 当中。
最终实现的 SQL 翻译器会先将 SQL 通过 Impala Parser 转换为 Impala 当中 SQL 的 AST 表达式 StatementBase,随后将所有类型的 StatementBase 都转换到 calcite 的 SqlNode 结构,并处理 Impala 和 Spark SQL 在语法和函数层面的不支持处,最终通过 calcite 框架转换到 Spark SQL。
解析 Impala SQL 基于 Calcite Parser 的尝试 Calcite 是 Apache 社区下的一款开源的动态数据管理框架,提供了很多强大的特性,比如 SQL 解析、查询优化、物化视图等功能,但是本文中我们只需要了解 Calcite SQL 解析和 SqlNode 相关的操作就行了。
在最开始的时候,计划直接使用使用 calcite 的 parser 来解析 SQL,但是实际使用下来发现 calcite 的 parser 在很多情况下与 impala 的 parser 解析结果会不一致,尤其是在解析 DDL 和 DML 的场景下,几乎是无法解析的。
以在批处理场景下比较常见的 INSERT OVERWRITE 语句为例
1 2 3 4 5 6 String sql = "INSERT OVERWRITE TABLE t select * from src where b > 5" ;SqlParser parser = SqlParser.create(sql, SqlParser.config() .withParserFactory(SqlDdlParserImpl.FACTORY) .withConformance(SqlConformanceEnum.BABEL) ); SqlNode sqlNode = parser.parseStmt();
calcite 的解析器就会抛出以下错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Exception in thread "main" org.apache.calcite.sql.parser.SqlParseException: Encountered "OVERWRITE" at line 1, column 8. Was expecting: "INTO" ... at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.convertException(SqlDdlParserImpl.java:415) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.normalizeException(SqlDdlParserImpl.java:161) at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:159) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:174) at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:199) at com.netease.Main.main(Main.java:19) Caused by: org.apache.calcite.sql.parser.ddl.ParseException: Encountered "OVERWRITE" at line 1, column 8. Was expecting: "INTO" ... at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.generateParseException(SqlDdlParserImpl.java:44056) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.jj_consume_token(SqlDdlParserImpl.java:43867) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlInsert(SqlDdlParserImpl.java:9540) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlStmt(SqlDdlParserImpl.java:4115) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlStmtEof(SqlDdlParserImpl.java:4143) at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.parseSqlStmtEof(SqlDdlParserImpl.java:209) at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:172) ... 2 more
可以看到 calcite 自带的 DDL parser 只支持在 INSERT 后面加 INTO 关键词,不支持 INSERT OVERWRITE 语句。虽然 calcite 也支持使用 JavaCC 作为语法解析器, freemarker 来作为模板,来扩展自定义的 SQL 语法,但是这样要想完美的支持 Impala SQL 的解析,相当于要将 Impala 基于 CUP 和 flex 的解析器实现转换为 calcite 的 JavaCC 和 freemarker 的语法,这就无疑需要比较大的工作量了,而且在实现过程中也难以保证与之前的行为完全一致,需要很大的调试工作量。
使用 Impala parser 所以之后就选择了在语法解析时引入 Impala 原生的 Parser 来进行 SQL 解析。使用 Impala parser 的步骤比较简单,在编译 Impala 后会将 jar 包安装到本地 maven 仓库中,之后在 pom.xml 中引入相应版本即可。
1 2 3 4 5 <dependency > <groupId > org.apache.impala</groupId > <artifactId > impala-frontend</artifactId > <version > ${impala.version}</version > </dependency >
注意由于在 impala 的 pom.xml 中需要很多的环境变量,在编译新项目时也需要设置相应的环境变量。
由于 Impala 有 C++ 和 Java 两部分组成,在解析 SQL 时我们希望只调用 Java 部分就够了,不用依赖 C++ 部分,不然就无法控制 C++ 部分日志输出位置了。因此这里我们需要修改下 impala 部分的代码,去除掉在 SQL 解析时可能存在调用 C++ 部分的代码。
我们首先在 impala 代码里的 RuntimeEnv 当中定义一个变量 isTranslatorEnv_ ,来判断是在当前代码是在 SQL 翻译器环境当中还是 Impala 当中。
1 2 3 4 5 6 7 8 9 10 11 12 public class RuntimeEnv { private boolean isTestEnv_; public boolean isTranslatorEnv () { return isTranslatorEnv_; } public void setTranslatorEnv (boolean translatorEnv) { isTranslatorEnv_ = translatorEnv; } }
在原生的 SQL 解析逻辑当中调用 C++ 部分不多,只有两处,首先是 Function 类的 lookupSymbol 函数,如果在翻译器环境中,让他直接返回 null 就可以了。
1 2 3 4 5 6 7 8 public String lookupSymbol (String symbol, TSymbolType symbolType, Type retArgType, boolean hasVarArgs, Type... argTypes) throws AnalysisException { if (RuntimeEnv.INSTANCE.isTranslatorEnv()) { return null ; } }
另外一处在解析日期时,在 DateLiteral 构造函数中,加上判断如果在翻译器环境当中,就选择用 java 来重新实现日期解析,不再需要去调用 C++ 部分了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public DateLiteral (String strDate) throws AnalysisException { type_ = Type.DATE; if (RuntimeEnv.INSTANCE.isTranslatorEnv()) { LocalDate epochDay = LocalDate.ofEpochDay(0 ); LocalDate localDate = LocalDate.parse(strDate, DateTimeFormatter.ofPattern("yyyy-M-d" )); daysSinceEpoch_ = (int ) ChronoUnit.DAYS.between(localDate, epochDay); strDate_ = localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd" )); } else { TParseDateStringResult result; try { result = FeSupport.parseDateString(strDate); } catch (InternalException e) { throw new AnalysisException ("Error parsing date literal: " + e.getMessage(), e); } if (!result.valid || !result.isSetDays_since_epoch()) { throw new AnalysisException ("Invalid date literal: '" + strDate + "'" ); } daysSinceEpoch_ = result.getDays_since_epoch(); if (result.isSetCanonical_date_string()) { strDate_ = result.getCanonical_date_string(); } else { strDate_ = strDate; } } }
改完 Impala 代码重新编译后,我们需要先用如下代码来初始化 SqlScanner 之后就可以在代码里使用 Impala 的 Parser 了。
1 2 3 4 5 6 7 8 9 10 RuntimeEnv.INSTANCE.setTranslatorEnv(true ); try { Field field = FeSupport.class.getDeclaredField("loaded_" ); field.setAccessible(true ); field.set(null , true ); } catch (Exception e) { throw new RuntimeException (e); } BackendConfig.create(new TBackendGflags ().setUnlock_zorder_sort(true ), true );
使用 Parser.parse 方法,将需要解析的 impala sql 作为参数就可以得到这个 SQL 解析之后的详细信息了,在解析后就可以得到这个查询具体的 select、where、from 等等的参数。
转换到 Calcite SqlNode Impala 解析完成后的数据结构是 StatementBase 对象,为了最终使用 Calcite 的toSqlString 方法来转换为 Spark SQL,这里还需要转换到 Calcite 里 SqlNode 的结构。StatementBase 和 SqlNode 这两个对象表达的都是 SQL 当中的 AST 结构,只是在两个系统之间的表达方式不同,基本都是可以相互转换的,例如 Impala 的 SelectStmt 类根据不同的参数可以在在 calcite 中可以用 SqlSelect、SqlWith 和 SqlOrderBy 来表示。
自定义 SQL 语法 当然 calcite 自带的 SqlNode 并不能完整的表示 Impala SQL 的所有可能性,例如之前提到的 Insert overwrite 语句,现在的 calcite 是不支持表示这样的 SQL 的,向 calcite 添加自定义的 SqlNode 比较简单,只需要继承 SqlNode,然后重写将 SqlNode 改写为 SQL 的 unparse 方法即可,同样以 Insert overwrite 语句为例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class SqlInsertOverwriteTable extends SqlDdl { private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator ("INSERT OVERWRITE TABLE" , SqlKind.OTHER_DDL); private final SqlNode target; private final SqlNode source; private final SqlNodeList partitionList; private final boolean ifNotExists; private final SqlNodeList columnList; @Override public void unparse (SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("INSERT OVERWRITE TABLE" ); getTarget().unparse(writer, leftPrec, rightPrec); writer.newlineAndIndent(); if (columnList != null && !columnList.isEmpty()) { SqlWriter.Frame frame = writer.startList("(" , ")" ); columnList.unparse(writer, leftPrec, rightPrec); writer.endList(frame); writer.newlineAndIndent(); } if (partitionList != null && !partitionList.isEmpty()) { writer.keyword("PARTITION" ); SqlWriter.Frame frame = writer.startList("(" , ")" ); partitionList.unparse(writer, leftPrec, rightPrec); writer.endList(frame); if (ifNotExists) { writer.keyword("IF NOT EXISTS" ); } writer.newlineAndIndent(); } getSource().unparse(writer, leftPrec, rightPrec); } }
这里的关键逻辑就是如何将 SqlNode 对应结构来重新转换为 SQL。
AST 改写 之后还需要转换不支持语法和函数的转换,当然并不是所有的 SQL 不兼容处都需要在这里处理了。例如 Impala 的字段别名支持使用 ‘’,但是 Spark 的字段别名则必须使用 `` 这个不支持的地方。
1 2 3 4 select a as 'x' from fooselect a as `x` from foo
虽然写法不同,但是这两个 SQL 的 AST 是一致的,在 impala sql 转换到 AST 的过程当中就把这两者的差异给处理了,所以无需在这里处理这样的不兼容 SQL 了。
在这里需要转换的 SQL 指的是 AST 层面进行需要转换,举例就是 Spark 3.1 版本不支持在 insert 的来源部分有直接的 with 语句
1 insert into t1 with t as (select 1 ) select * from t;
需要改写成为将查询部分的直接 with 语句提取到 insert 外层。
1 with t as (select 1 ) insert into t1 select * from t;
这里我们就需要对 AST 部分进行如下的转换。
同理,不支持函数的转换也是 AST 层面的转换,例如 Spark 不支持中位数函数 appx_median,需要使用 approx_percentile 函数进行改写
1 2 3 select appx_median(distinct a) from tselect approx_percentile(distinct a, 0.5 ) from t
我们可以在 impala 的 StatementBase 转换到 calcite 的 SqlNode 结构之前通过对 StatementBase 进行翻译,也可以在之后对 SqlNode 进行翻译,本质上这两个结构是同构的,无论选择在之前还是之后都行。在实际使用时根据需要选择转换时机即可。
将 SqlNode 转换为 Spark SQL 将 Calcite 的 SqlNode 转换为 Spark SQL 可以直接使用 SqlNode 的 toSqlString 方法,该函数会调用 SqlNode 的 unparse 方法来将 SqlNode 转换到 SQL,该方法的参数会影响最终 SQL 的格式,例如是否需要换行,缩进几格等。在测试中发现 calcite 默认的 spark dialect 实现在转换有 unicode 字符的字符串是有问题的,需要重写下对应的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 SparkSqlDialect dialect = new SparkSqlDialect(SparkSqlDialect.DEFAULT_CONTEXT.withIdentifierQuoteString("`")) { @Override public void quoteStringLiteralUnicode(StringBuilder buf, String val) { buf.append(literalQuoteString); buf.append(val.replace(literalEndQuoteString, literalEscapedQuote)); buf.append(literalEndQuoteString); } }; return sql.toSqlString(c - > c.withDialect(dialect) .withAlwaysUseParentheses(false ) .withSelectListItemsOnSeparateLines(false ) .withUpdateSetListNewline(false ) .withIndentation(2 )).getSql();
使用 Spark Parser 进行验证 最后在测试代码时引入了 Spark parser 来可以来快速验证转换后的 SQL 是可以被 Spark 正确解析的。
在 pom.xml 当中引入对应版本的 spark ql依赖
1 2 3 4 5 6 <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.12</artifactId > <version > ${spark.version}</version > <scope > test</scope > </dependency >
然后使用 spark 当中的 sql 解析器来解析转换后的 Sql。
1 2 3 4 SqlNode sqlNode = new ImpalaToSpark31SqlNodeConvertor ().parse(impalaSql);String sparkSql = new SparkNodeToSqlConvertor ().toSql(sqlNode);SparkSqlParser sparkSqlParser = new SparkSqlParser ();sparkSqlParser.parsePlan(sql);
验证的测试用例可以参考 impala 的单元测试 ,这样就可以初步快速验证我们改写的 SQL 是否正确了,当然要想完全验证改写是否正确需要将 SQL 提交到 Impala 和 spark 当中,验证结果集是否一致。
验证结果时会发现少数转换后的 SQL 仍然无法解析或者运行时出错,排查后发现是由于以下两个 Spark BUG 引起的。
[SPARK-42552] Get ParseException when run sql: "SELECT 1 UNION SELECT 1;" - ASF JIRA
[SPARK-28386] Cannot resolve ORDER BY columns with GROUP BY and HAVING - ASF JIRA