Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint;
import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;
import org.apache.iotdb.db.utils.cte.CteDataStore;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;

Expand Down Expand Up @@ -125,6 +126,8 @@ public enum ExplainType {

private boolean userQuery = false;

private Map<String, Hint> hintMap = new HashMap<>();

private Map<NodeRef<Table>, Query> cteQueries = new HashMap<>();

// Stores the EXPLAIN/EXPLAIN ANALYZE results for Common Table Expressions (CTEs)
Expand All @@ -143,7 +146,7 @@ public enum ExplainType {
private boolean innerTriggeredQuery = false;

// Tables in the subquery
private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new HashMap<>();
private final Map<NodeRef<Query>, Set<Identifier>> subQueryTables = new HashMap<>();

public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
Expand Down Expand Up @@ -539,12 +542,12 @@ public void setCteQueries(Map<NodeRef<Table>, Query> cteQueries) {
this.cteQueries = cteQueries;
}

public void addSubQueryTables(Query query, List<Identifier> tables) {
public void addSubQueryTables(Query query, Set<Identifier> tables) {
subQueryTables.put(NodeRef.of(query), tables);
}

public List<Identifier> getTables(Query query) {
return subQueryTables.getOrDefault(NodeRef.of(query), ImmutableList.of());
public Set<Identifier> getTables(Query query) {
return subQueryTables.getOrDefault(NodeRef.of(query), ImmutableSet.of());
}

public void addCteExplainResult(Table table, Pair<Integer, List<String>> cteExplainResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public StatementMemorySource visitExplain(

// Generate table model distributed plan
final TableDistributedPlanGenerator.PlanContext planContext =
new TableDistributedPlanGenerator.PlanContext();
new TableDistributedPlanGenerator.PlanContext(context.getAnalysis().getHintMap());
final PlanNode outputNodeWithExchange =
new TableDistributedPlanner(
context.getAnalysis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,9 @@ public List<String> visitTableScan(TableScanNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(node.toString());
boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString()));
if (node.getAlias() != null) {
boxValue.add(String.format("Alias: %s", node.getAlias().getValue()));
}
boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));

if (deviceTableScanNode != null) {
Expand Down Expand Up @@ -727,6 +730,9 @@ public List<String> visitAggregationTableScan(
List<String> boxValue = new ArrayList<>();
boxValue.add(node.toString());
boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString()));
if (node.getAlias() != null) {
boxValue.add(String.format("Alias: %s", node.getAlias().getValue()));
}
boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
int i = 0;
for (org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;

import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
Expand All @@ -32,8 +33,10 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -78,6 +81,15 @@ public void markAsGeneratedByPipe() {

public abstract void addChild(PlanNode child);

public Set<Identifier> getInputTables() {
Set<Identifier> tables = new HashSet<>();
for (PlanNode child : getChildren()) {
tables.addAll(child.getInputTables());
}
;
return tables;
}

/**
* If this plan node has to be serialized or deserialized, override this method. If this method is
* overridden, the serialization and deserialization methods must be implemented.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;

import com.google.common.collect.ArrayListMultimap;
Expand Down Expand Up @@ -217,7 +218,7 @@ public class Analysis implements IAnalysis {

private final Map<NodeRef<Relation>, QualifiedName> relationNames = new LinkedHashMap<>();

private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>();
private final Map<NodeRef<Relation>, Identifier> aliasedRelations = new LinkedHashMap<>();

private final Map<NodeRef<TableFunctionInvocation>, TableFunctionInvocationAnalysis>
tableFunctionAnalyses = new LinkedHashMap<>();
Expand Down Expand Up @@ -258,6 +259,9 @@ public class Analysis implements IAnalysis {

private boolean isQuery = false;

// Hint map
private Map<String, Hint> hintMap = new HashMap<>();

// SqlParser is needed during query planning phase for executing uncorrelated scalar subqueries
// in advance (predicate folding). The planner needs to parse and execute these subqueries
// independently to utilize predicate pushdown optimization.
Expand All @@ -268,6 +272,14 @@ public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> pa
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters, "parameters is null"));
}

public void setHintMap(Map<String, Hint> hintMap) {
this.hintMap = hintMap;
}

public Map<String, Hint> getHintMap() {
return hintMap;
}

public Map<NodeRef<Parameter>, Expression> getParameters() {
return parameters;
}
Expand Down Expand Up @@ -850,12 +862,20 @@ public QualifiedName getRelationName(final Relation relation) {
return relationNames.get(NodeRef.of(relation));
}

public void addAliased(final Relation relation) {
aliasedRelations.add(NodeRef.of(relation));
public Map<NodeRef<Relation>, QualifiedName> getRelationNames() {
return relationNames;
}

public void addAliased(final Relation relation, Identifier alias) {
aliasedRelations.put(NodeRef.of(relation), alias);
}

public Identifier getAliased(Relation relation) {
return aliasedRelations.get(NodeRef.of(relation));
}

public boolean isAliased(Relation relation) {
return aliasedRelations.contains(NodeRef.of(relation));
return aliasedRelations.containsKey(NodeRef.of(relation));
}

public void addTableSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@

import com.google.common.collect.ImmutableMap;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -57,7 +58,7 @@ public class Scope {
// Tables to access for the current relation. For CTE materialization and constant folding
// subqueries, non-materialized CTEs in tables must be identified, and their definitions
// attached to the subquery context.
private List<Identifier> tables;
private Set<Identifier> tables;

public static Scope create() {
return builder().build();
Expand All @@ -73,24 +74,24 @@ private Scope(
RelationId relationId,
RelationType relation,
Map<String, WithQuery> namedQueries,
List<Identifier> tables) {
Set<Identifier> tables) {
this.parent = requireNonNull(parent, "parent is null");
this.relationId = requireNonNull(relationId, "relationId is null");
this.queryBoundary = queryBoundary;
this.relation = requireNonNull(relation, "relation is null");
this.namedQueries = ImmutableMap.copyOf(requireNonNull(namedQueries, "namedQueries is null"));
this.tables = new ArrayList<>(requireNonNull(tables, "tables is null"));
this.tables = new HashSet<>(requireNonNull(tables, "tables is null"));
}

public void addTable(Table table) {
tables.add(new Identifier(table.getName().getSuffix()));
}

public void setTables(List<Identifier> tables) {
public void setTables(Set<Identifier> tables) {
this.tables = tables;
}

public List<Identifier> getTables() {
public Set<Identifier> getTables() {
return tables;
}

Expand Down Expand Up @@ -349,7 +350,7 @@ public static final class Builder {
private RelationId relationId = RelationId.anonymous();
private RelationType relationType = new RelationType();
private final Map<String, WithQuery> namedQueries = new HashMap<>();
private final List<Identifier> tables = new ArrayList<>();
private final Set<Identifier> tables = new HashSet<>();
private Optional<Scope> parent = Optional.empty();
private boolean queryBoundary;

Expand Down Expand Up @@ -388,7 +389,7 @@ public Builder withNamedQuery(String name, WithQuery withQuery) {
return this;
}

public Builder withTables(List<Identifier> tables) {
public Builder withTables(Set<Identifier> tables) {
this.tables.addAll(tables);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Offset;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.OrderBy;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ParameterizedHintItem;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property;
Expand All @@ -146,6 +147,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Select;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SelectHint;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SelectItem;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetOperation;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetProperties;
Expand All @@ -160,6 +162,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleGroupBy;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleHintItem;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SingleColumn;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SkipTo;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem;
Expand Down Expand Up @@ -193,6 +196,11 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
import org.apache.iotdb.db.queryengine.plan.relational.type.CompatibleResolver;
import org.apache.iotdb.db.queryengine.plan.relational.type.TypeManager;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.FollowerHint;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.HintFactory;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.LeaderHint;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.LeadingHint;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
Expand Down Expand Up @@ -360,6 +368,16 @@ private enum UpdateKind {
MERGE,
}

// Hint definition and processing methods
private static final Map<String, HintFactory> HINT_DEFINITIONS =
ImmutableMap.of(
"LEADER",
new HintFactory(LeaderHint.hintName, LeaderHint::new, true),
"FOLLOWER",
new HintFactory(FollowerHint.hintName, FollowerHint::new, true),
"LEADING",
new HintFactory(LeadingHint.hintName, LeadingHint::new, false));

/**
* Visitor context represents local query scope (if exists). The invariant is that the local query
* scopes hierarchy should always have outer query scope (if provided) as ancestor.
Expand Down Expand Up @@ -1238,6 +1256,8 @@ protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope>
orderByScope.orElseThrow(() -> new NoSuchElementException("No value present")));
}

// select hint
analyzeHint(node, sourceScope);
return outputScope;
}

Expand Down Expand Up @@ -1491,6 +1511,71 @@ private void analyzeWhere(Node node, Scope scope, Expression predicate) {
analysis.setWhere(node, predicate);
}

private void analyzeHint(QuerySpecification node, Scope scope) {
Optional<SelectHint> selectHint = node.getSelectHint();
selectHint.ifPresent(hint -> process(hint, scope));
}

@Override
public Scope visitSelectHint(SelectHint node, final Optional<Scope> context) {
Map<String, Hint> hintMap = new HashMap<>();
for (Node hintItem : node.getHintItems()) {
if (hintItem instanceof ParameterizedHintItem) {
ParameterizedHintItem paramHint = (ParameterizedHintItem) hintItem;
String hintName = paramHint.getHintName();
List<String> params = paramHint.getParameters();
addHint(hintName, params, hintMap);
} else if (hintItem instanceof SimpleHintItem) {
SimpleHintItem simpleHint = (SimpleHintItem) hintItem;
String hintName = simpleHint.getHintName();
addHint(hintName, null, hintMap);
}
}
analysis.setHintMap(hintMap);
return createAndAssignScope(node, context);
}

private List<String> intersect(List<String> a, List<String> b) {
return a.stream().filter(b::contains).collect(toImmutableList());
}

private void addHint(String hintName, List<String> parameters, Map<String, Hint> hintMap) {
HintFactory definition = HINT_DEFINITIONS.get(hintName);
if (definition != null) {
List<String> existingTables =
analysis.getRelationNames().values().stream()
.map(QualifiedName::getSuffix)
.collect(toImmutableList());

if (definition.shouldExpandParameters()) {
List<String> tablesToProcess = parameters == null ? existingTables : parameters;
tablesToProcess.stream()
.filter(table -> parameters == null || existingTables.contains(table))
.forEach(
table -> {
Hint hint = definition.createHint(ImmutableList.of(table));
String hintKey = hint.getKey();
hintMap.putIfAbsent(hintKey, hint);
});
} else {
// Skip if parameters contain tables that don't exist in the query
if (parameters != null) {
boolean hasInvalidTable =
parameters.stream().anyMatch(table -> !existingTables.contains(table));
if (hasInvalidTable) {
return;
}
}

Hint hint = definition.createHint(parameters);
String hintKey = hint.getKey();
if (!hintMap.containsKey(hintKey)) {
hintMap.put(hintKey, hint);
}
}
}
}

private List<Expression> analyzeSelect(QuerySpecification node, Scope scope) {
ImmutableList.Builder<Expression> outputExpressionBuilder = ImmutableList.builder();
ImmutableList.Builder<Analysis.SelectExpression> selectExpressionBuilder =
Expand Down Expand Up @@ -3592,7 +3677,7 @@ protected Scope visitValues(Values node, Optional<Scope> scope) {
@Override
protected Scope visitAliasedRelation(AliasedRelation relation, Optional<Scope> scope) {
analysis.setRelationName(relation, QualifiedName.of(ImmutableList.of(relation.getAlias())));
analysis.addAliased(relation.getRelation());
analysis.addAliased(relation.getRelation(), relation.getAlias());
Scope relationScope = process(relation.getRelation(), scope);
RelationType relationType = relationScope.getRelationType();

Expand Down Expand Up @@ -3643,7 +3728,7 @@ protected Scope visitJoin(Join node, Optional<Scope> scope) {
joinConditionCheck(criteria);

// Remember original tables before processing left
List<Identifier> originalTables = new ArrayList<>();
Set<Identifier> originalTables = new HashSet<>();
scope.ifPresent(s -> originalTables.addAll(s.getTables()));

Scope left = process(node.getLeft(), scope);
Expand Down
Loading