/** * Update index(user properties) of vertex or edge * @param ilId the id of index label * @param element the properties owner * @param removed remove or add index */ protectedvoidupdateIndex(Id ilId, HugeElement element, boolean removed){ SchemaTransaction schema = this.params().schemaTransaction(); IndexLabel indexLabel = schema.getIndexLabel(ilId); E.checkArgument(indexLabel != null, "Not exist index label with id '%s'", ilId);
// Collect property values of index fields List<Object> allPropValues = new ArrayList<>(); int fieldsNum = indexLabel.indexFields().size(); int firstNullField = fieldsNum; for (Id fieldId : indexLabel.indexFields()) { HugeProperty<Object> property = element.getProperty(fieldId); if (property == null) { E.checkState(hasNullableProp(element, fieldId), "Non-null property '%s' is null for '%s'", this.graph().propertyKey(fieldId) , element); if (firstNullField == fieldsNum) { firstNullField = allPropValues.size(); } allPropValues.add(INDEX_SYM_NULL); } else { E.checkArgument(!INDEX_SYM_NULL.equals(property.value()), "Illegal value of index property: '%s'", INDEX_SYM_NULL); allPropValues.add(property.value()); } }
if (firstNullField == 0 && !indexLabel.indexType().isUnique()) { // The property value of first index field is null return; } // Not build index for record with nullable field (except unique index) List<Object> propValues = allPropValues.subList(0, firstNullField);
// Expired time long expiredTime = element.expiredTime();
// Update index for each index type switch (indexLabel.indexType()) { case RANGE_INT: case RANGE_FLOAT: case RANGE_LONG: case RANGE_DOUBLE: E.checkState(propValues.size() == 1, "Expect only one property in range index"); Object value = NumericUtil.convertToNumber(propValues.get(0)); this.updateIndex(indexLabel, value, element.id(), expiredTime, removed); break; case SEARCH: E.checkState(propValues.size() == 1, "Expect only one property in search index"); value = propValues.get(0); Set<String> words = this.segmentWords(value.toString()); for (String word : words) { this.updateIndex(indexLabel, word, element.id(), expiredTime, removed); } break; case SECONDARY: // Secondary index maybe include multi prefix index for (int i = 0, n = propValues.size(); i < n; i++) { List<Object> prefixValues = propValues.subList(0, i + 1); // prefixValues is list or set , should create index for // each item if(prefixValues.get(0) instanceof Collection) { for (Object propValue : (Collection<Object>) prefixValues.get(0)) { value = escapeIndexValueIfNeeded(propValue.toString()); this.updateIndex(indexLabel, value, element.id(), expiredTime, removed); } }else { value = ConditionQuery.concatValues(prefixValues); value = escapeIndexValueIfNeeded((String) value); this.updateIndex(indexLabel, value, element.id(), expiredTime, removed); } } break; case SHARD: value = ConditionQuery.concatValues(propValues); value = escapeIndexValueIfNeeded((String) value); this.updateIndex(indexLabel, value, element.id(), expiredTime, removed); break; case UNIQUE: value = ConditionQuery.concatValues(allPropValues); assert !value.equals(""); Id id = element.id(); // TODO: add lock for updating unique index if (!removed && this.existUniqueValue(indexLabel, value, id)) { thrownew IllegalArgumentException(String.format( "Unique constraint %s conflict is found for %s", indexLabel, element)); } this.updateIndex(indexLabel, value, element.id(), expiredTime, removed); break; default: thrownew 
AssertionError(String.format( "Unknown index type '%s'", indexLabel.indexType())); } }
E.checkState(propValues.size() == 1, "Expect only one property in range index"); Object value = NumericUtil.convertToNumber(propValues.get(0)); this.updateIndex(indexLabel, value, element.id(), expiredTime, removed);
updateIndex 代码:
1 2 3 4 5 6 7 8 9 10 11 12
privatevoidupdateIndex(IndexLabel indexLabel, Object propValue, Id elementId, long expiredTime, boolean removed){ HugeIndex index = new HugeIndex(this.graph(), indexLabel); index.fieldValues(propValue); index.elementIds(elementId, expiredTime);
if (removed) { this.doEliminate(this.serializer.writeIndex(index)); } else { this.doAppend(this.serializer.writeIndex(index)); } }
构造索引,并根据 removed 参数决定是追加(append)还是删除该索引。
通过GraphSerializer序列化索引
这里我们来探索Serializer是如何做的,比如Binary:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
Id id = index.id(); HugeType type = index.type(); byte[] value = null; if (!type.isNumericIndex() && indexIdLengthExceedLimit(id)) { id = index.hashId(); // Save field-values as column value if the key is a hash string value = StringEncoding.encode(index.fieldValues().toString()); }
publicstatic Id formatIndexId(HugeType type, Id indexLabelId, Object fieldValues){ if (type.isStringIndex()) { String value = ""; if (fieldValues instanceof Id) { value = IdGenerator.asStoredString((Id) fieldValues); } elseif (fieldValues != null) { value = fieldValues.toString(); } /* * Modify order between index label and field-values to put the * index label in front(hugegraph-1317) */ String strIndexLabelId = IdGenerator.asStoredString(indexLabelId); return SplicingIdGenerator.splicing(strIndexLabelId, value); } else { assert type.isRangeIndex(); int length = type.isRange4Index() ? 4 : 8; BytesBuffer buffer = BytesBuffer.allocate(4 + length); buffer.writeInt(SchemaElement.schemaId(indexLabelId)); if (fieldValues != null) { E.checkState(fieldValues instanceof Number, "Field value of range index must be number:" + " %s", fieldValues.getClass().getSimpleName()); byte[] bytes = number2bytes((Number) fieldValues); buffer.write(bytes); } return buffer.asId(); } }
protected <R> QueryList<R> optimizeQueries(Query query, QueryResults.Fetcher<R> fetcher){ QueryList<R> queries = new QueryList<>(query, fetcher); for (ConditionQuery cq: ConditionQueryFlatten.flatten( (ConditionQuery) query)) { // Optimize by sysprop Query q = this.optimizeQuery(cq); /* * NOTE: There are two possibilities for this query: * 1.sysprop-query, which would not be empty. * 2.index-query result(ids after optimization), which may be empty. */ if (q == null) { queries.add(this.indexQuery(cq), this.batchSize); } elseif (!q.empty()) { queries.add(q); } } return queries; }
@Watched(prefix = "index") public IdHolderList queryIndex(ConditionQuery query){ // Index query must have been flattened in Graph tx query.checkFlattened();
// NOTE: Currently we can't support filter changes in memory if (this.hasUpdate()) { thrownew HugeException("Can't do index query when " + "there are changes in transaction"); }
// Can't query by index and by non-label sysprop at the same time List<Condition> conds = query.syspropConditions(); if (conds.size() > 1 || (conds.size() == 1 && !query.containsCondition(HugeKeys.LABEL))) { thrownew HugeException("Can't do index query with %s and %s", conds, query.userpropConditions()); }
// Query by index query.optimized(OptimizedType.INDEX); if (query.allSysprop() && conds.size() == 1 && query.containsCondition(HugeKeys.LABEL)) { // Query only by label returnthis.queryByLabel(query); } else { // Query by userprops (or userprops + label) returnthis.queryByUserprop(query); } }
/**
 * Query element ids by the indexes matching the query's user-property
 * conditions.
 *
 * @param query the flattened condition query
 * @return one id holder per matched index (search-index queries may add
 *         several); an empty list if the condition value types do not match
 * @throws NotSupportException if a joint index query is requested while
 *                             paging
 */
@Watched(prefix = "index")
private IdHolderList queryByUserprop(ConditionQuery query) {
    // Get user applied label or collect all qualified labels with
    // related index labels
    Set<MatchedIndex> indexes = this.collectMatchedIndexes(query);
    if (indexes.isEmpty()) {
        Id label = query.condition(HugeKeys.LABEL);
        throw noIndexException(this.graph(), query, label);
    }

    // Value type of Condition not matched
    boolean paging = query.paging();
    if (!validQueryConditionValues(this.graph(), query)) {
        return IdHolderList.empty(paging);
    }

    // Do index query
    IdHolderList holders = new IdHolderList(paging);
    for (MatchedIndex index : indexes) {
        for (IndexLabel il : index.indexLabels()) {
            validateIndexLabel(il);
        }
        if (paging && index.indexLabels().size() > 1) {
            throw new NotSupportException("joint index query in paging");
        }

        if (index.containsSearchIndex()) {
            // Do search-index query
            holders.addAll(this.doSearchIndex(query, index));
        } else {
            // Do secondary-index, range-index or shard-index query
            IndexQueries queries = index.constructIndexQueries(query);
            assert !paging || queries.size() <= 1;
            IdHolder holder = this.doSingleOrJointIndex(queries);
            holders.add(holder);
        }

        /*
         * NOTE: the offset needs to be skipped if offset > 0, but that
         * can't be done here because the query may be a sub-query after
         * flattening, so the offset is handled in QueryList.IndexQuery.
         *
         * TODO: finish early here if records exceed the required limit
         * with FixedIdHolder.
         */
    }
    return holders;
}
// Register results filter to compare property value and search text query.registerResultsFilter(elem -> { for (Condition cond : originQuery.conditions()) { Object key = cond.isRelation() ? ((Relation) cond).key() : null; if (key instanceof Id && indexFields.contains(key)) { // This is an index field of search index Id field = (Id) key; assert elem != null; HugeProperty<?> property = elem.getProperty(field); String propValue = propertyValueToString(property.value()); String fieldValue = (String) originQuery.userpropValue(field); if (this.matchSearchIndexWords(propValue, fieldValue)) { continue; } returnfalse; } if (!cond.test(elem)) { returnfalse; } } returntrue; });
return query; }
先对查询文本进行分词,
然后重置查询(reset query):将 has(key, text) 转换为 has(key, textContainsAny(word1, word2))。
@Watched(prefix = "index") private IdHolder doJointIndex(IndexQueries queries){ if (queries.oomRisk()) { LOG.warn("There is OOM risk if the joint operation is based on a " + "large amount of data, please use single index + filter " + "instead of joint index: {}", queries.rootQuery()); } // All queries are joined with AND Set<Id> intersectIds = null; boolean filtering = false; IdHolder resultHolder = null; for (Map.Entry<IndexLabel, ConditionQuery> e : queries.entrySet()) { IndexLabel indexLabel = e.getKey(); ConditionQuery query = e.getValue(); assert !query.paging(); if (!query.noLimit() && queries.size() > 1) { // Unset limit for intersection operation query.limit(Query.NO_LIMIT); } /* * Try to query by joint indexes: * 1 If there is any index exceeded the threshold, transform into * partial index query, then filter after back-table. * 1.1 Return the holder of the first index that not exceeded the * threshold if there exists one index, this holder will be used * as the only query condition. * 1.2 Return the holder of the first index if all indexes exceeded * the threshold. * 2 Else intersect holders for all indexes, and return intersection * ids of all indexes. */ IdHolder holder = this.doIndexQuery(indexLabel, query); if (resultHolder == null) { resultHolder = holder; } assertthis.indexIntersectThresh > 0; // default value is 1000 Set<Id> ids = ((BatchIdHolder) holder).peekNext( this.indexIntersectThresh).ids(); if (ids.size() >= this.indexIntersectThresh) { // Transform into filtering filtering = true; query.optimized(OptimizedType.INDEX_FILTER); } elseif (filtering) { assert ids.size() < this.indexIntersectThresh; resultHolder = holder; break; } else { if (intersectIds == null) { intersectIds = ids; } else { CollectionUtil.intersectWithModify(intersectIds, ids); } if (intersectIds.isEmpty()) { break; } } }
@Watched(prefix = "tx") public QueryResults<BackendEntry> query(Query query){ LOG.debug("Transaction query: {}", query); /* * NOTE: it's dangerous if an IdQuery/ConditionQuery is empty * check if the query is empty and its class is not the Query itself */ if (query.empty() && !query.getClass().equals(Query.class)) { thrownew BackendException("Query without any id or condition"); }
Query squery = this.serializer.writeQuery(query);
// Do rate limit if needed RateLimiter rateLimiter = this.graph.readRateLimiter(); if (rateLimiter != null && query.resultType().isGraph()) { double time = rateLimiter.acquire(1); if (time > 0) { LOG.debug("Waited for {}s to query", time); } BackendEntryIterator.checkInterrupted(); }
this.beforeRead(); try { returnnew QueryResults<>(this.store.query(squery), query); } finally { this.afterRead(); // TODO: not complete the iteration currently } }
逐级往下,核心代码在writeQueryCondition:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
@Override protected Query writeQueryCondition(Query query) { HugeType type = query.resultType(); if (!type.isIndex()) { return query; }
ConditionQuery cq = (ConditionQuery) query;
if (type.isNumericIndex()) { // Convert range-index/shard-index query to id range query returnthis.writeRangeIndexQuery(cq); } else { assert type.isSearchIndex() || type.isSecondaryIndex() || type.isUniqueIndex(); // Convert secondary-index or search-index query to id query returnthis.writeStringIndexQuery(cq); } }
private Query writeRangeIndexQuery(ConditionQuery query) { Id index = query.condition(HugeKeys.INDEX_LABEL_ID); E.checkArgument(index != null, "Please specify the index label");
List<Condition> fields = query.syspropConditions(HugeKeys.FIELD_VALUES); E.checkArgument(!fields.isEmpty(), "Please specify the index field values");
HugeType type = query.resultType(); Id start = null; if (query.paging() && !query.page().isEmpty()) { byte[] position = PageState.fromString(query.page()).position(); start = new BinaryId(position, null); }
RangeConditions range = new RangeConditions(fields); if (range.keyEq() != null) { Id id = formatIndexId(type, index, range.keyEq(), true); if (start == null) { returnnew IdPrefixQuery(query, id); } E.checkArgument(Bytes.compare(start.asBytes(), id.asBytes()) >= 0, "Invalid page out of lower bound"); returnnew IdPrefixQuery(query, start, id); }
Object keyMin = range.keyMin(); Object keyMax = range.keyMax(); boolean keyMinEq = range.keyMinEq(); boolean keyMaxEq = range.keyMaxEq(); if (keyMin == null) { E.checkArgument(keyMax != null, "Please specify at least one condition"); // Set keyMin to min value keyMin = NumericUtil.minValueOf(keyMax.getClass()); keyMinEq = true; }
Id min = formatIndexId(type, index, keyMin, false); if (!keyMinEq) { /* * Increase 1 to keyMin, index GT query is a scan with GT prefix, * inclusiveStart=false will also match index started with keyMin */ increaseOne(min.asBytes()); keyMinEq = true; }
if (start == null) { start = min; } else { E.checkArgument(Bytes.compare(start.asBytes(), min.asBytes()) >= 0, "Invalid page out of lower bound"); }
if (keyMax == null) { keyMax = NumericUtil.maxValueOf(keyMin.getClass()); keyMaxEq = true; } Id max = formatIndexId(type, index, keyMax, false); if (keyMaxEq) { keyMaxEq = false; increaseOne(max.asBytes()); } returnnew IdRangeQuery(query, start, keyMinEq, max, keyMaxEq); }
如果是其他索引,则转换为前缀匹配查询:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
private Query writeStringIndexQuery(ConditionQuery query){ E.checkArgument(query.allSysprop() && query.conditions().size() == 2, "There should be two conditions: " + "INDEX_LABEL_ID and FIELD_VALUES" + "in secondary index query");
Id index = query.condition(HugeKeys.INDEX_LABEL_ID); Object key = query.condition(HugeKeys.FIELD_VALUES);
E.checkArgument(index != null, "Please specify the index label"); E.checkArgument(key != null, "Please specify the index key");
protected BackendColumnIterator queryBy(Session session, Query query){ // Query all if (query.empty()) { returnthis.queryAll(session, query); }
// Query by prefix if (query instanceof IdPrefixQuery) { IdPrefixQuery pq = (IdPrefixQuery) query; returnthis.queryByPrefix(session, pq); }
// Query by range if (query instanceof IdRangeQuery) { IdRangeQuery rq = (IdRangeQuery) query; returnthis.queryByRange(session, rq); }
// Query by id if (query.conditions().isEmpty()) { assert !query.ids().isEmpty(); // NOTE: this will lead to lazy create rocksdb iterator returnnew BackendColumnIteratorWrapper(new FlatMapperIterator<>( query.ids().iterator(), id -> this.queryById(session, id) )); }