3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046))
 - **BREAKING** Make the pending events cache and tx cache fully ephemeral; they are re-fetched on restart. The DA inclusion cache persists until cleared after DA inclusion has been processed, and survives restarts via store metadata. ([#3047](https://github.com/evstack/ev-node/pull/3047))
+- Replace the LRU cache with a standard in-memory cache with manual eviction in `store_adapter`; when P2P blocks were fetched too quickly, they could be evicted before being executed. ([#3051](https://github.com/evstack/ev-node/pull/3051))
 
 ## v1.0.0-rc.2
 
@@ -25,7 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Improve cache handling when there is a significant backlog of pending headers and data. ([#3030](https://github.com/evstack/ev-node/pull/3030))
 - Decrease MaxBytesSize to `5MB` to increase compatibility with public nodes. ([#3030](https://github.com/evstack/ev-node/pull/3030))
 - Proper counting of `DASubmitterPendingBlobs` metrics. ([#3038](https://github.com/evstack/ev-node/pull/3038))
-- Replace the `go-header` store with the `ev-node` store. This avoids duplicating all blocks across the `go-header` and `ev-node` stores. Thanks to the cached store from #3030, this should improve p2p performance as well.
+- Replace the `go-header` store with the `ev-node` store. This avoids duplicating all blocks across the `go-header` and `ev-node` stores. Thanks to the cached store from #3030, this should improve p2p performance as well. ([#3036](https://github.com/evstack/ev-node/pull/3036))
 
 ## v1.0.0-rc.1
 
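The #3051 entry replaces a size-bounded LRU in `store_adapter` with a plain map that is evicted only on demand: under a fast P2P fetch burst, an LRU can drop a block before the executor has consumed it. A minimal sketch of the manual-eviction idea; the names and package are illustrative, not the actual `store_adapter` types:

```go
package storeadapter // illustrative package name

import "sync"

// blockCache keeps fetched blocks keyed by height and never evicts on its
// own: entries leave only when Evict is called after execution. This is the
// property a size-bounded LRU cannot guarantee under a fetch burst.
type blockCache[T any] struct {
	mu    sync.RWMutex
	items map[uint64]T
}

func newBlockCache[T any]() *blockCache[T] {
	return &blockCache[T]{items: make(map[uint64]T)}
}

// Put stores a block fetched from P2P.
func (c *blockCache[T]) Put(height uint64, v T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[height] = v
}

// Get returns the block at the given height, if still cached.
func (c *blockCache[T]) Get(height uint64) (T, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.items[height]
	return v, ok
}

// Evict removes a block explicitly, once it has been executed.
func (c *blockCache[T]) Evict(height uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.items, height)
}
```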
38 changes: 24 additions & 14 deletions block/internal/cache/generic_cache.go
@@ -4,11 +4,13 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"strings"
 	"sync"
 
 	"sync/atomic"
 
 	lru "github.com/hashicorp/golang-lru/v2"
+	"github.com/rs/zerolog/log"
 
 	"github.com/evstack/ev-node/pkg/store"
 )
@@ -215,32 +217,40 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) {
 
 // RestoreFromStore loads DA inclusion data from the store into the in-memory cache.
 // This should be called during initialization to restore persisted state.
-// It iterates through store metadata keys with the cache's prefix and populates the LRU cache.
-func (c *Cache[T]) RestoreFromStore(ctx context.Context, hashes []string) error {
-	if c.store == nil {
-		return nil // No store configured, nothing to restore
+// It directly queries store metadata keys with the cache's prefix, avoiding iteration through all blocks.
+func (c *Cache[T]) RestoreFromStore(ctx context.Context) error {
+	if c.store == nil || c.storeKeyPrefix == "" {
+		return nil // No store configured or no prefix, nothing to restore
 	}
 
-	for _, hash := range hashes {
-		value, err := c.store.GetMetadata(ctx, c.storeKey(hash))
-		if err != nil {
-			// Key not found is not an error - the hash may not have been DA included yet
+	// Query all metadata entries with our prefix directly
+	entries, err := c.store.GetMetadataByPrefix(ctx, c.storeKeyPrefix)
+	if err != nil {
+		return fmt.Errorf("failed to query metadata by prefix %q: %w", c.storeKeyPrefix, err)
+	}
+
+	for _, entry := range entries {
+		// Extract the hash from the key by removing the prefix
+		hash := strings.TrimPrefix(entry.Key, c.storeKeyPrefix)
+		if hash == entry.Key || hash == "" {
+			// Prefix not found or empty hash - skip invalid entry
 			continue
 		}
 
-		daHeight, blockHeight, ok := decodeDAInclusion(value)
+		daHeight, blockHeight, ok := decodeDAInclusion(entry.Value)
 		if !ok {
-			continue // Invalid data, skip
+			log.Warn().
+				Str("key", entry.Key).
+				Int("value_len", len(entry.Value)).
+				Msg("skipping invalid DA inclusion entry during cache restore")
+			continue
 		}
 
 		c.daIncluded.Add(hash, daHeight)
 		c.hashByHeight.Add(blockHeight, hash)
 
-		// Update max DA height
-		current := c.maxDAHeight.Load()
-		if daHeight > current {
-			c.maxDAHeight.Store(daHeight)
-		}
+		c.setMaxDAHeight(daHeight)
 	}
 
 	return nil
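The new code calls `decodeDAInclusion(entry.Value)` and `c.setMaxDAHeight(daHeight)`, neither of which appears in this diff. A plausible reconstruction of the encode/decode pair, assuming the metadata value simply packs the DA height and block height as two big-endian uint64s; this layout is hypothetical (the real definition lives elsewhere in `generic_cache.go`) and relies on the file's existing `encoding/binary` import:

```go
// Hypothetical layout: 16 bytes, DA height then block height, big-endian.
func encodeDAInclusion(daHeight, blockHeight uint64) []byte {
	buf := make([]byte, 16)
	binary.BigEndian.PutUint64(buf[:8], daHeight)
	binary.BigEndian.PutUint64(buf[8:], blockHeight)
	return buf
}

// decodeDAInclusion is the inverse; ok is false for malformed values,
// which is exactly the case the new log.Warn branch skips over.
func decodeDAInclusion(value []byte) (daHeight, blockHeight uint64, ok bool) {
	if len(value) != 16 {
		return 0, 0, false
	}
	return binary.BigEndian.Uint64(value[:8]), binary.BigEndian.Uint64(value[8:]), true
}
```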
7 changes: 3 additions & 4 deletions block/internal/cache/generic_cache_test.go
@@ -67,8 +67,7 @@ func TestCache_MaxDAHeight_WithStore(t *testing.T) {
 	// Create new cache and restore from store
 	c2 := NewCache[testItem](st, "test/da-included/")
 
-	// Restore with the known hashes
-	err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"})
+	err = c2.RestoreFromStore(ctx)
 	require.NoError(t, err)
 
 	if got := c2.daHeight(); got != 200 {
@@ -106,7 +105,7 @@ func TestCache_WithStorePersistence(t *testing.T) {
 	// Create new cache with same store and restore
 	c2 := NewCache[testItem](st, "test/")
 
-	err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"})
+	err = c2.RestoreFromStore(ctx)
 	require.NoError(t, err)
 
 	// hash1 and hash2 should be restored, hash3 should not exist
@@ -263,7 +262,7 @@ func TestCache_SaveToStore(t *testing.T) {
 	// Verify data is in store by creating new cache and restoring
 	c2 := NewCache[testItem](st, "save-test/")
 
-	err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2"})
+	err = c2.RestoreFromStore(ctx)
 	require.NoError(t, err)
 
 	daHeight, ok := c2.getDAIncluded("hash1")
40 changes: 6 additions & 34 deletions block/internal/cache/manager.go
@@ -342,54 +342,26 @@ func (m *implementation) SaveToStore() error {
 }
 
 // RestoreFromStore restores the DA inclusion cache from the store.
-// This iterates through blocks in the store and checks for persisted DA inclusion data.
+// This uses prefix-based queries to directly load persisted DA inclusion data,
+// avoiding expensive iteration through all blocks.
 func (m *implementation) RestoreFromStore() error {
 	ctx := context.Background()
 
-	// Get current store height to know how many blocks to check
-	height, err := m.store.Height(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to get store height: %w", err)
-	}
-
-	if height == 0 {
-		return nil // No blocks to restore
-	}
-
-	// Collect hashes from stored blocks
-	var headerHashes []string
-	var dataHashes []string
-
-	for h := uint64(1); h <= height; h++ {
-		header, data, err := m.store.GetBlockData(ctx, h)
-		if err != nil {
-			m.logger.Warn().Uint64("height", h).Err(err).Msg("failed to get block data during cache restore")
-			continue
-		}
-
-		if header != nil {
-			headerHashes = append(headerHashes, header.Hash().String())
-		}
-		if data != nil {
-			dataHashes = append(dataHashes, data.DACommitment().String())
-		}
-	}
-
 	// Restore DA inclusion data from store
-	if err := m.headerCache.RestoreFromStore(ctx, headerHashes); err != nil {
+	if err := m.headerCache.RestoreFromStore(ctx); err != nil {
 		return fmt.Errorf("failed to restore header cache from store: %w", err)
 	}
 
-	if err := m.dataCache.RestoreFromStore(ctx, dataHashes); err != nil {
+	if err := m.dataCache.RestoreFromStore(ctx); err != nil {
 		return fmt.Errorf("failed to restore data cache from store: %w", err)
 	}
 
 	// Initialize DA height from store metadata to ensure DaHeight() is never 0.
 	m.initDAHeightFromStore(ctx)
 
 	m.logger.Info().
-		Int("header_hashes", len(headerHashes)).
-		Int("data_hashes", len(dataHashes)).
+		Int("header_entries", m.headerCache.daIncluded.Len()).
+		Int("data_entries", m.dataCache.daIncluded.Len()).
 		Uint64("da_height", m.DaHeight()).
 		Msg("restored DA inclusion cache from store")
 
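The net effect of this hunk is that restore cost no longer scales with chain height: instead of one `GetBlockData` call per height, each cache issues a single metadata prefix scan. A hedged sketch of the startup call site; the `cache.NewManager` constructor, its signature, and the exported `Manager` type are assumptions (the diff only shows the unexported `implementation`):

```go
// Illustrative wiring: restore persisted DA inclusion state once at startup,
// before any new blocks are processed.
func initCaches(st store.Store, logger zerolog.Logger) (cache.Manager, error) {
	manager, err := cache.NewManager(st, logger) // assumed constructor
	if err != nil {
		return nil, fmt.Errorf("creating cache manager: %w", err)
	}
	if err := manager.RestoreFromStore(); err != nil {
		return nil, fmt.Errorf("restoring DA inclusion cache: %w", err)
	}
	return manager, nil
}
```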
12 changes: 6 additions & 6 deletions pkg/store/data_store_adapter_test.go
@@ -58,9 +58,9 @@ func TestDataStoreAdapter_NewDataStoreAdapter(t *testing.T) {
 	// Initially, height should be 0
 	assert.Equal(t, uint64(0), adapter.Height())
 
-	// Head should return ErrNotFound when empty
+	// Head should return ErrEmptyStore when empty
 	_, err = adapter.Head(ctx)
-	assert.ErrorIs(t, err, header.ErrNotFound)
+	assert.ErrorIs(t, err, header.ErrEmptyStore)
 }
 
 func TestDataStoreAdapter_AppendAndRetrieve(t *testing.T) {
@@ -289,9 +289,9 @@ func TestDataStoreAdapter_Tail(t *testing.T) {
 	store := New(ds)
 	adapter := NewDataStoreAdapter(store, testGenesisData())
 
-	// Tail on empty store should return ErrNotFound
+	// Tail on empty store should return ErrEmptyStore
 	_, err = adapter.Tail(ctx)
-	assert.ErrorIs(t, err, header.ErrNotFound)
+	assert.ErrorIs(t, err, header.ErrEmptyStore)
 
 	_, d1 := types.GetRandomBlock(1, 1, "test-chain")
 	_, d2 := types.GetRandomBlock(2, 1, "test-chain")
@@ -512,9 +512,9 @@ func TestDataStoreAdapter_InitWithNil(t *testing.T) {
 	err = adapter.Init(ctx, nil)
 	require.NoError(t, err)
 
-	// Should still return ErrNotFound
+	// Should still return ErrEmptyStore
 	_, err = adapter.Head(ctx)
-	assert.ErrorIs(t, err, header.ErrNotFound)
+	assert.ErrorIs(t, err, header.ErrEmptyStore)
 }
 
 func TestDataStoreAdapter_ContextTimeout(t *testing.T) {
12 changes: 6 additions & 6 deletions pkg/store/header_store_adapter_test.go
@@ -56,9 +56,9 @@ func TestHeaderStoreAdapter_NewHeaderStoreAdapter(t *testing.T) {
 	// Initially, height should be 0
 	assert.Equal(t, uint64(0), adapter.Height())
 
-	// Head should return ErrNotFound when empty
+	// Head should return ErrEmptyStore when empty
 	_, err = adapter.Head(ctx)
-	assert.ErrorIs(t, err, header.ErrNotFound)
+	assert.ErrorIs(t, err, header.ErrEmptyStore)
 }
 
 func TestHeaderStoreAdapter_AppendAndRetrieve(t *testing.T) {
@@ -287,9 +287,9 @@ func TestHeaderStoreAdapter_Tail(t *testing.T) {
 	store := New(ds)
 	adapter := NewHeaderStoreAdapter(store, testGenesis())
 
-	// Tail on empty store should return ErrNotFound
+	// Tail on empty store should return ErrEmptyStore
 	_, err = adapter.Tail(ctx)
-	assert.ErrorIs(t, err, header.ErrNotFound)
+	assert.ErrorIs(t, err, header.ErrEmptyStore)
 
 	h1, _ := types.GetRandomBlock(1, 1, "test-chain")
 	h2, _ := types.GetRandomBlock(2, 1, "test-chain")
@@ -510,9 +510,9 @@ func TestHeaderStoreAdapter_InitWithNil(t *testing.T) {
 	err = adapter.Init(ctx, nil)
 	require.NoError(t, err)
 
-	// Should still return ErrNotFound
+	// Should still return ErrEmptyStore
 	_, err = adapter.Head(ctx)
-	assert.ErrorIs(t, err, header.ErrNotFound)
+	assert.ErrorIs(t, err, header.ErrEmptyStore)
 }
 
 func TestHeaderStoreAdapter_ContextTimeout(t *testing.T) {
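Both adapter test files encode the same behavior change: an empty store now surfaces go-header's `ErrEmptyStore` from `Head` and `Tail` instead of `ErrNotFound`. Callers that branched on the old sentinel need a matching update; a hedged sketch, assuming the usual `context`/`errors` imports and go-header's generic `Store` interface:

```go
// headOrNone treats ErrEmptyStore as "no head yet" rather than a failure,
// matching the adapters' new empty-store behavior.
func headOrNone[H header.Header[H]](ctx context.Context, s header.Store[H]) (H, bool, error) {
	var zero H
	h, err := s.Head(ctx)
	switch {
	case errors.Is(err, header.ErrEmptyStore):
		return zero, false, nil // empty store: caller falls back to genesis
	case err != nil:
		return zero, false, err
	}
	return h, true, nil
}
```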
36 changes: 36 additions & 0 deletions pkg/store/store.go
@@ -6,8 +6,10 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"strings"
 
 	ds "github.com/ipfs/go-datastore"
+	dsq "github.com/ipfs/go-datastore/query"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/evstack/ev-node/types"
@@ -190,6 +192,40 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err
 	return data, nil
 }
 
+// GetMetadataByPrefix returns all metadata entries whose keys have the given prefix.
+// This is more efficient than iterating through known keys when the set of keys is unknown.
+func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) {
+	// The full key in the datastore includes the meta prefix
+	fullPrefix := getMetaKey(prefix)
+
+	results, err := s.db.Query(ctx, dsq.Query{Prefix: fullPrefix})
+	if err != nil {
+		return nil, fmt.Errorf("failed to query metadata with prefix '%s': %w", prefix, err)
+	}
+	defer results.Close()
+
+	var entries []MetadataEntry
+	for result := range results.Next() {
+		if result.Error != nil {
+			return nil, fmt.Errorf("error iterating metadata results: %w", result.Error)
+		}
+
+		// Extract the original key by removing the meta prefix
+		// The key from datastore is like "/m/cache/header-da-included/hash"
+		// We want to return "cache/header-da-included/hash"
+		metaKeyPrefix := getMetaKey("")
+		key := strings.TrimPrefix(result.Key, metaKeyPrefix)
+		key = strings.TrimPrefix(key, "/") // Remove leading slash for consistency
+
+		entries = append(entries, MetadataEntry{
+			Key:   key,
+			Value: result.Value,
+		})
+	}
+
+	return entries, nil
+}
+
 // DeleteMetadata removes a metadata key from the store.
 func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error {
 	err := s.db.Delete(ctx, ds.NewKey(getMetaKey(key)))
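`GetMetadataByPrefix` returns `MetadataEntry` values; the struct definition is not in this diff, but from its construction above it is a plain key/value pair. Note the normalization in the loop: returned keys have the internal `/m/` namespace and leading slash stripped, so a returned `Key` can be passed straight back to `GetMetadata`. A hedged usage sketch, reusing the `cache/header-da-included/` prefix mentioned in the code comment above (the function name is illustrative):

```go
// Assumed shape of MetadataEntry (inferred from the diff):
//
//	type MetadataEntry struct {
//		Key   string
//		Value []byte
//	}
//
// dumpDAInclusionEntries lists every persisted DA inclusion record.
func dumpDAInclusionEntries(ctx context.Context, s *DefaultStore) error {
	entries, err := s.GetMetadataByPrefix(ctx, "cache/header-da-included/")
	if err != nil {
		return err
	}
	for _, e := range entries {
		fmt.Printf("%s -> %d bytes\n", e.Key, len(e.Value))
	}
	return nil
}
```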