entry : pluginClassLoader.entrySet()) {
- classPaths.addAll(Arrays.asList(entry.getValue().getURLs()));
- }
- return classPaths;
- }
-}
diff --git a/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplier.java b/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplier.java
deleted file mode 100644
index 859aa75f4..000000000
--- a/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.classloader;
-
-/**
- * Represents a supplier of results.
- *
- * There is no requirement that a new or distinct result be returned each
- * time the supplier is invoked.
- *
- *
This is a functional interface
- * whose functional method is {@link #get()}.
- *
- * @param the type of results supplied by this supplier
- *
- * @since 1.8
- */
-@FunctionalInterface
-public interface ClassLoaderSupplier {
-
- /**
- * Gets a result.
- *
- * @return a result
- */
- T get(ClassLoader cl) throws Exception;
-}
diff --git a/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplierCallBack.java b/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplierCallBack.java
deleted file mode 100644
index 51d37ef5e..000000000
--- a/core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderSupplierCallBack.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.classloader;
-
-/**
- * company: www.dtstack.com
- * author: toutian
- * create: 2019/10/14
- */
-public class ClassLoaderSupplierCallBack {
-
- public static R callbackAndReset(ClassLoaderSupplier supplier, ClassLoader toSetClassLoader) throws Exception {
- ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(toSetClassLoader);
- try {
- return supplier.get(toSetClassLoader);
- } finally {
- Thread.currentThread().setContextClassLoader(oldClassLoader);
- }
- }
-
-
-}
diff --git a/core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java b/core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java
deleted file mode 100644
index 54ae66bbc..000000000
--- a/core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.dtstack.flink.sql.config;
-
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.parser.SqlParser.Config;
-
-public class CalciteConfig {
-
- public static Config MYSQL_LEX_CONFIG = SqlParser
- .configBuilder()
- .setLex(Lex.MYSQL)
- .build();
-
-
-
-}
diff --git a/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java b/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java
index 76f5996c3..6b6551f9a 100644
--- a/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java
+++ b/core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java
@@ -29,9 +29,7 @@
*/
public class ConfigConstrant {
- public static final String SQL_CHECKPOINT_INTERVAL_KEY = "sql.checkpoint.interval";
- // 兼容上层
- public static final String FLINK_CHECKPOINT_INTERVAL_KEY = "flink.checkpoint.interval";
+ public static final String FLINK_CHECKPOINT_INTERVAL_KEY = "sql.checkpoint.interval";
public static final String FLINK_CHECKPOINT_MODE_KEY = "sql.checkpoint.mode";
@@ -39,11 +37,7 @@ public class ConfigConstrant {
public static final String FLINK_MAXCONCURRENTCHECKPOINTS_KEY = "sql.max.concurrent.checkpoints";
- public static final String SQL_CHECKPOINT_CLEANUPMODE_KEY = "sql.checkpoint.cleanup.mode";
-
- public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "flink.checkpoint.cleanup.mode";
-
-
+ public static final String FLINK_CHECKPOINT_CLEANUPMODE_KEY = "sql.checkpoint.cleanup.mode";
public static final String FLINK_CHECKPOINT_DATAURI_KEY = "flinkCheckpointDataURI";
diff --git a/core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java b/core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java
deleted file mode 100644
index 6cb027ac3..000000000
--- a/core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.enums;
-
-/**
- *
- * CLASSPATH: plugin jar depends on each machine node.
- * SHIPFILE: plugin jar only depends on the client submitted by the task.
- *
- */
-public enum EPluginLoadMode {
-
- CLASSPATH(0),
- SHIPFILE(1);
-
- private int type;
-
- EPluginLoadMode(int type){
- this.type = type;
- }
-
- public int getType(){
- return this.type;
- }
-}
diff --git a/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java b/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java
index 07860b608..10e34a5e6 100644
--- a/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java
@@ -18,7 +18,7 @@
package com.dtstack.flink.sql.option;
-import com.google.common.collect.Lists;
+import avro.shaded.com.google.common.collect.Lists;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
diff --git a/core/src/main/java/com/dtstack/flink/sql/option/Options.java b/core/src/main/java/com/dtstack/flink/sql/option/Options.java
index a653aa42e..eef54a617 100644
--- a/core/src/main/java/com/dtstack/flink/sql/option/Options.java
+++ b/core/src/main/java/com/dtstack/flink/sql/option/Options.java
@@ -19,7 +19,6 @@
package com.dtstack.flink.sql.option;
import com.dtstack.flink.sql.enums.ClusterMode;
-import com.dtstack.flink.sql.enums.EPluginLoadMode;
/**
@@ -72,9 +71,6 @@ public class Options {
@OptionRequired(description = "yarn session configuration,such as yid")
private String yarnSessionConf = "{}";
- @OptionRequired(description = "plugin load mode, by classpath or shipfile")
- private String pluginLoadMode = EPluginLoadMode.CLASSPATH.name();
-
public String getMode() {
return mode;
}
@@ -186,12 +182,4 @@ public String getYarnSessionConf() {
public void setYarnSessionConf(String yarnSessionConf) {
this.yarnSessionConf = yarnSessionConf;
}
-
- public String getPluginLoadMode() {
- return pluginLoadMode;
- }
-
- public void setPluginLoadMode(String pluginLoadMode) {
- this.pluginLoadMode = pluginLoadMode;
- }
}
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
index 670d98a7e..793dd6baa 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateFuncParser.java
@@ -32,7 +32,7 @@
public class CreateFuncParser implements IParser {
- private static final String funcPatternStr = "(?i)\\s*create\\s+(scala|table|aggregate)\\s+function\\s+(\\S+)\\s+WITH\\s+(\\S+)";
+ private static final String funcPatternStr = "(?i)\\s*create\\s+(scala|table)\\s+function\\s+(\\S+)\\s+WITH\\s+(\\S+)";
private static final Pattern funcPattern = Pattern.compile(funcPatternStr);
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
index ae6e1f708..5e126e786 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java
@@ -21,7 +21,7 @@
package com.dtstack.flink.sql.parser;
import com.dtstack.flink.sql.util.DtStringUtil;
-import com.google.common.collect.Maps;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
index de7141eb5..db18986b7 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
@@ -25,7 +25,8 @@
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
-import com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
index a7c6db9eb..ff2bb9e4b 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
@@ -21,11 +21,17 @@
package com.dtstack.flink.sql.parser;
import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.*;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.commons.lang3.StringUtils;
-import com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import java.util.List;
@@ -113,10 +119,6 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
sqlParseResult.addSourceTable(identifierNode.toString());
}
break;
- case MATCH_RECOGNIZE:
- SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
- sqlParseResult.addSourceTable(node.getTableRef().toString());
- break;
case UNION:
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java
index a76c1b31a..e9fb68cfe 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java
@@ -25,8 +25,8 @@
import com.dtstack.flink.sql.table.TableInfoParser;
import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.commons.lang3.StringUtils;
-import com.google.common.collect.Lists;
-import com.google.common.base.Strings;
+import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
import java.util.List;
import java.util.Set;
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java b/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
index 1b64b7c68..754de0819 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/SqlTree.java
@@ -22,8 +22,8 @@
import com.dtstack.flink.sql.table.TableInfo;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Lists;
+import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java
index 37b23d046..bc716ddaa 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/FieldReplaceInfo.java
@@ -20,7 +20,7 @@
package com.dtstack.flink.sql.side;
-import com.google.common.collect.HashBasedTable;
+import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
/**
* Reason:
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
index 6fde02493..03dbde5a6 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
@@ -22,10 +22,9 @@
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlNode;
-import com.google.common.base.Strings;
+import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import java.io.Serializable;
-import java.util.Map;
/**
* Join信息
@@ -41,8 +40,6 @@ public class JoinInfo implements Serializable {
//左表是否是维表
private boolean leftIsSideTable;
- //左表是 转换后的中间表
- private boolean leftIsMidTable;
//右表是否是维表
private boolean rightIsSideTable;
@@ -66,8 +63,6 @@ public class JoinInfo implements Serializable {
private SqlNode selectNode;
private JoinType joinType;
- // 左边是中间转换表,做表映射关系,给替换属性名称使用
- private Map leftTabMapping;
public String getSideTableName(){
if(leftIsSideTable){
@@ -92,22 +87,6 @@ public String getNewTableName(){
return leftStr + "_" + rightTableName;
}
- public boolean isLeftIsMidTable() {
- return leftIsMidTable;
- }
-
- public void setLeftIsMidTable(boolean leftIsMidTable) {
- this.leftIsMidTable = leftIsMidTable;
- }
-
- public Map getLeftTabMapping() {
- return leftTabMapping;
- }
-
- public void setLeftTabMapping(Map leftTabMapping) {
- this.leftTabMapping = leftTabMapping;
- }
-
public String getNewTableAlias(){
return leftTableAlias + "_" + rightTableAlias;
}
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java b/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java
index c7a73e0d7..ba07e714a 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/JoinScope.java
@@ -23,8 +23,8 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java b/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java
index 74d303c24..df242a390 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java
@@ -27,7 +27,7 @@
import org.apache.calcite.sql.SqlSelect;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import com.google.common.collect.Lists;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
@@ -41,12 +41,8 @@
public class ParserJoinField {
-
/**
- * build row by field
- * @param sqlNode select node
- * @param scope join left and right table all info
- * @param getAll true,get all fields from two tables; false, extract useful field from select node
+ * Need to parse the fields of information and where selectlist
* @return
*/
public static List getRowTypeInfo(SqlNode sqlNode, JoinScope scope, boolean getAll){
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java
index df41e1663..97e5e555f 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java
@@ -27,8 +27,8 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.List;
diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
index c881d6344..b0ccc5feb 100644
--- a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
@@ -20,8 +20,7 @@
package com.dtstack.flink.sql.side;
-import com.dtstack.flink.sql.config.CalciteConfig;
-import com.dtstack.flink.sql.util.ParseUtils;
+import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
@@ -39,17 +38,9 @@
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Queues;
+
import java.util.Queue;
import java.util.Set;
@@ -63,20 +54,16 @@
*/
public class SideSQLParser {
- private static final Logger LOG = LoggerFactory.getLogger(SideSQLParser.class);
-
- private final char SPLIT = '_';
-
- private String tempSQL = "SELECT * FROM TMP";
public Queue