Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 105 additions & 13 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"errors"
"fmt"
"net/url"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -96,12 +97,33 @@ const (

// FlagDAFiberEnabled enables the Fiber DA client instead of the default JSON-RPC blob client
FlagDAFiberEnabled = FlagPrefixEvnode + "da.fiber.enabled"
// FlagDAFiberStateAddress is the gRPC address of the celestia-app node for Fiber state queries
FlagDAFiberStateAddress = FlagPrefixEvnode + "da.fiber.state_address"
// FlagDAFiberBridgeAddress is the gRPC address of the bridge node for Fiber state queries
// FlagDAFiberConsensusAddress is the gRPC address of the celestia-app (consensus)
// node. Feeds both the shared TxClient/CoreAccessor gRPC conn and
// appfibre.Client's internal state-client dial target — they're the same
// physical endpoint.
FlagDAFiberConsensusAddress = FlagPrefixEvnode + "da.fiber.consensus_address"
// FlagDAFiberConsensusTLS enables TLS for the consensus gRPC connection.
FlagDAFiberConsensusTLS = FlagPrefixEvnode + "da.fiber.consensus_tls"
// FlagDAFiberConsensusAuthToken is the auth token (x-token) for the
// consensus gRPC endpoint. Kept separate from bridge_auth_token because
// consensus providers typically issue different credentials from the
// bridge JWT.
FlagDAFiberConsensusAuthToken = FlagPrefixEvnode + "da.fiber.consensus_auth_token"
// FlagDAFiberBridgeAddress is the URL of the celestia-node bridge for
// JSON-RPC access. Must use ws:// or wss:// because blob.Subscribe is a
// channel-returning RPC and only works over WebSocket.
FlagDAFiberBridgeAddress = FlagPrefixEvnode + "da.fiber.bridge_address"
// FlagDAFiberBridgeAuthToken is the JWT for the bridge node, passed as
// Authorization: Bearer <token>.
FlagDAFiberBridgeAuthToken = FlagPrefixEvnode + "da.fiber.bridge_auth_token"
// FlagDAFiberKeyName is the key name in the keyring to use for signing payment promises
FlagDAFiberKeyName = FlagPrefixEvnode + "da.fiber.key_name"
// FlagDAFiberUploadConcurrency caps concurrent FSP upload connections.
// 0 keeps the appfibre.Client default (ProtocolParams.MaxValidatorCount).
FlagDAFiberUploadConcurrency = FlagPrefixEvnode + "da.fiber.upload_concurrency"
// FlagDAFiberDownloadConcurrency caps concurrent FSP download connections.
// 0 keeps the appfibre.Client default (ProtocolParams.ValidatorsForReconstruction()).
FlagDAFiberDownloadConcurrency = FlagPrefixEvnode + "da.fiber.download_concurrency"

// P2P configuration flags

Expand Down Expand Up @@ -285,21 +307,82 @@ type DAConfig struct {
// FiberDAConfig contains configuration for the Fiber DA client.
// When Enabled is true, the Fiber client is used instead of the default
// JSON-RPC blob client for DA operations.
//
// Two physical endpoints are addressed:
// - ConsensusAddress: celestia-app gRPC, used for tx broadcasts (MsgPayForFibre,
// Deposit/Withdraw) and state queries (validator set, chain-id, escrow).
// Internally feeds both CoreGRPCConfig.Addr and appfibre.Client.StateAddress.
// - BridgeAddress: celestia-node JSON-RPC, used for blob.Subscribe (Listen
// path) and as a read-only fallback for Fibre APIs.
type FiberDAConfig struct {
// Enabled switches the DA backend from the default JSON-RPC blob client
// to the Fiber protocol client.
Enabled bool `mapstructure:"enabled" yaml:"enabled" comment:"Enable the Fiber DA client for direct validator communication instead of the default JSON-RPC blob client"`
// StateAddress is the gRPC address of the celestia-app node used for
// state queries (validator set, chain ID, promise verification).
StateAddress string `mapstructure:"state_address" yaml:"state_address" comment:"gRPC address of the celestia-app node for Fiber state queries (host:port)"`
// BridgeAddress is the address of the bridge node.
BridgeAddress string `mapstructure:"bridge_address" yaml:"bridge_address" comment:"Bridge Node Address for Fiber"`

// ConsensusAddress is the gRPC address of the celestia-app node. Used
// for tx broadcasts (MsgPayForFibre, Deposit, Withdraw) and for
// appfibre.Client's state queries (validator set, chain-id, promise
// verification). host:port.
ConsensusAddress string `mapstructure:"consensus_address" yaml:"consensus_address" comment:"gRPC address of the celestia-app node used for tx broadcasts and Fiber state queries (host:port)"`
// ConsensusTLS enables TLS for the consensus gRPC connection. Required
// when talking to a TLS-terminated consensus endpoint.
ConsensusTLS bool `mapstructure:"consensus_tls" yaml:"consensus_tls" comment:"Enable TLS for the consensus gRPC connection"`
// ConsensusAuthToken is the auth token (x-token header) for the
// consensus gRPC endpoint. Empty disables header injection.
ConsensusAuthToken string `mapstructure:"consensus_auth_token" yaml:"consensus_auth_token" comment:"x-token auth for the consensus gRPC endpoint"`

// BridgeAddress is the URL of the celestia-node bridge for JSON-RPC
// access. Must use ws:// or wss:// — blob.Subscribe is a
// channel-returning RPC and only works over WebSocket.
BridgeAddress string `mapstructure:"bridge_address" yaml:"bridge_address" comment:"celestia-node bridge URL; must be ws:// or wss:// (blob.Subscribe needs WebSocket)"`
// BridgeAuthToken is the JWT for the bridge, sent as
// Authorization: Bearer <token>.
BridgeAuthToken string `mapstructure:"bridge_auth_token" yaml:"bridge_auth_token" comment:"JWT for the celestia-node bridge (Authorization: Bearer header)"`

// KeyringPath is the directory path containing the keyring for signing
// Fiber payment promises.
KeyringPath string `mapstructure:"keyring_path" yaml:"keyring_path" comment:"Path to the keyring directory for Fiber payment promise signing"`
// Fiber payment promises. Consumers (e.g. risotto) may ignore this and
// derive the path from their own RootDir; documented for completeness.
KeyringPath string `mapstructure:"keyring_path" yaml:"keyring_path" comment:"Path to the keyring directory for Fiber payment promise signing (optional; consumers may derive from RootDir)"`
// KeyName is the name of the key in the keyring to use for signing.
KeyName string `mapstructure:"key_name" yaml:"key_name" comment:"Name of the key in the keyring to use for signing Fiber payment promises"`
// UploadConcurrency limits the number of concurrent upload connections

// UploadConcurrency caps concurrent FSP upload connections. 0 keeps
// appfibre.Client's default (ProtocolParams.MaxValidatorCount).
UploadConcurrency int `mapstructure:"upload_concurrency" yaml:"upload_concurrency" comment:"Max concurrent FSP upload connections; 0 uses appfibre.Client's protocol default"`
// DownloadConcurrency caps concurrent FSP download connections. 0 keeps
// appfibre.Client's default (ProtocolParams.ValidatorsForReconstruction).
DownloadConcurrency int `mapstructure:"download_concurrency" yaml:"download_concurrency" comment:"Max concurrent FSP download connections; 0 uses appfibre.Client's protocol default"`
}

// Validate checks that a FiberDAConfig is usable. Only called when enabled;
// a disabled block is always valid because the Fibre client is not built.
func (c *FiberDAConfig) Validate() error {
if !c.Enabled {
return nil
}
if c.ConsensusAddress == "" {
return fmt.Errorf("%s is required when fiber DA is enabled", FlagDAFiberConsensusAddress)
}
if c.BridgeAddress == "" {
return fmt.Errorf("%s is required when fiber DA is enabled", FlagDAFiberBridgeAddress)
}
u, err := url.Parse(c.BridgeAddress)
if err != nil {
return fmt.Errorf("%s: %w", FlagDAFiberBridgeAddress, err)
}
if u.Scheme != "ws" && u.Scheme != "wss" {
return fmt.Errorf(
"%s must use ws:// or wss:// (got %q) — blob.Subscribe requires WebSocket, HTTP JSON-RPC cannot stream channels",
FlagDAFiberBridgeAddress, u.Scheme,
)
}
if c.UploadConcurrency < 0 {
return fmt.Errorf("%s must be non-negative", FlagDAFiberUploadConcurrency)
}
if c.DownloadConcurrency < 0 {
return fmt.Errorf("%s must be non-negative", FlagDAFiberDownloadConcurrency)
}
return nil
}

// IsFiberEnabled returns true if the Fiber DA client is configured and enabled.
Expand Down Expand Up @@ -578,6 +661,10 @@ func (c *Config) Validate() error {
return err
}

if err := c.DA.Fiber.Validate(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -661,9 +748,14 @@ func AddFlags(cmd *cobra.Command) {

// Fiber DA configuration flags
cmd.Flags().Bool(FlagDAFiberEnabled, def.DA.Fiber.Enabled, "enable the Fiber DA client for direct validator communication")
cmd.Flags().String(FlagDAFiberStateAddress, def.DA.Fiber.StateAddress, "gRPC address of the celestia-app node for Fiber state queries (host:port)")
cmd.Flags().String(FlagDAFiberBridgeAddress, def.DA.Fiber.BridgeAddress, "json rpc of the bridge node")
cmd.Flags().String(FlagDAFiberConsensusAddress, def.DA.Fiber.ConsensusAddress, "gRPC address of the celestia-app node used for tx broadcasts and Fiber state queries (host:port)")
cmd.Flags().Bool(FlagDAFiberConsensusTLS, def.DA.Fiber.ConsensusTLS, "enable TLS for the consensus gRPC connection")
cmd.Flags().String(FlagDAFiberConsensusAuthToken, def.DA.Fiber.ConsensusAuthToken, "x-token auth for the consensus gRPC endpoint")
cmd.Flags().String(FlagDAFiberBridgeAddress, def.DA.Fiber.BridgeAddress, "celestia-node bridge URL; must be ws:// or wss:// (blob.Subscribe needs WebSocket)")
cmd.Flags().String(FlagDAFiberBridgeAuthToken, def.DA.Fiber.BridgeAuthToken, "JWT for the celestia-node bridge (Authorization: Bearer header)")
cmd.Flags().String(FlagDAFiberKeyName, def.DA.Fiber.KeyName, "name of the key in the keyring for signing Fiber payment promises")
cmd.Flags().Int(FlagDAFiberUploadConcurrency, def.DA.Fiber.UploadConcurrency, "max concurrent FSP upload connections; 0 uses the appfibre protocol default")
cmd.Flags().Int(FlagDAFiberDownloadConcurrency, def.DA.Fiber.DownloadConcurrency, "max concurrent FSP download connections; 0 uses the appfibre protocol default")

// P2P configuration flags
cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)")
Expand Down
10 changes: 8 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ func TestAddFlags(t *testing.T) {

// DA Fiber flags
assertFlagValue(t, flags, FlagDAFiberEnabled, DefaultConfig().DA.Fiber.Enabled)
assertFlagValue(t, flags, FlagDAFiberStateAddress, DefaultConfig().DA.Fiber.StateAddress)
assertFlagValue(t, flags, FlagDAFiberConsensusAddress, DefaultConfig().DA.Fiber.ConsensusAddress)
assertFlagValue(t, flags, FlagDAFiberConsensusTLS, DefaultConfig().DA.Fiber.ConsensusTLS)
assertFlagValue(t, flags, FlagDAFiberConsensusAuthToken, DefaultConfig().DA.Fiber.ConsensusAuthToken)
assertFlagValue(t, flags, FlagDAFiberBridgeAddress, DefaultConfig().DA.Fiber.BridgeAddress)
assertFlagValue(t, flags, FlagDAFiberBridgeAuthToken, DefaultConfig().DA.Fiber.BridgeAuthToken)
assertFlagValue(t, flags, FlagDAFiberKeyName, DefaultConfig().DA.Fiber.KeyName)
assertFlagValue(t, flags, FlagDAFiberUploadConcurrency, DefaultConfig().DA.Fiber.UploadConcurrency)
assertFlagValue(t, flags, FlagDAFiberDownloadConcurrency, DefaultConfig().DA.Fiber.DownloadConcurrency)

// P2P flags
assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress)
Expand Down Expand Up @@ -153,7 +159,7 @@ func TestAddFlags(t *testing.T) {
assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration)

// Count the number of flags we're explicitly checking
expectedFlagCount := 84 // Update this number if you add more flag checks above
expectedFlagCount := 91 // Update this number if you add more flag checks above

// Get the actual number of flags (both regular and persistent)
actualFlagCount := 0
Expand Down
13 changes: 9 additions & 4 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ func DefaultConfig() Config {
BatchMaxDelay: DurationWrapper{0}, // 0 means use DA BlockTime
BatchMinItems: 1,
Fiber: FiberDAConfig{
Enabled: false,
StateAddress: "127.0.0.1:9090",
BridgeAddress: "",
KeyName: "default-fibre",
Enabled: false,
ConsensusAddress: "127.0.0.1:9090",
ConsensusTLS: false,
ConsensusAuthToken: "",
BridgeAddress: "",
BridgeAuthToken: "",
KeyName: "default-fibre",
UploadConcurrency: 0,
DownloadConcurrency: 0,
},
},
Instrumentation: DefaultInstrumentationConfig(),
Expand Down
Loading