0%

Impala SQL 转换到 Spark SQL 实践

在 NDH 商业化的过程中,我们发现许多客户会有一些的批处理任务在 Impala 上执行,但是总体的来说,虽然 Impala 可以正确的运行这些大数据量的批处理任务,但是由于架构和设计时考虑的场景限制,Impala 更加适合来运行数据量适中的查询,运行时间更长的查询和批处理任务更适合交给 Spark 来做。但是种种原因,Impala 和 Spark 在 SQL 语法以及支持的函数都不尽相同,这就造成了在将 Impala 的任务迁移到 Spark 时比较困难,业务迁移时需要对两者的语法和函数差异点非常了解,导致了迁移成本较高。在这样的需求场景下,我们就计划实现一个 SQL 翻译器将 Impala 里的 SQL 翻译到 Spark SQL 当中。

最终实现的 SQL 翻译器会先将 SQL 通过 Impala Parser 转换为 Impala 当中 SQL 的 AST 表达式 StatementBase,随后将所有类型的 StatementBase 都转换到 calcite 的 SqlNode 结构,并处理 Impala 和 Spark SQL 在语法和函数层面的不支持处,最终通过 calcite 框架转换到 Spark SQL。

解析 Impala SQL

基于 Calcite Parser 的尝试

Calcite 是 Apache 社区下的一款开源的动态数据管理框架,提供了很多强大的特性,比如 SQL 解析、查询优化、物化视图等功能,但是本文中我们只需要了解 Calcite SQL 解析和 SqlNode 相关的操作就行了。

在最开始的时候,计划直接使用使用 calcite 的 parser 来解析 SQL,但是实际使用下来发现 calcite 的 parser 在很多情况下与 impala 的 parser 解析结果会不一致,尤其是在解析 DDL 和 DML 的场景下,几乎是无法解析的。

以在批处理场景下比较常见的 INSERT OVERWRITE 语句为例

1
2
3
4
5
6
String sql = "INSERT OVERWRITE TABLE t select * from src where b > 5";
SqlParser parser = SqlParser.create(sql, SqlParser.config()
.withParserFactory(SqlDdlParserImpl.FACTORY)
.withConformance(SqlConformanceEnum.BABEL)
);
SqlNode sqlNode = parser.parseStmt();

calcite 的解析器就会抛出以下错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Exception in thread "main" org.apache.calcite.sql.parser.SqlParseException: Encountered "OVERWRITE" at line 1, column 8.
Was expecting:
"INTO" ...

at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.convertException(SqlDdlParserImpl.java:415)
at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.normalizeException(SqlDdlParserImpl.java:161)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:159)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:174)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:199)
at com.netease.Main.main(Main.java:19)
Caused by: org.apache.calcite.sql.parser.ddl.ParseException: Encountered "OVERWRITE" at line 1, column 8.
Was expecting:
"INTO" ...

at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.generateParseException(SqlDdlParserImpl.java:44056)
at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.jj_consume_token(SqlDdlParserImpl.java:43867)
at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlInsert(SqlDdlParserImpl.java:9540)
at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlStmt(SqlDdlParserImpl.java:4115)
at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.SqlStmtEof(SqlDdlParserImpl.java:4143)
at org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl.parseSqlStmtEof(SqlDdlParserImpl.java:209)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:172)
... 2 more

可以看到 calcite 自带的 DDL parser 只支持在 INSERT 后面加 INTO 关键词,不支持 INSERT OVERWRITE 语句。虽然 calcite 也支持使用 JavaCC 作为语法解析器, freemarker 来作为模板,来扩展自定义的 SQL 语法,但是这样要想完美的支持 Impala SQL 的解析,相当于要将 Impala 基于 CUP 和 flex 的解析器实现转换为 calcite 的 JavaCC 和 freemarker 的语法,这就无疑需要比较大的工作量了,而且在实现过程中也难以保证与之前的行为完全一致,需要很大的调试工作量。

使用 Impala parser

所以之后就选择了在语法解析时引入 Impala 原生的 Parser 来进行 SQL 解析。使用 Impala parser 的步骤比较简单,在编译 Impala 后会将 jar 包安装到本地 maven 仓库中,之后在 pom.xml 中引入相应版本即可。

1
2
3
4
5
<dependency>
<groupId>org.apache.impala</groupId>
<artifactId>impala-frontend</artifactId>
<version>${impala.version}</version>
</dependency>

注意由于在 impala 的 pom.xml 中需要很多的环境变量,在编译新项目时也需要设置相应的环境变量。

由于 Impala 有 C++ 和 Java 两部分组成,在解析 SQL 时我们希望只调用 Java 部分就够了,不用依赖 C++ 部分,不然就无法控制 C++ 部分日志输出位置了。因此这里我们需要修改下 impala 部分的代码,去除掉在 SQL 解析时可能存在调用 C++ 部分的代码。

我们首先在 impala 代码里的 RuntimeEnv 当中定义一个变量 isTranslatorEnv_ ,来判断是在当前代码是在 SQL 翻译器环境当中还是 Impala 当中。

1
2
3
4
5
6
7
8
9
10
11
12
public class RuntimeEnv {
//...
private boolean isTestEnv_;
//...
public boolean isTranslatorEnv() {
return isTranslatorEnv_;
}

public void setTranslatorEnv(boolean translatorEnv) {
isTranslatorEnv_ = translatorEnv;
}
}

在原生的 SQL 解析逻辑当中调用 C++ 部分不多,只有两处,首先是 Function 类的 lookupSymbol 函数,如果在翻译器环境中,让他直接返回 null 就可以了。

1
2
3
4
5
6
7
8
public String lookupSymbol(String symbol, TSymbolType symbolType, Type retArgType,
boolean hasVarArgs, Type... argTypes) throws AnalysisException {
if (RuntimeEnv.INSTANCE.isTranslatorEnv()) {
return null;
}
// 原先逻辑
// ...
}

另外一处在解析日期时,在 DateLiteral 构造函数中,加上判断如果在翻译器环境当中,就选择用 java 来重新实现日期解析,不再需要去调用 C++ 部分了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public DateLiteral(String strDate) throws AnalysisException {
type_ = Type.DATE;

// Parse date string.
// Accept different variations. E.g.: '2011-01-01', '2011-1-01', '2011-01-1',
// '2011-1-1'.
if (RuntimeEnv.INSTANCE.isTranslatorEnv()) {
LocalDate epochDay = LocalDate.ofEpochDay(0);
LocalDate localDate = LocalDate.parse(strDate, DateTimeFormatter.ofPattern("yyyy-M-d"));
daysSinceEpoch_ = (int) ChronoUnit.DAYS.between(localDate, epochDay);
strDate_ = localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
} else {
TParseDateStringResult result;
try {
result = FeSupport.parseDateString(strDate);
} catch (InternalException e) {
throw new AnalysisException("Error parsing date literal: " + e.getMessage(), e);
}
if (!result.valid || !result.isSetDays_since_epoch()) {
throw new AnalysisException("Invalid date literal: '" + strDate + "'");
}
daysSinceEpoch_ = result.getDays_since_epoch();
if (result.isSetCanonical_date_string()) {
strDate_ = result.getCanonical_date_string();
} else {
strDate_ = strDate;
}
}
}

改完 Impala 代码重新编译后,我们需要先用如下代码来初始化 SqlScanner 之后就可以在代码里使用 Impala 的 Parser 了。

1
2
3
4
5
6
7
8
9
10
RuntimeEnv.INSTANCE.setTranslatorEnv(true);
try {
Field field = FeSupport.class.getDeclaredField("loaded_");
field.setAccessible(true);
field.set(null, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
BackendConfig.create(new TBackendGflags().setUnlock_zorder_sort(true), true);

使用 Parser.parse 方法,将需要解析的 impala sql 作为参数就可以得到这个 SQL 解析之后的详细信息了,在解析后就可以得到这个查询具体的 select、where、from 等等的参数。

转换到 Calcite SqlNode

Impala 解析完成后的数据结构是 StatementBase 对象,为了最终使用 Calcite 的toSqlString 方法来转换为 Spark SQL,这里还需要转换到 Calcite 里 SqlNode 的结构。StatementBase 和 SqlNode 这两个对象表达的都是 SQL 当中的 AST 结构,只是在两个系统之间的表达方式不同,基本都是可以相互转换的,例如 Impala 的 SelectStmt 类根据不同的参数可以在在 calcite 中可以用 SqlSelect、SqlWith 和 SqlOrderBy 来表示。

自定义 SQL 语法

当然 calcite 自带的 SqlNode 并不能完整的表示 Impala SQL 的所有可能性,例如之前提到的 Insert overwrite 语句,现在的 calcite 是不支持表示这样的 SQL 的,向 calcite 添加自定义的 SqlNode 比较简单,只需要继承 SqlNode,然后重写将 SqlNode 改写为 SQL 的 unparse 方法即可,同样以 Insert overwrite 语句为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SqlInsertOverwriteTable extends SqlDdl {
private static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("INSERT OVERWRITE TABLE", SqlKind.OTHER_DDL);
private final SqlNode target;
private final SqlNode source;
private final SqlNodeList partitionList;
private final boolean ifNotExists;
private final SqlNodeList columnList;
// ...
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("INSERT OVERWRITE TABLE");
getTarget().unparse(writer, leftPrec, rightPrec);
writer.newlineAndIndent();

if (columnList != null && !columnList.isEmpty()) {
SqlWriter.Frame frame = writer.startList("(", ")");
columnList.unparse(writer, leftPrec, rightPrec);
writer.endList(frame);
writer.newlineAndIndent();
}

if (partitionList != null && !partitionList.isEmpty()) {
writer.keyword("PARTITION");
SqlWriter.Frame frame = writer.startList("(", ")");
partitionList.unparse(writer, leftPrec, rightPrec);
writer.endList(frame);
if (ifNotExists) {
writer.keyword("IF NOT EXISTS");
}
writer.newlineAndIndent();
}

getSource().unparse(writer, leftPrec, rightPrec);
}
}

这里的关键逻辑就是如何将 SqlNode 对应结构来重新转换为 SQL。

AST 改写

之后还需要转换不支持语法和函数的转换,当然并不是所有的 SQL 不兼容处都需要在这里处理了。例如 Impala 的字段别名支持使用 ‘’,但是 Spark 的字段别名则必须使用 `` 这个不支持的地方。

1
2
3
4
-- spark 不支持
select a as 'x' from foo
-- spark 支持
select a as `x` from foo

虽然写法不同,但是这两个 SQL 的 AST 是一致的,在 impala sql 转换到 AST 的过程当中就把这两者的差异给处理了,所以无需在这里处理这样的不兼容 SQL 了。

在这里需要转换的 SQL 指的是 AST 层面进行需要转换,举例就是 Spark 3.1 版本不支持在 insert 的来源部分有直接的 with 语句

1
insert into t1 with t as (select 1) select * from t;

需要改写成为将查询部分的直接 with 语句提取到 insert 外层。

1
with t as (select 1) insert into t1 select * from t;

这里我们就需要对 AST 部分进行如下的转换。

同理,不支持函数的转换也是 AST 层面的转换,例如 Spark 不支持中位数函数 appx_median,需要使用 approx_percentile 函数进行改写

1
2
3
select appx_median(distinct a) from t
-- 改写为
select approx_percentile(distinct a, 0.5) from t

我们可以在 impala 的 StatementBase 转换到 calcite 的 SqlNode 结构之前通过对 StatementBase 进行翻译,也可以在之后对 SqlNode 进行翻译,本质上这两个结构是同构的,无论选择在之前还是之后都行。在实际使用时根据需要选择转换时机即可。

将 SqlNode 转换为 Spark SQL

将 Calcite 的 SqlNode 转换为 Spark SQL 可以直接使用 SqlNode 的 toSqlString 方法,该函数会调用 SqlNode 的 unparse 方法来将 SqlNode 转换到 SQL,该方法的参数会影响最终 SQL 的格式,例如是否需要换行,缩进几格等。在测试中发现 calcite 默认的 spark dialect 实现在转换有 unicode 字符的字符串是有问题的,需要重写下对应的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SparkSqlDialect dialect = new SparkSqlDialect(SparkSqlDialect.DEFAULT_CONTEXT.withIdentifierQuoteString("`")) {
@Override
public void quoteStringLiteralUnicode(StringBuilder buf, String val) {
buf.append(literalQuoteString);
buf.append(val.replace(literalEndQuoteString, literalEscapedQuote));
buf.append(literalEndQuoteString);
}
};
return sql.toSqlString(c -> c.withDialect(dialect)
.withAlwaysUseParentheses(false)
.withSelectListItemsOnSeparateLines(false)
.withUpdateSetListNewline(false)
.withIndentation(2)).getSql();

使用 Spark Parser 进行验证

最后在测试代码时引入了 Spark parser 来可以来快速验证转换后的 SQL 是可以被 Spark 正确解析的。

在 pom.xml 当中引入对应版本的 spark ql依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>

然后使用 spark 当中的 sql 解析器来解析转换后的 Sql。

1
2
3
4
SqlNode sqlNode = new ImpalaToSpark31SqlNodeConvertor().parse(impalaSql);
String sparkSql = new SparkNodeToSqlConvertor().toSql(sqlNode);
SparkSqlParser sparkSqlParser = new SparkSqlParser();
sparkSqlParser.parsePlan(sql);

验证的测试用例可以参考 impala 的单元测试,这样就可以初步快速验证我们改写的 SQL 是否正确了,当然要想完全验证改写是否正确需要将 SQL 提交到 Impala 和 spark 当中,验证结果集是否一致。

验证结果时会发现少数转换后的 SQL 仍然无法解析或者运行时出错,排查后发现是由于以下两个 Spark BUG 引起的。

[SPARK-42552] Get ParseException when run sql: "SELECT 1 UNION SELECT 1;" - ASF JIRA

[SPARK-28386] Cannot resolve ORDER BY columns with GROUP BY and HAVING - ASF JIRA