Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions cmd/puniversald/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"encoding/json"
"fmt"
"path/filepath"

Expand Down Expand Up @@ -85,12 +84,6 @@ func startCmd() *cobra.Command {
return fmt.Errorf("failed to load config: %w", err)
}

configJSON, err := json.MarshalIndent(loadedCfg, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal config: %w", err)
}
fmt.Printf("\n=== Loaded Configuration ===\n%s\n===========================\n\n", string(configJSON))

ctx := context.Background()
client, err := core.NewUniversalClient(ctx, &loadedCfg)
if err != nil {
Expand Down
28 changes: 14 additions & 14 deletions universalClient/chains/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ func (c *Chains) run(parent context.Context) {
for {
select {
case <-parent.Done():
c.logger.Info().Msg("chains: context canceled; stopping")
c.logger.Debug().Msg("context canceled; stopping")
return
case <-c.stopCh:
c.logger.Info().Msg("chains: stop requested; stopping")
c.logger.Debug().Msg("stop requested; stopping")
return
case <-ticker.C:
if err := c.fetchAndUpdate(parent); err != nil {
Expand Down Expand Up @@ -241,10 +241,10 @@ func (c *Chains) determineChainAction(cfg *uregistrytypes.ChainConfig) chainActi

if bothDisabled {
if exists {
c.logger.Info().Str("chain", chainID).Msg("chain fully disabled (inbound+outbound off), removing")
c.logger.Info().Str("chain", chainID).Msg("chain disabled, removing")
return chainActionRemove
}
c.logger.Debug().Str("chain", chainID).Msg("chain fully disabled, skipping")
c.logger.Debug().Str("chain", chainID).Msg("chain disabled, skipping")
return chainActionSkip
}

Expand Down Expand Up @@ -304,7 +304,7 @@ func (c *Chains) addChain(ctx context.Context, cfg *uregistrytypes.ChainConfig)

c.logger.Info().
Str("chain", cfg.Chain).
Msg("successfully added chain client")
Msg("chain client added")

return nil
}
Expand All @@ -318,11 +318,6 @@ func (c *Chains) removeChain(chainID string) error {
if !exists {
return nil
}

c.logger.Info().
Str("chain", chainID).
Msg("removing chain client")

// Stop the client
if err := client.Stop(); err != nil {
c.logger.Error().
Expand All @@ -333,6 +328,11 @@ func (c *Chains) removeChain(chainID string) error {

delete(c.chains, chainID)
delete(c.chainConfigs, chainID)

c.logger.Info().
Str("chain", chainID).
Msg("chain client removed")

return nil
}

Expand All @@ -341,7 +341,7 @@ func (c *Chains) StopAll() {
c.chainsMu.Lock()
defer c.chainsMu.Unlock()

c.logger.Info().Msg("stopping all chain clients")
c.logger.Debug().Msg("stopping all chain clients")

for chainID, client := range c.chains {
if err := client.Stop(); err != nil {
Expand Down Expand Up @@ -420,8 +420,8 @@ func (c *Chains) getChainDB(chainID string) (*db.DB, error) {
return nil, fmt.Errorf("failed to create database for chain %s: %w", chainID, err)
}

c.logger.Info().
Str("chain_id", chainID).
c.logger.Debug().
Str("chain", chainID).
Str("db_path", filepath.Join(baseDir, dbFilename)).
Msg("created file database for chain")

Expand Down Expand Up @@ -488,7 +488,7 @@ func (c *Chains) ensurePushChain(ctx context.Context) error {

c.logger.Info().
Str("chain", c.pushChainID).
Msg("successfully added push chain client")
Msg("chain client added")

return nil
}
Expand Down
8 changes: 4 additions & 4 deletions universalClient/chains/common/event_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewEventCleaner(

// Start begins the periodic cleanup process
func (ec *EventCleaner) Start(ctx context.Context) error {
ec.logger.Info().
ec.logger.Debug().
Str("cleanup_interval", ec.cleanupInterval.String()).
Str("retention_period", ec.retentionPeriod.String()).
Msg("starting event cleaner")
Expand All @@ -57,10 +57,10 @@ func (ec *EventCleaner) Start(ctx context.Context) error {
for {
select {
case <-ctx.Done():
ec.logger.Info().Msg("context cancelled, stopping event cleaner")
ec.logger.Debug().Msg("context cancelled, stopping event cleaner")
return
case <-ec.stopCh:
ec.logger.Info().Msg("stop signal received, stopping event cleaner")
ec.logger.Debug().Msg("stop signal received, stopping event cleaner")
return
case <-ec.ticker.C:
if err := ec.performCleanup(); err != nil {
Expand All @@ -75,7 +75,7 @@ func (ec *EventCleaner) Start(ctx context.Context) error {

// Stop gracefully stops the event cleaner
func (ec *EventCleaner) Stop() {
ec.logger.Info().Msg("stopping event cleaner")
ec.logger.Debug().Msg("stopping event cleaner")

if ec.ticker != nil {
ec.ticker.Stop()
Expand Down
16 changes: 9 additions & 7 deletions universalClient/chains/common/event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (ep *EventProcessor) Stop() error {
return nil
}

ep.logger.Info().Msg("stopping event processor")
ep.logger.Debug().Msg("stopping event processor")
close(ep.stopCh)
ep.running = false

Expand All @@ -97,10 +97,10 @@ func (ep *EventProcessor) processLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
ep.logger.Info().Msg("context cancelled, stopping event processor")
ep.logger.Debug().Msg("context cancelled, stopping event processor")
return
case <-ep.stopCh:
ep.logger.Info().Msg("stop signal received, stopping event processor")
ep.logger.Debug().Msg("stop signal received, stopping event processor")
return
case <-ticker.C:
// Fetch 1000 CONFIRMED events and process them
Expand Down Expand Up @@ -151,7 +151,7 @@ func (ep *EventProcessor) processConfirmedEvents(ctx context.Context) error {

// processOutboundEvent processes an outbound event by voting on it
func (ep *EventProcessor) processOutboundEvent(ctx context.Context, event *store.Event) error {
ep.logger.Info().
ep.logger.Debug().
Str("event_id", event.EventID).
Msg("processing outbound event")

Expand Down Expand Up @@ -188,15 +188,16 @@ func (ep *EventProcessor) processOutboundEvent(ctx context.Context, event *store

ep.logger.Info().
Str("event_id", event.EventID).
Str("type", event.Type).
Str("vote_tx_hash", voteTxHash).
Msg("outbound event marked as COMPLETED")
Msg("event marked as COMPLETED")

return nil
}

// processInboundEvent processes an inbound event by voting on it and confirming it
func (ep *EventProcessor) processInboundEvent(ctx context.Context, event *store.Event) error {
ep.logger.Info().
ep.logger.Debug().
Str("event_id", event.EventID).
Msg("processing inbound event")

Expand Down Expand Up @@ -228,8 +229,9 @@ func (ep *EventProcessor) processInboundEvent(ctx context.Context, event *store.

ep.logger.Info().
Str("event_id", event.EventID).
Str("type", event.Type).
Str("vote_tx_hash", voteTxHash).
Msg("inbound event marked as COMPLETED")
Msg("event marked as COMPLETED")

return nil
}
Expand Down
6 changes: 3 additions & 3 deletions universalClient/chains/evm/chain_meta_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ func (g *ChainMetaOracle) fetchAndVoteChainMeta(ctx context.Context) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

g.logger.Info().
g.logger.Debug().
Dur("interval", interval).
Msg("starting gas price fetching and voting")

for {
select {
case <-ctx.Done():
g.logger.Info().Msg("context cancelled, stopping gas price fetcher")
g.logger.Debug().Msg("context cancelled, stopping gas price fetcher")
return
case <-g.stopCh:
g.logger.Info().Msg("stop signal received, stopping gas price fetcher")
g.logger.Debug().Msg("stop signal received, stopping gas price fetcher")
return
case <-ticker.C:
// Fetch current gas price
Expand Down
11 changes: 5 additions & 6 deletions universalClient/chains/evm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewClient(
func (c *Client) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(context.Background())

c.logger.Info().Str("chain", c.chainIDStr).Msg("starting EVM chain client")
c.logger.Debug().Str("chain", c.chainIDStr).Msg("starting EVM chain client")

// Initialize RPC client first (required for other components)
if err := c.createRPCClient(); err != nil {
Expand All @@ -119,7 +119,7 @@ func (c *Client) Start(ctx context.Context) error {

// Stop gracefully shuts down the EVM chain client
func (c *Client) Stop() error {
c.logger.Info().Msg("stopping EVM chain client")
c.logger.Debug().Msg("stopping EVM chain client")

// Cancel context first to signal shutdown
if c.cancel != nil {
Expand All @@ -129,7 +129,7 @@ func (c *Client) Stop() error {
// Stop components in reverse order of initialization
if c.eventListener != nil {
if err := c.eventListener.Stop(); err != nil {
c.logger.Error().Err(err).Msg("error stopping event listener")
c.logger.Error().Err(err).Str("subsystem", "event_listener").Msg("subsystem failed to stop")
}
}

Expand All @@ -139,7 +139,7 @@ func (c *Client) Stop() error {

if c.eventProcessor != nil {
if err := c.eventProcessor.Stop(); err != nil {
c.logger.Error().Err(err).Msg("error stopping event processor")
c.logger.Error().Err(err).Str("subsystem", "event_processor").Msg("subsystem failed to stop")
}
}

Expand Down Expand Up @@ -208,7 +208,6 @@ func (c *Client) initializeComponents() error {
if err != nil {
return fmt.Errorf("failed to fetch vault address from gateway: %w", err)
}
c.logger.Info().Str("vault_address", vaultAddr.Hex()).Msg("vault address fetched from gateway")

eventListener, err := NewEventListener(
c.rpcClient,
Expand Down Expand Up @@ -320,7 +319,7 @@ func (c *Client) createRPCClient() error {
}

c.rpcClient = rpcClient
c.logger.Info().Int("connected_count", len(rpcClient.clients)).Msg("EVM RPC clients initialized successfully")
c.logger.Info().Int("connected_count", len(rpcClient.clients)).Msg("RPC clients initialized successfully")
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions universalClient/chains/evm/event_confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ func (ec *EventConfirmer) checkAndConfirmEvents(ctx context.Context) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

ec.logger.Info().
ec.logger.Debug().
Dur("interval", interval).
Msg("starting event confirmation checking")

for {
select {
case <-ctx.Done():
ec.logger.Info().Msg("context cancelled, stopping event confirmer")
ec.logger.Debug().Msg("context cancelled, stopping event confirmer")
return
case <-ec.stopCh:
ec.logger.Info().Msg("stop signal received, stopping event confirmer")
ec.logger.Debug().Msg("stop signal received, stopping event confirmer")
return
case <-ticker.C:
if err := ec.processPendingEvents(ctx); err != nil {
Expand Down Expand Up @@ -202,7 +202,7 @@ func (ec *EventConfirmer) processPendingEvents(ctx context.Context) error {

if rowsAffected > 0 {
confirmedCount++
ec.logger.Info().
ec.logger.Debug().
Str("event_id", event.EventID).
Str("event_type", event.Type).
Uint64("confirmations", confirmations).
Expand Down
14 changes: 6 additions & 8 deletions universalClient/chains/evm/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func (el *EventListener) Start(ctx context.Context) error {
el.wg.Add(1)
go el.listen(ctx)

el.logger.Info().Msg("EVM event listener started")
return nil
}

Expand All @@ -130,12 +129,11 @@ func (el *EventListener) Stop() error {
return nil
}

el.logger.Info().Msg("stopping EVM event listener")
el.logger.Debug().Msg("stopping EVM event listener")
close(el.stopCh)
el.running = false

el.wg.Wait()
el.logger.Info().Msg("EVM event listener stopped")
return nil
}

Expand All @@ -161,11 +159,11 @@ func (el *EventListener) listen(ctx context.Context) {
// Get event topics
topics := el.eventTopics
if len(topics) == 0 {
el.logger.Warn().Msg("no event topics configured, event listener will not process events")
el.logger.Error().Msg("no event topics configured, event listener will not process events")
return
}

el.logger.Info().
el.logger.Debug().
Int("topic_count", len(topics)).
Uint64("from_block", fromBlock).
Dur("poll_interval", pollInterval).
Expand All @@ -178,10 +176,10 @@ func (el *EventListener) listen(ctx context.Context) {
for {
select {
case <-ctx.Done():
el.logger.Info().Msg("context cancelled, stopping event listener")
el.logger.Debug().Msg("context cancelled, stopping event listener")
return
case <-el.stopCh:
el.logger.Info().Msg("stop signal received, stopping event listener")
el.logger.Debug().Msg("stop signal received, stopping event listener")
return
case <-ticker.C:
if err := el.processNewBlocks(ctx, &currentBlock, topics); err != nil {
Expand Down Expand Up @@ -289,7 +287,7 @@ func (el *EventListener) processBlockChunk(

// Log when events are found
if len(logs) > 0 {
el.logger.Info().
el.logger.Debug().
Uint64("from_block", fromBlock).
Uint64("to_block", toBlock).
Int("logs_found", len(logs)).
Expand Down
Loading
Loading