Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions tests/systemtests/audit_peer_observation_completeness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
"github.com/tidwall/sjson"
)

// This test validates that ACTIVE probers must submit storage challenge observations for all assigned targets.
func TestAuditSubmitReport_ProberRequiresAllPeerObservations(t *testing.T) {
// This test validates that incomplete peer observations are accepted, persisted, and
// penalize the reporter once without immediate supernode postponement.
func TestAuditSubmitReport_IncompletePeerObservationsAcceptedWithReporterPenalty(t *testing.T) {
const (
// Keep epochs long enough in real time to avoid end-blocker enforcement during the test.
epochLengthBlocks = uint64(20)
Expand Down Expand Up @@ -44,5 +45,14 @@ func TestAuditSubmitReport_ProberRequiresAllPeerObservations(t *testing.T) {
host := auditHostReportJSON([]string{"PORT_STATE_OPEN"})
_, prober, _ := findAssignedProberAndTarget(t, epochID, []testNodeIdentity{n0, n1})
txResp := submitEpochReport(t, cli, prober.nodeName, epochID, host, nil)
RequireTxFailure(t, txResp, "expected storage challenge observations")
RequireTxSuccess(t, txResp)

report := auditQueryReport(t, epochID, prober.accAddr)
require.Len(t, report.StorageChallengeObservations, 0)

reliability := auditQueryReporterReliabilityState(t, prober.accAddr)
require.Equal(t, int64(8), reliability.ReliabilityScore)
require.Equal(t, epochID, reliability.LastUpdatedEpoch)
require.Equal(t, "REPORTER_TRUST_BAND_NORMAL", reliability.TrustBand.String())
require.Equal(t, "SUPERNODE_STATE_ACTIVE", querySupernodeLatestState(t, cli, prober.valAddr))
}
12 changes: 12 additions & 0 deletions tests/systemtests/audit_test_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,18 @@ func auditQueryReport(t *testing.T, epochID uint64, reporterSupernodeAccount str
return resp.Report
}

// auditQueryReporterReliabilityState fetches the reporter reliability state
// for the given supernode account via the audit module query service,
// failing the test immediately if the query errors out.
func auditQueryReporterReliabilityState(t *testing.T, reporterSupernodeAccount string) audittypes.ReporterReliabilityState {
	t.Helper()

	client, _ := newAuditQueryClient(t)
	queryCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	req := &audittypes.QueryReporterReliabilityStateRequest{
		ReporterSupernodeAccount: reporterSupernodeAccount,
	}
	resp, err := client.ReporterReliabilityState(queryCtx, req)
	require.NoError(t, err)
	return resp.State
}

func auditQueryAssignedTargets(t *testing.T, epochID uint64, filterByEpochID bool, proberSupernodeAccount string) audittypes.QueryAssignedTargetsResponse {
t.Helper()
qc, _ := newAuditQueryClient(t)
Expand Down
52 changes: 44 additions & 8 deletions x/audit/v1/keeper/msg_submit_epoch_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package keeper

import (
"context"
"strconv"

errorsmod "cosmossdk.io/errors"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/LumeraProtocol/lumera/x/audit/v1/types"
)

const (
	// incompleteReportReason tags the scoring event emitted when a prober's
	// report omits observations for some of its assigned targets.
	incompleteReportReason = "INCOMPLETE_REPORT"
	// incompleteReportReporterReliabilityPenalty is the flat reliability-score
	// delta applied to a reporter for an incomplete report (applied once per
	// epoch, per the submission handler's coverage check).
	incompleteReportReporterReliabilityPenalty = int64(8)
)

func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEpochReport) (*types.MsgSubmitEpochReportResponse, error) {
if req == nil {
return nil, errorsmod.Wrap(types.ErrInvalidSigner, "empty request")
Expand Down Expand Up @@ -83,17 +89,15 @@ func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEp
len(req.HostReport.InboundPortStates), requiredPortsLen,
)
}
incompleteReport := false
if !isProber {
// Not a prober for this epoch (e.g. POSTPONED). Peer observations are not accepted.
if len(req.StorageChallengeObservations) > 0 {
return nil, errorsmod.Wrap(types.ErrInvalidReporterState, "reporter not eligible for storage challenge observations in this epoch")
}
} else {
// Probers must submit peer observations for all assigned targets for the epoch.
if len(req.StorageChallengeObservations) != len(allowedTargets) {
return nil, errorsmod.Wrapf(types.ErrInvalidPeerObservations, "expected storage challenge observations for %d assigned targets; got %d", len(allowedTargets), len(req.StorageChallengeObservations))
}

// Probers may submit a subset of assigned peer observations; missing
// assigned targets are accepted but penalize the reporter once per epoch.
seenTargets := make(map[string]struct{}, len(req.StorageChallengeObservations))
for _, obs := range req.StorageChallengeObservations {
if obs == nil {
Expand All @@ -118,9 +122,7 @@ func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEp
return nil, errorsmod.Wrapf(types.ErrInvalidPortStatesLength, "port_states length %d does not match required_open_ports length %d", len(obs.PortStates), requiredPortsLen)
}
}
if len(seenTargets) != len(allowedTargets) {
return nil, errorsmod.Wrap(types.ErrInvalidPeerObservations, "peer observations do not cover all assigned targets")
}
incompleteReport = len(seenTargets) < len(allowedTargets)
}
// Per PR #118 / Zee F2 — cap storage proof results to bound processing cost.
if len(req.StorageProofResults) > types.MaxStorageProofResultsPerReport {
Expand Down Expand Up @@ -178,6 +180,40 @@ func (m msgServer) SubmitEpochReport(ctx context.Context, req *types.MsgSubmitEp
if err := m.applyStorageTruthScores(sdkCtx, req.EpochId, reporterAccount, req.StorageProofResults); err != nil {
return nil, err
}
if incompleteReport {
if err := m.applyIncompleteReportPenalty(sdkCtx, req.EpochId, reporterAccount, assignParams); err != nil {
return nil, err
}
}

return &types.MsgSubmitEpochReportResponse{}, nil
}

// applyIncompleteReportPenalty applies the flat incomplete-report reliability
// penalty to the reporter and, only when the reliability state was actually
// updated, emits a storage-truth scoring event tagged with the
// INCOMPLETE_REPORT reason attribute.
//
// The score arithmetic itself is delegated to applyReporterReliabilityDelta,
// reusing the storage-truth decay parameter.
func (k Keeper) applyIncompleteReportPenalty(ctx sdk.Context, epochID uint64, reporterAccount string, params types.Params) error {
	relState, changed, err := k.applyReporterReliabilityDelta(
		ctx,
		epochID,
		reporterAccount,
		incompleteReportReporterReliabilityPenalty,
		params.StorageTruthReporterReliabilityDecayPerEpoch,
		0,
		params,
	)
	if err != nil {
		return err
	}
	if !changed {
		// No state change was persisted, so no scoring event is emitted.
		return nil
	}

	attrs := []sdk.Attribute{
		sdk.NewAttribute(sdk.AttributeKeyModule, types.ModuleName),
		sdk.NewAttribute(types.AttributeKeyEpochID, strconv.FormatUint(epochID, 10)),
		sdk.NewAttribute(types.AttributeKeyReporterSupernodeAccount, reporterAccount),
		sdk.NewAttribute(types.AttributeKeyReporterReliabilityScore, strconv.FormatInt(relState.ReliabilityScore, 10)),
		sdk.NewAttribute(types.AttributeKeyReporterTrustBand, relState.TrustBand.String()),
		sdk.NewAttribute("reason", incompleteReportReason),
	}
	ctx.EventManager().EmitEvent(sdk.NewEvent(types.EventTypeStorageTruthScoreUpdated, attrs...))
	return nil
}
207 changes: 207 additions & 0 deletions x/audit/v1/keeper/msg_submit_epoch_report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,213 @@ func TestSubmitEpochReport_ValidatesInboundPortStatesLength(t *testing.T) {
require.Error(t, err)
}

// TestSubmitEpochReport_AcceptsPartialStorageChallengeObservationsAndPenalizesReporter
// verifies that a prober covering only a subset of its assigned targets still has
// its report persisted, while its reliability score is docked and an
// INCOMPLETE_REPORT scoring event is emitted.
func TestSubmitEpochReport_AcceptsPartialStorageChallengeObservationsAndPenalizesReporter(t *testing.T) {
	f := initFixture(t)
	f.ctx = f.ctx.WithBlockHeight(1).WithEventManager(sdk.NewEventManager())

	params := types.DefaultParams().WithDefaults()
	params.StorageTruthEnforcementMode = types.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED
	require.NoError(t, f.keeper.SetParams(f.ctx, params))

	ms := keeper.NewMsgServerImpl(f.keeper)

	const (
		reporter = "sn-aaa-reporter"
		target1  = "sn-bbb-target"
		target2  = "sn-ccc-target"
		target3  = "sn-ddd-target"
	)
	cohort := []string{reporter, target1, target2, target3}

	f.supernodeKeeper.EXPECT().
		GetSuperNodeByAccount(gomock.Any(), reporter).
		Return(sntypes.SuperNode{}, true, nil).
		AnyTimes()

	seedEpochAnchorForReportTest(t, f, 0, cohort, cohort)

	openPorts := make([]types.PortState, len(types.DefaultRequiredOpenPorts))
	for i := range openPorts {
		openPorts[i] = types.PortState_PORT_STATE_OPEN
	}

	// Submit observations for only two of the three assigned targets.
	_, err := ms.SubmitEpochReport(f.ctx, &types.MsgSubmitEpochReport{
		Creator: reporter,
		EpochId: 0,
		HostReport: types.HostReport{
			InboundPortStates: openPorts,
		},
		StorageChallengeObservations: []*types.StorageChallengeObservation{
			{TargetSupernodeAccount: target1, PortStates: openPorts},
			{TargetSupernodeAccount: target2, PortStates: openPorts},
		},
	})
	require.NoError(t, err)

	// The partial report must be persisted as submitted.
	report, found := f.keeper.GetReport(f.ctx, 0, reporter)
	require.True(t, found)
	require.Len(t, report.StorageChallengeObservations, 2)

	// The reporter takes the flat incomplete-report penalty.
	state, found := f.keeper.GetReporterReliabilityState(f.ctx, reporter)
	require.True(t, found)
	require.Equal(t, int64(8), state.ReliabilityScore)
	require.Equal(t, uint64(0), state.LastUpdatedEpoch)
	require.Equal(t, types.ReporterTrustBand_REPORTER_TRUST_BAND_NORMAL, state.TrustBand)

	// A scoring event with the INCOMPLETE_REPORT reason must have been emitted.
	var foundReason bool
	for _, evt := range f.ctx.EventManager().Events() {
		if evt.Type != types.EventTypeStorageTruthScoreUpdated {
			continue
		}
		attrs := make(map[string]string, len(evt.Attributes))
		for _, attr := range evt.Attributes {
			attrs[string(attr.Key)] = string(attr.Value)
		}
		if attrs["reason"] == "INCOMPLETE_REPORT" {
			foundReason = true
			require.Equal(t, reporter, attrs[types.AttributeKeyReporterSupernodeAccount])
			require.Equal(t, "8", attrs[types.AttributeKeyReporterReliabilityScore])
		}
	}
	require.True(t, foundReason, "expected INCOMPLETE_REPORT scoring event")
}

// TestSubmitEpochReport_FullStorageChallengeCoverageDoesNotPenalizeReporter
// verifies that a prober observing every assigned target incurs no reliability
// penalty state and no INCOMPLETE_REPORT scoring event.
func TestSubmitEpochReport_FullStorageChallengeCoverageDoesNotPenalizeReporter(t *testing.T) {
	f := initFixture(t)
	f.ctx = f.ctx.WithBlockHeight(1).WithEventManager(sdk.NewEventManager())

	params := types.DefaultParams().WithDefaults()
	params.StorageTruthEnforcementMode = types.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED
	require.NoError(t, f.keeper.SetParams(f.ctx, params))

	ms := keeper.NewMsgServerImpl(f.keeper)

	const (
		reporter = "sn-aaa-reporter"
		target1  = "sn-bbb-target"
		target2  = "sn-ccc-target"
		target3  = "sn-ddd-target"
	)
	cohort := []string{reporter, target1, target2, target3}

	f.supernodeKeeper.EXPECT().
		GetSuperNodeByAccount(gomock.Any(), reporter).
		Return(sntypes.SuperNode{}, true, nil).
		AnyTimes()

	seedEpochAnchorForReportTest(t, f, 0, cohort, cohort)

	openPorts := make([]types.PortState, len(types.DefaultRequiredOpenPorts))
	for i := range openPorts {
		openPorts[i] = types.PortState_PORT_STATE_OPEN
	}

	// Observations cover all three assigned targets.
	_, err := ms.SubmitEpochReport(f.ctx, &types.MsgSubmitEpochReport{
		Creator: reporter,
		EpochId: 0,
		HostReport: types.HostReport{
			InboundPortStates: openPorts,
		},
		StorageChallengeObservations: []*types.StorageChallengeObservation{
			{TargetSupernodeAccount: target1, PortStates: openPorts},
			{TargetSupernodeAccount: target2, PortStates: openPorts},
			{TargetSupernodeAccount: target3, PortStates: openPorts},
		},
	})
	require.NoError(t, err)

	_, found := f.keeper.GetReporterReliabilityState(f.ctx, reporter)
	require.False(t, found, "complete peer-observation coverage must not create reporter reliability penalty state")

	// No scoring event may carry the INCOMPLETE_REPORT marker.
	for _, evt := range f.ctx.EventManager().Events() {
		if evt.Type != types.EventTypeStorageTruthScoreUpdated {
			continue
		}
		for _, attr := range evt.Attributes {
			require.NotEqual(t, "INCOMPLETE_REPORT", string(attr.Value))
		}
	}
}

// TestSubmitEpochReport_StillRejectsInvalidStorageChallengeObservations verifies
// that structural validation of peer observations (nil entries, unassigned
// targets, duplicates, malformed port-state lists) still rejects the report,
// even though partial coverage of assigned targets is accepted.
func TestSubmitEpochReport_StillRejectsInvalidStorageChallengeObservations(t *testing.T) {
	testCases := []struct {
		name string
		// observations builds the invalid observation list under test; it
		// receives a correctly sized port-state slice for convenience.
		observations  func(portStates []types.PortState) []*types.StorageChallengeObservation
		wantErr       error
		wantSubstring string
	}{
		{
			name: "nil observation",
			observations: func(_ []types.PortState) []*types.StorageChallengeObservation {
				return []*types.StorageChallengeObservation{nil}
			},
			wantErr:       types.ErrInvalidPeerObservations,
			wantSubstring: "nil storage challenge observation",
		},
		{
			name: "unassigned target",
			observations: func(portStates []types.PortState) []*types.StorageChallengeObservation {
				return []*types.StorageChallengeObservation{{TargetSupernodeAccount: "sn-zzz-unassigned", PortStates: portStates}}
			},
			wantErr:       types.ErrInvalidPeerObservations,
			wantSubstring: "is not assigned to reporter",
		},
		{
			name: "duplicate target",
			observations: func(portStates []types.PortState) []*types.StorageChallengeObservation {
				return []*types.StorageChallengeObservation{
					{TargetSupernodeAccount: "sn-bbb-target", PortStates: portStates},
					{TargetSupernodeAccount: "sn-bbb-target", PortStates: portStates},
				}
			},
			wantErr:       types.ErrInvalidPeerObservations,
			wantSubstring: "duplicate storage challenge observation",
		},
		{
			name: "wrong port-state length",
			observations: func(_ []types.PortState) []*types.StorageChallengeObservation {
				return []*types.StorageChallengeObservation{{TargetSupernodeAccount: "sn-bbb-target", PortStates: []types.PortState{types.PortState_PORT_STATE_OPEN}}}
			},
			wantErr:       types.ErrInvalidPortStatesLength,
			wantSubstring: "port_states length",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			f := initFixture(t)
			f.ctx = f.ctx.WithBlockHeight(1)

			params := types.DefaultParams().WithDefaults()
			params.StorageTruthEnforcementMode = types.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED
			require.NoError(t, f.keeper.SetParams(f.ctx, params))

			ms := keeper.NewMsgServerImpl(f.keeper)
			reporter := "sn-aaa-reporter"
			activeAndTargets := []string{reporter, "sn-bbb-target", "sn-ccc-target", "sn-ddd-target"}

			f.supernodeKeeper.EXPECT().
				GetSuperNodeByAccount(gomock.Any(), reporter).
				Return(sntypes.SuperNode{}, true, nil).
				AnyTimes()

			seedEpochAnchorForReportTest(t, f, 0, activeAndTargets, activeAndTargets)

			portStates := make([]types.PortState, len(types.DefaultRequiredOpenPorts))
			for i := range portStates {
				portStates[i] = types.PortState_PORT_STATE_OPEN
			}

			_, err := ms.SubmitEpochReport(f.ctx, &types.MsgSubmitEpochReport{
				Creator: reporter,
				EpochId: 0,
				HostReport: types.HostReport{
					InboundPortStates: portStates,
				},
				StorageChallengeObservations: tc.observations(portStates),
			})
			require.Error(t, err)
			// Assert the sentinel through the error chain (errors.Is) instead of
			// substring-matching err.Error(), which is brittle against message edits.
			require.ErrorIs(t, err, tc.wantErr)
			require.Contains(t, err.Error(), tc.wantSubstring)
		})
	}
}

func TestSubmitEpochReport_PersistsStorageProofResults(t *testing.T) {
f := initFixture(t)
f.ctx = f.ctx.WithBlockHeight(1)
Expand Down
Loading