diff --git a/tests/systemtests/audit_peer_observation_completeness_test.go b/tests/systemtests/audit_peer_observation_completeness_test.go index 9f427ce0..7feb7493 100644 --- a/tests/systemtests/audit_peer_observation_completeness_test.go +++ b/tests/systemtests/audit_peer_observation_completeness_test.go @@ -10,8 +10,9 @@ import ( "github.com/tidwall/sjson" ) -// This test validates that ACTIVE probers must submit storage challenge observations for all assigned targets. -func TestAuditSubmitReport_ProberRequiresAllPeerObservations(t *testing.T) { +// This test validates that incomplete peer observations are accepted, persisted, and +// penalize the reporter once without immediate supernode postponement. +func TestAuditSubmitReport_IncompletePeerObservationsAcceptedWithReporterPenalty(t *testing.T) { const ( // Keep epochs long enough in real time to avoid end-blocker enforcement during the test. epochLengthBlocks = uint64(20) @@ -44,5 +45,14 @@ func TestAuditSubmitReport_ProberRequiresAllPeerObservations(t *testing.T) { host := auditHostReportJSON([]string{"PORT_STATE_OPEN"}) _, prober, _ := findAssignedProberAndTarget(t, epochID, []testNodeIdentity{n0, n1}) txResp := submitEpochReport(t, cli, prober.nodeName, epochID, host, nil) - RequireTxFailure(t, txResp, "expected storage challenge observations") + RequireTxSuccess(t, txResp) + + report := auditQueryReport(t, epochID, prober.accAddr) + require.Len(t, report.StorageChallengeObservations, 0) + + reliability := auditQueryReporterReliabilityState(t, prober.accAddr) + require.Equal(t, int64(8), reliability.ReliabilityScore) + require.Equal(t, epochID, reliability.LastUpdatedEpoch) + require.Equal(t, "REPORTER_TRUST_BAND_NORMAL", reliability.TrustBand.String()) + require.Equal(t, "SUPERNODE_STATE_ACTIVE", querySupernodeLatestState(t, cli, prober.valAddr)) } diff --git a/tests/systemtests/audit_test_helpers_test.go b/tests/systemtests/audit_test_helpers_test.go index edc0aa12..c0561c0b 100644 --- a/tests/systemtests/audit_test_helpers_test.go +++ b/tests/systemtests/audit_test_helpers_test.go @@ -379,6 +379,18 @@ func auditQueryReport(t *testing.T, epochID uint64, reporterSupernodeAccount str return resp.Report } +func auditQueryReporterReliabilityState(t *testing.T, reporterSupernodeAccount string) audittypes.ReporterReliabilityState { + t.Helper() + qc, _ := newAuditQueryClient(t) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := qc.ReporterReliabilityState(ctx, &audittypes.QueryReporterReliabilityStateRequest{ + ReporterSupernodeAccount: reporterSupernodeAccount, + }) + require.NoError(t, err) + return resp.State +} + func auditQueryAssignedTargets(t *testing.T, epochID uint64, filterByEpochID bool, proberSupernodeAccount string) audittypes.QueryAssignedTargetsResponse { t.Helper() qc, _ := newAuditQueryClient(t) diff --git a/x/audit/v1/keeper/msg_submit_epoch_report.go b/x/audit/v1/keeper/msg_submit_epoch_report.go index b9ecc5f5..144b8088 100644 --- a/x/audit/v1/keeper/msg_submit_epoch_report.go +++ b/x/audit/v1/keeper/msg_submit_epoch_report.go @@ -2,6 +2,7 @@ package keeper import ( "context" + "strconv" errorsmod "cosmossdk.io/errors" sdk "github.com/cosmos/cosmos-sdk/types" @@ -9,6 +10,11 @@ import ( "github.com/LumeraProtocol/lumera/x/audit/v1/types" ) +const ( + incompleteReportReason = "INCOMPLETE_REPORT" + incompleteReportReporterReliabilityPenalty = int64(8) +) + func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEpochReport) (*types.MsgSubmitEpochReportResponse, error) { if req == nil { return nil, errorsmod.Wrap(types.ErrInvalidSigner, "empty request") @@ -83,17 +89,15 @@ func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEp len(req.HostReport.InboundPortStates), requiredPortsLen, ) } + incompleteReport := false if !isProber { // Not a prober for this epoch (e.g. POSTPONED). Peer observations are not accepted. if len(req.StorageChallengeObservations) > 0 { return nil, errorsmod.Wrap(types.ErrInvalidReporterState, "reporter not eligible for storage challenge observations in this epoch") } } else { - // Probers must submit peer observations for all assigned targets for the epoch. - if len(req.StorageChallengeObservations) != len(allowedTargets) { - return nil, errorsmod.Wrapf(types.ErrInvalidPeerObservations, "expected storage challenge observations for %d assigned targets; got %d", len(allowedTargets), len(req.StorageChallengeObservations)) - } - + // Probers may submit a subset of assigned peer observations; missing + // assigned targets are accepted but penalize the reporter once per epoch. seenTargets := make(map[string]struct{}, len(req.StorageChallengeObservations)) for _, obs := range req.StorageChallengeObservations { if obs == nil { @@ -118,9 +122,7 @@ func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEp return nil, errorsmod.Wrapf(types.ErrInvalidPortStatesLength, "port_states length %d does not match required_open_ports length %d", len(obs.PortStates), requiredPortsLen) } } - if len(seenTargets) != len(allowedTargets) { - return nil, errorsmod.Wrap(types.ErrInvalidPeerObservations, "peer observations do not cover all assigned targets") - } + incompleteReport = len(seenTargets) < len(allowedTargets) } // Per PR #118 / Zee F2 — cap storage proof results to bound processing cost. if len(req.StorageProofResults) > types.MaxStorageProofResultsPerReport { @@ -178,6 +180,40 @@ func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEp if err := m.applyStorageTruthScores(sdkCtx, req.EpochId, reporterAccount, req.StorageProofResults); err != nil { return nil, err } + if incompleteReport { + if err := m.applyIncompleteReportPenalty(sdkCtx, req.EpochId, reporterAccount, assignParams); err != nil { + return nil, err + } + } return &types.MsgSubmitEpochReportResponse{}, nil } + +func (k Keeper) applyIncompleteReportPenalty(ctx sdk.Context, epochID uint64, reporterAccount string, params types.Params) error { + state, updated, err := k.applyReporterReliabilityDelta( + ctx, + epochID, + reporterAccount, + incompleteReportReporterReliabilityPenalty, + params.StorageTruthReporterReliabilityDecayPerEpoch, + 0, + params, + ) + if err != nil { + return err + } + if !updated { + return nil + } + + ctx.EventManager().EmitEvent(sdk.NewEvent( + types.EventTypeStorageTruthScoreUpdated, + sdk.NewAttribute(sdk.AttributeKeyModule, types.ModuleName), + sdk.NewAttribute(types.AttributeKeyEpochID, strconv.FormatUint(epochID, 10)), + sdk.NewAttribute(types.AttributeKeyReporterSupernodeAccount, reporterAccount), + sdk.NewAttribute(types.AttributeKeyReporterReliabilityScore, strconv.FormatInt(state.ReliabilityScore, 10)), + sdk.NewAttribute(types.AttributeKeyReporterTrustBand, state.TrustBand.String()), + sdk.NewAttribute("reason", incompleteReportReason), + )) + return nil +} diff --git a/x/audit/v1/keeper/msg_submit_epoch_report_test.go b/x/audit/v1/keeper/msg_submit_epoch_report_test.go index de5c4bee..30d4af3c 100644 --- a/x/audit/v1/keeper/msg_submit_epoch_report_test.go +++ b/x/audit/v1/keeper/msg_submit_epoch_report_test.go @@ -132,6 +132,213 @@ func TestSubmitEpochReport_ValidatesInboundPortStatesLength(t *testing.T) { require.Error(t, err) } +func TestSubmitEpochReport_AcceptsPartialStorageChallengeObservationsAndPenalizesReporter(t *testing.T) { + f := initFixture(t) + f.ctx = f.ctx.WithBlockHeight(1).WithEventManager(sdk.NewEventManager()) + + params := types.DefaultParams().WithDefaults() + params.StorageTruthEnforcementMode = types.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED + require.NoError(t, f.keeper.SetParams(f.ctx, params)) + + ms := keeper.NewMsgServerImpl(f.keeper) + + reporter := "sn-aaa-reporter" + target1 := "sn-bbb-target" + target2 := "sn-ccc-target" + target3 := "sn-ddd-target" + activeAndTargets := []string{reporter, target1, target2, target3} + + f.supernodeKeeper.EXPECT(). + GetSuperNodeByAccount(gomock.Any(), reporter). + Return(sntypes.SuperNode{}, true, nil). + AnyTimes() + + seedEpochAnchorForReportTest(t, f, 0, activeAndTargets, activeAndTargets) + + portStates := make([]types.PortState, len(types.DefaultRequiredOpenPorts)) + for i := range portStates { + portStates[i] = types.PortState_PORT_STATE_OPEN + } + + _, err := ms.SubmitEpochReport(f.ctx, &types.MsgSubmitEpochReport{ + Creator: reporter, + EpochId: 0, + HostReport: types.HostReport{ + InboundPortStates: portStates, + }, + StorageChallengeObservations: []*types.StorageChallengeObservation{ + {TargetSupernodeAccount: target1, PortStates: portStates}, + {TargetSupernodeAccount: target2, PortStates: portStates}, + }, + }) + require.NoError(t, err) + + report, found := f.keeper.GetReport(f.ctx, 0, reporter) + require.True(t, found) + require.Len(t, report.StorageChallengeObservations, 2) + + state, found := f.keeper.GetReporterReliabilityState(f.ctx, reporter) + require.True(t, found) + require.Equal(t, int64(8), state.ReliabilityScore) + require.Equal(t, uint64(0), state.LastUpdatedEpoch) + require.Equal(t, types.ReporterTrustBand_REPORTER_TRUST_BAND_NORMAL, state.TrustBand) + + var foundReason bool + for _, event := range f.ctx.EventManager().Events() { + if event.Type != types.EventTypeStorageTruthScoreUpdated { + continue + } + attrs := make(map[string]string, len(event.Attributes)) + for _, attr := range event.Attributes { + attrs[string(attr.Key)] = string(attr.Value) + } + if attrs["reason"] == "INCOMPLETE_REPORT" { + foundReason = true + require.Equal(t, reporter, attrs[types.AttributeKeyReporterSupernodeAccount]) + require.Equal(t, "8", attrs[types.AttributeKeyReporterReliabilityScore]) + } + } + require.True(t, foundReason, "expected INCOMPLETE_REPORT scoring event") +} + +func TestSubmitEpochReport_FullStorageChallengeCoverageDoesNotPenalizeReporter(t *testing.T) { + f := initFixture(t) + f.ctx = f.ctx.WithBlockHeight(1).WithEventManager(sdk.NewEventManager()) + + params := types.DefaultParams().WithDefaults() + params.StorageTruthEnforcementMode = types.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED + require.NoError(t, f.keeper.SetParams(f.ctx, params)) + + ms := keeper.NewMsgServerImpl(f.keeper) + + reporter := "sn-aaa-reporter" + target1 := "sn-bbb-target" + target2 := "sn-ccc-target" + target3 := "sn-ddd-target" + activeAndTargets := []string{reporter, target1, target2, target3} + + f.supernodeKeeper.EXPECT(). + GetSuperNodeByAccount(gomock.Any(), reporter). + Return(sntypes.SuperNode{}, true, nil). + AnyTimes() + + seedEpochAnchorForReportTest(t, f, 0, activeAndTargets, activeAndTargets) + + portStates := make([]types.PortState, len(types.DefaultRequiredOpenPorts)) + for i := range portStates { + portStates[i] = types.PortState_PORT_STATE_OPEN + } + + _, err := ms.SubmitEpochReport(f.ctx, &types.MsgSubmitEpochReport{ + Creator: reporter, + EpochId: 0, + HostReport: types.HostReport{ + InboundPortStates: portStates, + }, + StorageChallengeObservations: []*types.StorageChallengeObservation{ + {TargetSupernodeAccount: target1, PortStates: portStates}, + {TargetSupernodeAccount: target2, PortStates: portStates}, + {TargetSupernodeAccount: target3, PortStates: portStates}, + }, + }) + require.NoError(t, err) + + _, found := f.keeper.GetReporterReliabilityState(f.ctx, reporter) + require.False(t, found, "complete peer-observation coverage must not create reporter reliability penalty state") + for _, event := range f.ctx.EventManager().Events() { + if event.Type != types.EventTypeStorageTruthScoreUpdated { + continue + } + for _, attr := range event.Attributes { + require.NotEqual(t, "INCOMPLETE_REPORT", string(attr.Value)) + } + } +} + +func TestSubmitEpochReport_StillRejectsInvalidStorageChallengeObservations(t *testing.T) { + testCases := []struct { + name string + observations func(portStates []types.PortState) []*types.StorageChallengeObservation + wantErr error + wantSubstring string + }{ + { + name: "nil observation", + observations: func(_ []types.PortState) []*types.StorageChallengeObservation { + return []*types.StorageChallengeObservation{nil} + }, + wantErr: types.ErrInvalidPeerObservations, + wantSubstring: "nil storage challenge observation", + }, + { + name: "unassigned target", + observations: func(portStates []types.PortState) []*types.StorageChallengeObservation { + return []*types.StorageChallengeObservation{{TargetSupernodeAccount: "sn-zzz-unassigned", PortStates: portStates}} + }, + wantErr: types.ErrInvalidPeerObservations, + wantSubstring: "is not assigned to reporter", + }, + { + name: "duplicate target", + observations: func(portStates []types.PortState) []*types.StorageChallengeObservation { + return []*types.StorageChallengeObservation{ + {TargetSupernodeAccount: "sn-bbb-target", PortStates: portStates}, + {TargetSupernodeAccount: "sn-bbb-target", PortStates: portStates}, + } + }, + wantErr: types.ErrInvalidPeerObservations, + wantSubstring: "duplicate storage challenge observation", + }, + { + name: "wrong port-state length", + observations: func(_ []types.PortState) []*types.StorageChallengeObservation { + return []*types.StorageChallengeObservation{{TargetSupernodeAccount: "sn-bbb-target", PortStates: []types.PortState{types.PortState_PORT_STATE_OPEN}}} + }, + wantErr: types.ErrInvalidPortStatesLength, + wantSubstring: "port_states length", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + f := initFixture(t) + f.ctx = f.ctx.WithBlockHeight(1) + + params := types.DefaultParams().WithDefaults() + params.StorageTruthEnforcementMode = types.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED + require.NoError(t, f.keeper.SetParams(f.ctx, params)) + + ms := keeper.NewMsgServerImpl(f.keeper) + reporter := "sn-aaa-reporter" + activeAndTargets := []string{reporter, "sn-bbb-target", "sn-ccc-target", "sn-ddd-target"} + + f.supernodeKeeper.EXPECT(). + GetSuperNodeByAccount(gomock.Any(), reporter). + Return(sntypes.SuperNode{}, true, nil). + AnyTimes() + + seedEpochAnchorForReportTest(t, f, 0, activeAndTargets, activeAndTargets) + + portStates := make([]types.PortState, len(types.DefaultRequiredOpenPorts)) + for i := range portStates { + portStates[i] = types.PortState_PORT_STATE_OPEN + } + + _, err := ms.SubmitEpochReport(f.ctx, &types.MsgSubmitEpochReport{ + Creator: reporter, + EpochId: 0, + HostReport: types.HostReport{ + InboundPortStates: portStates, + }, + StorageChallengeObservations: tc.observations(portStates), + }) + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr.Error()) + require.Contains(t, err.Error(), tc.wantSubstring) + }) + } +} + func TestSubmitEpochReport_PersistsStorageProofResults(t *testing.T) { f := initFixture(t) f.ctx = f.ctx.WithBlockHeight(1)