-
Notifications
You must be signed in to change notification settings - Fork 248
feat(syncer): verify force inclusion for p2p blocks #2963
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0b04da3
044903f
2c9a212
e2b0520
aaacbde
74be668
b9b5f5b
e3336ce
c40b96b
f3fb315
56c278f
413b40d
a585190
d09c8ab
f0a505f
4ecf0a0
6c85d8d
7abfecc
e593848
993c2b1
8b99828
544c0b9
e1446a3
570ac06
1f6c405
4907c92
6e4554d
6962e6f
158bcad
66b6db8
94fe911
a20e483
69bf91b
b8ec42f
eb5aff5
9c95101
19835e8
c6e5696
b7b2100
7b1c802
14b83ff
3a2ba3f
c9d3251
fcb3e75
1b3802e
aac3f47
68b3bcb
1b394e0
4f4ad36
36526ec
025ee3f
3abb188
0298cea
4905677
d6ba01d
f662a50
0a8e542
a729ac2
f728ec2
956c898
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -114,6 +114,7 @@ type Syncer struct { | |||||||||||||||||||||
| gracePeriodMultiplier *atomic.Pointer[float64] | ||||||||||||||||||||||
| blockFullnessEMA *atomic.Pointer[float64] | ||||||||||||||||||||||
| gracePeriodConfig forcedInclusionGracePeriodConfig | ||||||||||||||||||||||
| p2pHeightHints map[uint64]uint64 // map[height]daHeight | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Lifecycle | ||||||||||||||||||||||
| ctx context.Context | ||||||||||||||||||||||
|
|
@@ -183,6 +184,7 @@ func NewSyncer( | |||||||||||||||||||||
| gracePeriodMultiplier: gracePeriodMultiplier, | ||||||||||||||||||||||
| blockFullnessEMA: blockFullnessEMA, | ||||||||||||||||||||||
| gracePeriodConfig: newForcedInclusionGracePeriodConfig(), | ||||||||||||||||||||||
| p2pHeightHints: make(map[uint64]uint64), | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| s.blockSyncer = s | ||||||||||||||||||||||
| if raftNode != nil && !reflect.ValueOf(raftNode).IsNil() { | ||||||||||||||||||||||
|
|
@@ -654,6 +656,7 @@ func (s *Syncer) processHeightEvent(ctx context.Context, event *common.DAHeightE | |||||||||||||||||||||
| Msg("P2P event with DA height hint, queuing priority DA retrieval") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Queue priority DA retrieval - will be processed in fetchDAUntilCaughtUp | ||||||||||||||||||||||
| s.p2pHeightHints[height] = daHeightHint | ||||||||||||||||||||||
| s.daRetriever.QueuePriorityHeight(daHeightHint) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
@@ -733,13 +736,17 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve | |||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Verify forced inclusion transactions if configured | ||||||||||||||||||||||
| if event.Source == common.SourceDA { | ||||||||||||||||||||||
| if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil { | ||||||||||||||||||||||
| s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") | ||||||||||||||||||||||
| if errors.Is(err, errMaliciousProposer) { | ||||||||||||||||||||||
| s.cache.RemoveHeaderDAIncluded(headerHash) | ||||||||||||||||||||||
| return err | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| currentDaHeight, ok := s.p2pHeightHints[nextHeight] | ||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||
| currentDaHeight = currentState.DAHeight | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| delete(s.p2pHeightHints, nextHeight) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| if err := s.VerifyForcedInclusionTxs(ctx, currentDaHeight, data); err != nil { | ||||||||||||||||||||||
| s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") | ||||||||||||||||||||||
| if errors.Is(err, errMaliciousProposer) { | ||||||||||||||||||||||
| s.cache.RemoveHeaderDAIncluded(headerHash) | ||||||||||||||||||||||
| return err | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Comment on lines
+746
to
750
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Failure to verify forced inclusion transactions should not be ignored. Currently, only
Suggested change
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. current behavior is correct. a fetch issue due to the sync node have issues or the da layer down should not crash the node. |
||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -777,6 +784,22 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve | |||||||||||||||||||||
| return fmt.Errorf("failed to commit batch: %w", err) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Persist DA height mapping for blocks synced from DA | ||||||||||||||||||||||
| // This ensures consistency with the sequencer's submitter which also persists this mapping | ||||||||||||||||||||||
| // Note: P2P hints are already persisted via store_adapter.Append when items have DAHint set | ||||||||||||||||||||||
| // But DaHeight from events always take precedence as they are authoritative (comes from DA) | ||||||||||||||||||||||
| if event.DaHeight > 0 { | ||||||||||||||||||||||
| daHeightBytes := make([]byte, 8) | ||||||||||||||||||||||
| binary.LittleEndian.PutUint64(daHeightBytes, event.DaHeight) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(nextHeight), daHeightBytes); err != nil { | ||||||||||||||||||||||
| s.logger.Warn().Err(err).Uint64("height", nextHeight).Msg("failed to persist header DA height mapping") | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(nextHeight), daHeightBytes); err != nil { | ||||||||||||||||||||||
| s.logger.Warn().Err(err).Uint64("height", nextHeight).Msg("failed to persist data DA height mapping") | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Update in-memory state after successful commit | ||||||||||||||||||||||
| s.SetLastState(newState) | ||||||||||||||||||||||
| s.metrics.Height.Set(float64(newState.LastBlockHeight)) | ||||||||||||||||||||||
|
|
@@ -952,7 +975,7 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 { | |||||||||||||||||||||
| // Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions | ||||||||||||||||||||||
| // to future blocks (smoothing). This is legitimate behavior within an epoch. | ||||||||||||||||||||||
| // However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later). | ||||||||||||||||||||||
| func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { | ||||||||||||||||||||||
| func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error { | ||||||||||||||||||||||
| if s.fiRetriever == nil { | ||||||||||||||||||||||
| return nil | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
@@ -962,7 +985,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type | |||||||||||||||||||||
| s.updateDynamicGracePeriod(blockFullness) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Retrieve forced inclusion transactions from DA for current epoch | ||||||||||||||||||||||
| forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentState.DAHeight) | ||||||||||||||||||||||
| forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, daHeight) | ||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||
| if errors.Is(err, da.ErrForceInclusionNotConfigured) { | ||||||||||||||||||||||
| s.logger.Debug().Msg("forced inclusion namespace not configured, skipping verification") | ||||||||||||||||||||||
|
|
@@ -1049,10 +1072,10 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type | |||||||||||||||||||||
| effectiveGracePeriod := s.getEffectiveGracePeriod() | ||||||||||||||||||||||
| graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if currentState.DAHeight > graceBoundary { | ||||||||||||||||||||||
| if daHeight > graceBoundary { | ||||||||||||||||||||||
| maliciousTxs = append(maliciousTxs, pending) | ||||||||||||||||||||||
| s.logger.Warn(). | ||||||||||||||||||||||
| Uint64("current_da_height", currentState.DAHeight). | ||||||||||||||||||||||
| Uint64("current_da_height", daHeight). | ||||||||||||||||||||||
| Uint64("epoch_end", pending.EpochEnd). | ||||||||||||||||||||||
| Uint64("grace_boundary", graceBoundary). | ||||||||||||||||||||||
| Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). | ||||||||||||||||||||||
|
|
@@ -1062,7 +1085,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type | |||||||||||||||||||||
| Msg("forced inclusion transaction past grace boundary - marking as malicious") | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| remainingPending = append(remainingPending, pending) | ||||||||||||||||||||||
| if currentState.DAHeight > pending.EpochEnd { | ||||||||||||||||||||||
| if daHeight > pending.EpochEnd { | ||||||||||||||||||||||
| txsInGracePeriod++ | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
@@ -1086,7 +1109,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type | |||||||||||||||||||||
| effectiveGracePeriod := s.getEffectiveGracePeriod() | ||||||||||||||||||||||
| s.logger.Error(). | ||||||||||||||||||||||
| Uint64("height", data.Height()). | ||||||||||||||||||||||
| Uint64("current_da_height", currentState.DAHeight). | ||||||||||||||||||||||
| Uint64("current_da_height", daHeight). | ||||||||||||||||||||||
| Int("malicious_count", len(maliciousTxs)). | ||||||||||||||||||||||
| Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). | ||||||||||||||||||||||
| Uint64("effective_grace_periods", effectiveGracePeriod). | ||||||||||||||||||||||
|
|
@@ -1106,7 +1129,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| s.logger.Info(). | ||||||||||||||||||||||
| Uint64("height", data.Height()). | ||||||||||||||||||||||
| Uint64("da_height", currentState.DAHeight). | ||||||||||||||||||||||
| Uint64("da_height", daHeight). | ||||||||||||||||||||||
| Uint64("epoch_start", forcedIncludedTxsEvent.StartDaHeight). | ||||||||||||||||||||||
| Uint64("epoch_end", forcedIncludedTxsEvent.EndDaHeight). | ||||||||||||||||||||||
| Int("included_count", includedCount). | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Knowing #2891 (comment), this is actually not the right solution for P2P.