Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.api.request.tenant.Tenant;
import io.stargate.sgv2.jsonapi.api.v1.metrics.JsonApiMetricsConfig;
import io.stargate.sgv2.jsonapi.api.v1.metrics.MetricsConfig;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -22,60 +22,55 @@ public class JsonProcessingMetricsReporter {
private final MeterRegistry meterRegistry;
private final JsonApiMetricsConfig jsonApiMetricsConfig;

private final RequestContext requestContext;
private final MetricsConfig.TenantRequestCounterConfig tenantConfig;

@Inject
public JsonProcessingMetricsReporter(
MeterRegistry meterRegistry,
JsonApiMetricsConfig jsonApiMetricsConfig,
RequestContext requestContext,
MetricsConfig metricsConfig) {
this.meterRegistry = meterRegistry;
this.jsonApiMetricsConfig = jsonApiMetricsConfig;
this.requestContext = requestContext;
tenantConfig = metricsConfig.tenantRequestCounter();
}

public void reportJsonWriteBytesMetrics(String commandName, long docJsonSize) {
public void reportJsonWriteBytesMetrics(Tenant tenant, String commandName, long docJsonSize) {
DistributionSummary ds =
DistributionSummary.builder(jsonApiMetricsConfig.jsonBytesWritten())
.tags(getCustomTags(commandName))
.tags(getCustomTags(tenant, commandName))
.register(meterRegistry);
ds.record(docJsonSize);
}

public void reportJsonReadBytesMetrics(String commandName, long docJsonSize) {
public void reportJsonReadBytesMetrics(Tenant tenant, String commandName, long docJsonSize) {
DistributionSummary ds =
DistributionSummary.builder(jsonApiMetricsConfig.jsonBytesRead())
.tags(getCustomTags(commandName))
.tags(getCustomTags(tenant, commandName))
.register(meterRegistry);
ds.record(docJsonSize);
}

public void reportJsonWrittenDocsMetrics(String commandName, int docCount) {
public void reportJsonWrittenDocsMetrics(Tenant tenant, String commandName, int docCount) {
DistributionSummary ds =
DistributionSummary.builder(jsonApiMetricsConfig.jsonDocsWritten())
.tags(getCustomTags(commandName))
.tags(getCustomTags(tenant, commandName))
.register(meterRegistry);
ds.record(docCount);
}

public void reportJsonReadDocsMetrics(String commandName, int docCount) {
public void reportJsonReadDocsMetrics(Tenant tenant, String commandName, int docCount) {
DistributionSummary ds =
DistributionSummary.builder(jsonApiMetricsConfig.jsonDocsRead())
.tags(getCustomTags(commandName))
.tags(getCustomTags(tenant, commandName))
.register(meterRegistry);
ds.record(docCount);
}

private Tags getCustomTags(String commandName) {
private Tags getCustomTags(Tenant tenant, String commandName) {
// TODO: This is a bug? Added null check to match previous optional behavior
// Not making tenant refactor PR too large
Tag tenantTag =
Tag.of(
tenantConfig.tenantTag(),
requestContext.tenant() == null ? UNKNOWN_VALUE : requestContext.tenant().toString());
Tag.of(tenantConfig.tenantTag(), tenant == null ? UNKNOWN_VALUE : tenant.toString());
Tag commandTag = Tag.of(jsonApiMetricsConfig.command(), commandName);
return Tags.of(commandTag, tenantTag);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.stargate.sgv2.jsonapi.service.operation.collections;

import com.fasterxml.jackson.databind.JsonNode;
import io.stargate.sgv2.jsonapi.api.request.tenant.Tenant;
import io.stargate.sgv2.jsonapi.service.operation.InsertAttemptBuilder;
import io.stargate.sgv2.jsonapi.service.schema.collections.CollectionSchemaObject;
import io.stargate.sgv2.jsonapi.service.shredding.collections.DocumentId;
Expand All @@ -14,6 +15,7 @@ public class CollectionInsertAttemptBuilder

private final DocumentShredder documentShredder;
private final CollectionSchemaObject collectionSchemaObject;
private final Tenant tenant;

// TODO: remove commandName - we only need commandName for handing to the shredder to report
// metics
Expand All @@ -25,11 +27,13 @@ public class CollectionInsertAttemptBuilder
public CollectionInsertAttemptBuilder(
CollectionSchemaObject collectionSchemaObject,
DocumentShredder documentShredder,
Tenant tenant,
String commandName) {
this.collectionSchemaObject =
Objects.requireNonNull(collectionSchemaObject, "collectionSchemaObject must not be null");
this.documentShredder =
Objects.requireNonNull(documentShredder, "documentShredder must not be null");
this.tenant = tenant;
this.commandName = commandName;
}

Expand All @@ -46,6 +50,7 @@ public CollectionInsertAttempt build(JsonNode jsonNode) {
jsonNode,
null,
collectionSchemaObject.indexingProjector(),
tenant,
commandName,
collectionSchemaObject,
docIdRef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.api.request.tenant.Tenant;
import io.stargate.sgv2.jsonapi.exception.*;
import io.stargate.sgv2.jsonapi.metrics.JsonProcessingMetricsReporter;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
Expand Down Expand Up @@ -78,6 +79,7 @@ default Uni<FindResponse> findDocument(
DocumentProjector projection,
int limit,
boolean vectorSearch,
Tenant tenant,
String commandName,
JsonProcessingMetricsReporter jsonProcessingMetricsReporter) {
return Multi.createFrom()
Expand Down Expand Up @@ -109,7 +111,7 @@ default Uni<FindResponse> findDocument(
// create metrics
// TODO Use the column names!
jsonProcessingMetricsReporter.reportJsonReadBytesMetrics(
commandName, row.getString(2).length());
tenant, commandName, row.getString(2).length());

if (projection.doIncludeSimilarityScore()) {
float score = row.getFloat(3); // similarity_score
Expand Down Expand Up @@ -204,6 +206,7 @@ default Uni<FindResponse> findOrderDocument(
int errorLimit,
DocumentProjector projection,
boolean vectorSearch,
Tenant tenant,
String commandName,
JsonProcessingMetricsReporter jsonProcessingMetricsReporter) {
final AtomicInteger documentCounter = new AtomicInteger(0);
Expand Down Expand Up @@ -310,7 +313,7 @@ default Uni<FindResponse> findOrderDocument(
sortValues);
documents.add(document);
jsonProcessingMetricsReporter.reportJsonReadBytesMetrics(
commandName, row.getString(2).length());
tenant, commandName, row.getString(2).length());
}
return Uni.createFrom().item(documents);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public Uni<Supplier<CommandResult>> execute(
commandContext
.jsonProcessingMetricsReporter()
.reportJsonReadDocsMetrics(
commandContext().commandName(), deletedInformation.size());
commandContext().requestContext().tenant(),
commandContext().commandName(),
deletedInformation.size());
return new DeleteOperationPage(
deletedInformation, moreData.get(), returnDocumentInResponse, deleteLimit == 1);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,14 @@ public Uni<Supplier<CommandResult>> execute(
// map the response to result
.map(
docs -> {
// TODO: why is this here and not higher up where it can happen for any command result
// ?
// TODO: why is this here and not higher up where it can happen for any command
// result?
commandContext
.jsonProcessingMetricsReporter()
.reportJsonReadDocsMetrics(commandContext().commandName(), docs.docs().size());
.reportJsonReadDocsMetrics(
commandContext.requestContext().tenant(),
commandContext().commandName(),
docs.docs().size());
return new ReadOperationPage(
docs.docs(), singleResponse, docs.pageState(), includeSortVector(), vector());
});
Expand Down Expand Up @@ -421,6 +424,7 @@ public Uni<FindResponse> getDocuments(
maxSortReadLimit(),
projection(),
vector() != null,
commandContext.requestContext().tenant(),
commandContext.commandName(),
commandContext.jsonProcessingMetricsReporter());
}
Expand All @@ -437,6 +441,7 @@ public Uni<FindResponse> getDocuments(
projection,
limit(),
vector() != null,
commandContext.requestContext().tenant(),
commandContext.commandName(),
commandContext.jsonProcessingMetricsReporter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public Uni<Supplier<CommandResult>> execute(
if (commandContext.jsonProcessingMetricsReporter() != null) {
commandContext
.jsonProcessingMetricsReporter()
.reportJsonWrittenDocsMetrics(commandContext().commandName(), insertions.size());
.reportJsonWrittenDocsMetrics(
commandContext().requestContext().tenant(),
commandContext().commandName(),
insertions.size());
}
if (ordered) {
return insertOrdered(dataApiRequestInfo, queryExecutor, vectorEnabled, insertions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,11 @@ public Uni<Supplier<CommandResult>> execute(
if (upsert() && docs.isEmpty() && matchedCount.get() == 0) {
// TODO: creating the new document here, with the defaults from the filter, makes it
// harder because the new document created here may nto have an _id if there was
// none
// in the filter. A better approach may be to have the documentUpdater create
// none in the filter. A better approach may be to have the documentUpdater create
// the upsert document totally in once place.
// Currently creating to upsert document is in multiple places. To do this we would
// create
// UpdateOperations from the filter and give them to the document updated when it is
// created.
// create UpdateOperations from the filter and give them to the document updated
// when it is created.
return Multi.createFrom().item(findCollectionOperation().getNewDocument());
} else {
matchedCount.addAndGet(docs.size());
Expand Down Expand Up @@ -146,11 +144,16 @@ public Uni<Supplier<CommandResult>> execute(
// create json doc read/write metrics
commandContext
.jsonProcessingMetricsReporter()
.reportJsonReadDocsMetrics(commandContext().commandName(), matchedCount.get());
.reportJsonReadDocsMetrics(
commandContext().requestContext().tenant(),
commandContext().commandName(),
matchedCount.get());
commandContext
.jsonProcessingMetricsReporter()
.reportJsonWrittenDocsMetrics(
commandContext().commandName(), modifiedCount.get());
commandContext().requestContext().tenant(),
commandContext().commandName(),
modifiedCount.get());
return new UpdateCollectionOperationPage(
matchedCount.get(),
modifiedCount.get(),
Expand Down Expand Up @@ -299,16 +302,16 @@ static String buildUpdateQuery(
.append("\" SET ")
.append(
"""
tx_id = now(),
exist_keys = ?,
array_size = ?,
array_contains = ?,
query_bool_values = ?,
query_dbl_values = ?,
query_text_values = ?,
query_null_values = ?,
query_timestamp_values = ?,
""");
tx_id = now(),
exist_keys = ?,
array_size = ?,
array_contains = ?,
query_bool_values = ?,
query_dbl_values = ?,
query_text_values = ?,
query_null_values = ?,
query_timestamp_values = ?,
""");
if (vectorEnabled) {
updateQuery.append("\nquery_vector_value = ?,");
}
Expand All @@ -317,10 +320,10 @@ static String buildUpdateQuery(
}
updateQuery.append(
"""
doc_json = ?
WHERE key = ?
IF tx_id = ?
""");
doc_json = ?
WHERE key = ?
IF tx_id = ?
""");

return updateQuery.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public Operation<CollectionSchemaObject> resolveCollectionCommand(
final boolean returnDocumentResponses = (null != options) && options.returnDocumentResponses();

var builder =
new CollectionInsertAttemptBuilder(ctx.schemaObject(), documentShredder, ctx.commandName());
new CollectionInsertAttemptBuilder(
ctx.schemaObject(), documentShredder, ctx.requestContext().tenant(), ctx.commandName());
var attempts = command.documents().stream().map(builder::build).toList();

return new InsertCollectionOperation(ctx, attempts, ordered, false, returnDocumentResponses);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public Operation<CollectionSchemaObject> resolveCollectionCommand(
CommandContext<CollectionSchemaObject> ctx, InsertOneCommand command) {

var builder =
new CollectionInsertAttemptBuilder(ctx.schemaObject(), documentShredder, ctx.commandName());
new CollectionInsertAttemptBuilder(
ctx.schemaObject(), documentShredder, ctx.requestContext().tenant(), ctx.commandName());

var attemps = List.of(builder.build(command.document()));
return new InsertCollectionOperation(ctx, attemps, false, false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.NoArgGenerator;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.request.tenant.Tenant;
import io.stargate.sgv2.jsonapi.config.DocumentLimitsConfig;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.DocumentException;
Expand Down Expand Up @@ -70,6 +71,7 @@ public WritableShreddedDocument shred(
doc,
txId,
ctx.schemaObject().indexingProjector(),
ctx.requestContext().tenant(),
ctx.commandName(),
ctx.schemaObject(),
null);
Expand All @@ -95,6 +97,7 @@ public WritableShreddedDocument shred(
doc,
txId,
ctx.schemaObject().indexingProjector(),
ctx.requestContext().tenant(),
ctx.commandName(),
ctx.schemaObject(),
docIdToReturn);
Expand All @@ -104,6 +107,7 @@ public WritableShreddedDocument shred(
JsonNode doc,
UUID txId,
IndexingProjector indexProjector,
Tenant tenant,
String commandName,
CollectionSchemaObject collectionSettings,
AtomicReference<DocumentId> docIdToReturn) {
Expand Down Expand Up @@ -142,15 +146,16 @@ public WritableShreddedDocument shred(

// Create json bytes written metrics
if (jsonProcessingMetricsReporter != null) {
jsonProcessingMetricsReporter.reportJsonWriteBytesMetrics(commandName, docJson.length());
jsonProcessingMetricsReporter.reportJsonWriteBytesMetrics(
tenant, commandName, docJson.length());
}

final WritableShreddedDocument.Builder b =
WritableShreddedDocument.builder(docId, txId, docJson, docWithId);

// Before value validation, indexing, may need to drop "non-indexed" properties. But if so,
// need to ensure we do not modify original document, so let's create a copy (may need
// to be returned as "after" Document)
// Before value validation, indexing, may need to drop "non-indexed" properties. But if so, need
// to ensure we do not modify original document, so let's create a copy (may need to be returned
// as "after" Document)
ObjectNode indexableDocument;

if (indexProjector != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ public void deleteOneAndReturnById() {
line -> {
assertThat(line).contains("command=\"jsonBytesReadDeleteCommand\"");
assertThat(line).contains("module=\"sgv2-jsonapi\"");
assertThat(line).contains("tenant=\"unknown\"");
assertThat(line)
.contains(
"tenant=\"%s\"".formatted(COMMAND_CONTEXT.requestContext().tenant()));
});
});
// verify count metric -- command called once, should be one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ public void findAll() throws Exception {
line -> {
assertThat(line).contains("command=\"jsonBytesReadCommand\"");
assertThat(line).contains("module=\"sgv2-jsonapi\"");
assertThat(line).contains("tenant=\"unknown\"");
assertThat(line)
.contains(
"tenant=\"%s\"".formatted(COMMAND_CONTEXT.requestContext().tenant()));
});
});
// verify count metric -- command called once, should be one
Expand Down
Loading