Feat/candle service #19
Walkthrough

Adds a new Go-based candle-service (Kafka consumer/producer, sharded workers, in-memory candle store, Redis persistence, gRPC server), pluralizes API Gateway route mounts, adds protobufs for aggregation/indicators, introduces a trade-ingestor service and ClickHouse clients, updates several go.mod files, and adds documentation and tooling (Makefiles, Air configs).
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Kafka as Kafka<br/>(trades)
    participant Consumer as Kafka<br/>Consumer
    participant Router as Worker<br/>Router
    participant Worker as Worker<br/>(shard)
    participant Store as Candle<br/>Store
    participant Redis as Redis
    participant Producer as Kafka<br/>Producer
    participant gRPC as gRPC<br/>Client

    Kafka->>Consumer: deliver message
    Consumer->>Router: Route(EngineEvent)
    Router->>Worker: enqueue by symbol hash
    Worker->>Worker: dequeue & Process()
    Worker->>Store: AddNewCandle(symbol, trade)
    Store->>Store: update current & close buckets
    Store->>Redis: PushCandle / PublishCandleEventToRedis
    Store->>Producer: (on close) PublishCandleEventToKafka
    gRPC->>Router: GetCandles(symbol, timeframe)
    Router->>Worker: route request by hash
    Worker->>Store: GetCandles(...)
    Store-->>gRPC: return candle list
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.4)

```
level=error msg="Running error: context loading failed: failed to load packages: failed to load packages: failed to load with go/packages: err: exit status 1: stderr: go: cannot load module packages/proto-defs/go/generated listed in go.work file: open packages/proto-defs/go/generated/go.mod: no such file or directory\n"
```
Actionable comments posted: 7
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
🟠 Major comments (15)
apps/api-gateway/src/app.ts (1)

11-13: ⚠️ Potential issue | 🟠 Major
Avoid breaking existing API clients with a route rename.

These mounts change the public gateway surface from singular to plural paths, so any caller using `/api/v1/trade`, `/api/v1/user`, or `/api/v1/order` will start failing immediately. Please keep the old aliases temporarily or add a migration/deprecation plan before removing them.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/api-gateway/src/app.ts` around lines 11-13: the route mounts changed public paths from singular to plural (app.use("/api/v1/trades", tradeRouter), app.use("/api/v1/users", userRouter), app.use("/api/v1/orders", orderRouter)), which will break clients using /api/v1/trade, /api/v1/user, /api/v1/order; restore backward compatibility by adding the old singular aliases (e.g., app.use("/api/v1/trade", tradeRouter), app.use("/api/v1/user", userRouter), app.use("/api/v1/order", orderRouter)) or add 301/302 redirects from the singular to the plural routes inside the same app initialization so both paths work while you publish a deprecation plan; ensure the change references the same router instances (tradeRouter, userRouter, orderRouter) so behavior remains identical.
apps/matching-engine/SYSTEM_DESIGN.md (1)

890-921: ⚠️ Potential issue | 🟠 Major
Handle ORDER_PARTIAL_FILLED during replay.

Section 6 marks this as a persisted event, but replay never processes it. A restart can therefore rebuild a partially filled order with the wrong status.
🔁 Suggested replay update

```diff
 ORDER_REDUCED: update remaining + cancelled qty on order
+
+ORDER_PARTIAL_FILLED:
+  update order status to PARTIAL_FILLED and keep order in AllOrders/PriceLevel
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/matching-engine/SYSTEM_DESIGN.md` around lines 890-921: the replayWAL loop ignores ORDER_PARTIAL_FILLED events, which means partial fills are not reconstructed on restart; update the switch in replayWAL to handle ORDER_PARTIAL_FILLED by locating the order in AllOrders (or reconstructing from the persisted OrderStatusEvent), updating its filled/remaining quantities and average price, setting its status to PARTIALLY_FILLED, and ensuring any PriceLevel bookkeeping (e.g., adjusting order quantity in PriceLevel) is updated just like TRADE_EXECUTED does, so the in-memory order state matches the persisted event stream.
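For illustration, a minimal Go sketch of what such a replay case could do. The types here (OrderStatusEvent, Order, the allOrders map) are hypothetical stand-ins, not the engine's actual definitions:

```go
package replay

// Hypothetical stand-ins for the engine's persisted event and in-memory order.
type OrderStatusEvent struct {
	OrderID   string
	FilledQty int64
	Remaining int64
}

type Order struct {
	ID        string
	FilledQty int64
	Remaining int64
	Status    string
}

// applyPartialFill sketches an ORDER_PARTIAL_FILLED case in replayWAL:
// restore quantities and status so in-memory state matches the event stream,
// while leaving the order resting in the book (AllOrders/PriceLevel).
func applyPartialFill(allOrders map[string]*Order, ev OrderStatusEvent) {
	order, ok := allOrders[ev.OrderID]
	if !ok {
		return // order already removed by a later persisted event
	}
	order.FilledQty = ev.FilledQty
	order.Remaining = ev.Remaining
	order.Status = "PARTIALLY_FILLED"
}
```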
apps/candle-service/go.mod (1)

5-36: ⚠️ Potential issue | 🟠 Major
Move `google.golang.org/grpc` from indirect to direct requirements.

The candle-service directly imports gRPC in `apps/candle-service/internal/server.go` (packages: `grpc`, `grpc/codes`, `grpc/reflection`, `grpc/status`), but `google.golang.org/grpc v1.80.0` is declared as an indirect dependency in `go.mod`. Direct code imports should have corresponding direct `require` declarations to ensure proper dependency resolution and prevent potential tooling issues.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/go.mod` around lines 5-36: the go.mod currently lists google.golang.org/grpc v1.80.0 only as an indirect requirement but the service directly imports gRPC packages in apps/candle-service/internal/server.go (imports: google.golang.org/grpc, grpc/codes, grpc/reflection, grpc/status); move or add google.golang.org/grpc v1.80.0 into the primary require block as a direct requirement (remove the // indirect annotation), then run go mod tidy to ensure the module graph is consistent.
apps/candle-service/internal/kafka/consumerClient.go (1)

56-59: ⚠️ Potential issue | 🟠 Major
Add backoff on consume errors to avoid hot-loop retries.

Immediate retries on persistent failures can cause log storms and unnecessary CPU churn.

Suggested fix
```diff
 for {
 	if err := k.group.Consume(ctx, topics, handler); err != nil {
 		log.Println("consumer error:", err)
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(1 * time.Second):
+		}
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/kafka/consumerClient.go` around lines 56-59: the consume loop currently retries immediately on errors, causing a hot loop; modify the loop around k.group.Consume(ctx, topics, handler) to implement a backoff strategy (e.g., exponential backoff with jitter) that sleeps before retrying on error, resets the backoff when Consume returns nil, and still breaks promptly when ctx is cancelled; ensure the backoff logic lives in the goroutine/method that contains k.group.Consume and uses ctx.Done() to avoid blocking shutdown.
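A sketch of the exponential-backoff-with-jitter shape that prompt describes, written against a generic Consume-style callback rather than this service's actual types:

```go
package kafka

import (
	"context"
	"math/rand"
	"time"
)

// consumeWithBackoff wraps a Consume-style call (e.g. k.group.Consume) with
// exponential backoff plus jitter, resetting after a clean return and
// exiting promptly on context cancellation.
func consumeWithBackoff(ctx context.Context, consume func(context.Context) error) {
	const base, maxBackoff = time.Second, 30 * time.Second
	backoff := base

	for ctx.Err() == nil {
		if err := consume(ctx); err == nil {
			backoff = base // clean return (e.g. a rebalance): reset backoff
			continue
		}
		// Sleep before retrying, but wake immediately on shutdown.
		jitter := time.Duration(rand.Int63n(int64(backoff / 2)))
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff + jitter):
		}
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
```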
docs/data-service.html (1)

371-891: ⚠️ Potential issue | 🟠 Major
Mermaid blocks contain unescaped `<`/`>` and invalid HTML token sequences.

The diagram content includes patterns like `<-->` and `<-->|...|` inside HTML, which is causing parser/tag-pair errors and can break rendering.

Suggested fix pattern
```diff
-<pre><code class="language-mermaid"><div class="mermaid">graph TB
+<pre class="mermaid">graph TB
 ...
-    WS1 <-->|gRPC| DS
+    WS1 &lt;--&gt;|gRPC| DS
 ...
-</div></code></pre>
+</pre>
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/data-service.html` around lines 371-891: Mermaid code blocks contain raw angle brackets (e.g., arrows like RD -->|Pub/Sub| WS1, Exchange->>Kafka, WS1 <-->|gRPC| DS and other "-->", "<-->" sequences) which break the HTML parser; fix by escaping angle brackets inside each <div class="mermaid"> block (replace "<" with "&lt;" and ">" with "&gt;") or wrap the mermaid content in a safe container (e.g., CDATA/script type="text/plain") so the parser doesn't interpret tokens as HTML; update every mermaid block (graph TB, sequenceDiagram, graph LR, etc.) including examples like "WebSocket Server Process", "Real-Time Data Flow" and "Subscription Manager" diagrams accordingly.
apps/candle-service/internal/engine/worker.go (1)

29-31: ⚠️ Potential issue | 🟠 Major
Handle protobuf decode failures before mutating candle state.

The call to `proto.Unmarshal` at line 29 ignores errors, allowing partially initialized or invalid `TradeEvent` objects to reach `AddNewCandle`. This can write candles with empty symbols or corrupted data when payloads are malformed.

Suggested fix
```diff
 func (w *Worker) Process() {
 	for event := range w.eventQueue {
 		tradeEvent := &pb.TradeEvent{}
-
-		proto.Unmarshal(event.Data, tradeEvent)
+		if err := proto.Unmarshal(event.Data, tradeEvent); err != nil {
+			continue
+		}
+		if tradeEvent.GetSymbol() == "" {
+			continue
+		}
 		w.candleCache.AddNewCandle(tradeEvent.Symbol, tradeEvent)
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/engine/worker.go` around lines 29-31: the proto.Unmarshal call can fail and currently its error is ignored, allowing a partially decoded tradeEvent to be passed to w.candleCache.AddNewCandle; update the code around proto.Unmarshal(event.Data, tradeEvent) to check the returned error and bail out (log/return/skip processing) when unmarshalling fails, ensuring you do not call w.candleCache.AddNewCandle(tradeEvent.Symbol, tradeEvent) with an invalid tradeEvent; also ensure tradeEvent is properly allocated before Unmarshal and only used after a successful unmarshal.
apps/candle-service/cmd/candle-service/main.go (1)

41-44: ⚠️ Potential issue | 🟠 Major
Close the Kafka producer during shutdown to prevent message loss.

The `AsyncProducer` requires an explicit `Close()` call before the process exits to ensure buffered messages are flushed. Currently, the shutdown path (lines 85–86) only cancels the consumer context and stops gRPC, leaving the producer's buffers unhandled. With flush configured for 10ms and 1MB (lines 31–32), in-flight messages will be dropped on exit.

Call `kafka.KafkaProducerClient.Close()` in the shutdown sequence after `grpcServer.GracefulStop()`.

Also applies to: 85-87
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/cmd/candle-service/main.go` around lines 41-44: shutdown currently cancels the consumer context and calls grpcServer.GracefulStop() but never closes the Kafka producer, causing buffered messages to be dropped; update the shutdown sequence so that after calling grpcServer.GracefulStop() you call kafka.KafkaProducerClient.Close() (the AsyncProducer Close) and handle/log any error returned, ensuring InitKafkaProducer()/kafka.KafkaProducerClient is accessible in main.go and closed before exiting.
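A sketch of that shutdown ordering under stated assumptions: the producer is a Sarama AsyncProducer (import path assumed to be github.com/IBM/sarama; older codebases use github.com/Shopify/sarama) and the helper name is illustrative:

```go
package candleservice

import (
	"context"
	"log"

	"github.com/IBM/sarama"
	"google.golang.org/grpc"
)

// shutdown stops intake first, then flushes output: cancel the consumer
// context, drain in-flight RPCs, then Close() the async producer so its
// buffered messages are flushed to Kafka before the process exits.
func shutdown(cancelConsumer context.CancelFunc, grpcServer *grpc.Server, producer sarama.AsyncProducer) {
	cancelConsumer()
	grpcServer.GracefulStop()
	if producer != nil {
		// AsyncProducer.Close flushes any buffered messages before returning.
		if err := producer.Close(); err != nil {
			log.Printf("kafka producer close: %v", err)
		}
	}
}
```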
apps/candle-service/internal/kafka/consumerClient.go (1)

22-24: ⚠️ Potential issue | 🟠 Major
Enable `config.Consumer.Return.Errors = true` to make error handling effective.

The code at lines 50-54 attempts to consume errors from `group.Errors()`, but Sarama's Kafka client only sends errors to this channel when `Consumer.Return.Errors` is explicitly enabled. Without this configuration, errors are logged internally by Sarama and never reach the channel, making this error handling path ineffective.

Suggested fix
```diff
 config.Consumer.Offsets.AutoCommit.Enable = true
 config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Second
 config.Consumer.MaxWaitTime = 500 * time.Millisecond
+config.Consumer.Return.Errors = true
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/kafka/consumerClient.go` around lines 22-24: the error channel from the Sarama consumer group is never populated because Consumer.Return.Errors is not enabled; update the Kafka client configuration by setting config.Consumer.Return.Errors = true alongside the existing config.Consumer.Offsets.* and config.Consumer.MaxWaitTime settings so that errors are forwarded to group.Errors(), allowing the existing error handling code that reads from group.Errors() to work as intended.
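For context, the flag only helps if something drains the channel; a sketch of the pairing, assuming the IBM/sarama import path (the constructor name here is illustrative):

```go
package kafka

import (
	"log"

	"github.com/IBM/sarama"
)

// newGroupWithErrorDrain pairs Return.Errors with a goroutine that actually
// reads group.Errors(); enabling the flag without a reader just backs up a channel.
func newGroupWithErrorDrain(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true // forward errors to group.Errors()

	group, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		return nil, err
	}
	go func() {
		for err := range group.Errors() { // channel closes when the group is closed
			log.Println("consumer group error:", err)
		}
	}()
	return group, nil
}
```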
apps/candle-service/internal/server.go (1)

31-34: ⚠️ Potential issue | 🟠 Major
Avoid `log.Fatalf` in the gRPC serve goroutine.

The server calls `Serve()` in a goroutine, and `Fatalf` forces process exit. When the server shuts down gracefully, `Serve` returns `grpc.ErrServerStopped` (an expected, non-error condition), but `Fatalf` will terminate the process instead of allowing shutdown to complete. Filter the expected error and use non-fatal logging:

Suggested fix
```diff
 import (
 	"context"
+	"errors"
 	"log"
 	"net"
@@
 	go func() {
-		if err := srv.Serve(netListener); err != nil {
-			log.Fatalf("failed to serve: %v", err)
+		if err := srv.Serve(netListener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
+			log.Printf("gRPC serve error: %v", err)
 		}
 	}()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/server.go` around lines 31-34: the goroutine calling srv.Serve(netListener) currently uses log.Fatalf, which will kill the process even when Serve returns the expected grpc.ErrServerStopped during graceful shutdown; change the error handling in that goroutine to check if err == grpc.ErrServerStopped and log an informational message (e.g., log.Printf or log.Info) in that case, otherwise log the unexpected error without calling Fatalf (e.g., log.Printf/log.Errorf) so the process can continue an orderly shutdown; update the anonymous goroutine that calls srv.Serve to import/reference grpc.ErrServerStopped and handle the two branches accordingly.
apps/candle-service/internal/memoryStore/candleStore.go (1)

91-99: ⚠️ Potential issue | 🟠 Major
Set `close_time` when finalizing a candle.

Closed candles currently set `IsClosed` only. Without `CloseTime`, downstream consumers lose exact interval boundary metadata.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/memoryStore/candleStore.go` around lines 91-99: the closed candle is missing its CloseTime, so set activeCandle.CloseTime to the interval boundary (e.g., openTimeBucket + tfSeconds) before marking IsClosed and appending to store.history; update the block that currently sets activeCandle.IsClosed = true and appends to store.history (around activeCandle, store.history[tfName], store.current[tfName], cache.newCandle and cache.onCandleClosed) to assign CloseTime, then proceed with creating the new candle and invoking cache.onCandleClosed.
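A minimal sketch of the finalization step, using a stand-in struct rather than the generated pb.Candle and assuming timestamps are unix seconds:

```go
package memorystore

// Candle is a stand-in for the generated pb.Candle fields relevant here.
type Candle struct {
	OpenTime  int64 // bucket start, unix seconds
	CloseTime int64 // bucket end, unix seconds
	IsClosed  bool
}

// closeCandle finalizes a bucket: the close time is the interval boundary
// (open time plus timeframe length), not the timestamp of the closing trade.
func closeCandle(c *Candle, tfSeconds int64) {
	c.CloseTime = c.OpenTime + tfSeconds
	c.IsClosed = true
}
```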
apps/candle-service/internal/memoryStore/redis.go (1)

51-75: ⚠️ Potential issue | 🟠 Major
Add a nil-client guard before Redis operations.

`PushCandle` and `GetCandlesFromRedis` assume `RedisClient` is always initialized. If initialization fails or usage order changes, these paths panic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/memoryStore/redis.go` around lines 51-75: PushCandle and GetCandlesFromRedis call RedisClient methods without checking for a nil RedisClient, which can cause panics if initialization failed; add a guard at the start of both functions to return a clear error when RedisClient == nil (and optionally when ctx is nil) before calling candleKey/LPush/LRange/TxPipeline, referencing the RedisClient variable and the PushCandle and GetCandlesFromRedis functions so the check is added early and returns a wrapped fmt.Errorf describing "redis client not initialized" (or similar) instead of proceeding.
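A sketch of that guard, assuming the package-level RedisClient from this file (the helper name is illustrative):

```go
package memorystore

import (
	"errors"

	"github.com/redis/go-redis/v9"
)

var (
	// RedisClient mirrors this file's package-level client, which stays nil
	// if InitRedis fails or is never called.
	RedisClient *redis.Client

	errRedisNotInitialized = errors.New("redis client not initialized")
)

// guardRedis is the early check for PushCandle/GetCandlesFromRedis: fail with
// a clear error instead of a nil-pointer panic deep inside a Redis call.
func guardRedis() error {
	if RedisClient == nil {
		return errRedisNotInitialized
	}
	return nil
}
```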
apps/candle-service/internal/engine/router.go (1)

38: ⚠️ Potential issue | 🟠 Major
Unbounded blocking on the worker queue can stall ingestion.

Line 38 blocks when `eventQueue` is full. Under load, this can halt the consumer loop and cascade into lag/rebalances. Consider making `Route` return an error on a full queue so the caller can apply retry/backpressure policy explicitly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/engine/router.go` at line 38: the Route method currently blocks on sending to worker.eventQueue (worker.eventQueue <- event), which can stall ingestion; change Route to return an error instead of blocking: update the Route signature to return error, implement a non-blocking send using select (attempt to send to worker.eventQueue and on default return a new sentinel error like ErrQueueFull), add the ErrQueueFull sentinel (exported or package-private) and update callers to handle/propagate this error so they can apply retries or backpressure policies.
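A sketch of the non-blocking variant with stand-in types (the real router hashes pb.EngineEvent symbols to pick a shard; Event, Worker and TryRoute below are illustrative):

```go
package engine

import "errors"

// ErrQueueFull is the sentinel a caller can match to apply its own
// retry/backpressure policy instead of blocking the consumer loop.
var ErrQueueFull = errors.New("worker event queue full")

// Event and Worker stand in for the router's pb.EngineEvent and worker shard.
type Event struct{ Symbol string }

type Worker struct{ eventQueue chan *Event }

// TryRoute attempts a non-blocking enqueue to the target shard.
func TryRoute(w *Worker, ev *Event) error {
	select {
	case w.eventQueue <- ev:
		return nil
	default:
		return ErrQueueFull
	}
}
```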
docs/data-service.md (1)

341-346: ⚠️ Potential issue | 🟠 Major
Subscription lifecycle diagram conflicts with current gateway behavior.

Lines 341-346 document first/last subscriber actions as direct Redis subscribe/unsubscribe ownership, but the current websocket gateway implementation manages first/last subscribers via per-symbol matching-engine gRPC streams. Please align this section so runbooks and architecture docs match runtime behavior.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/data-service.md` around lines 341-346: the diagram incorrectly shows SM -> RSB/RUS directly causing RD subscribe/unsubscribe; update the doc so the "First Subscriber" and "Last Subscriber" actions reflect the websocket gateway managing per-symbol matching-engine gRPC streams, not direct Redis ownership: change the arrows/labels (e.g., replace RSB/RUS -> RD with WebsocketGateway -> MatchingEngineGRPCStream start/stop and then MatchingEngine -> Redis subscribe/unsubscribe) and adjust the surrounding text to state that the first/last subscriber triggers opening/closing a per-symbol gRPC stream from the websocket gateway to the matching engine, which in turn performs Redis subscriptions, ensuring symbols like SM, RSB, RUS and RD are renamed or annotated to reflect the gateway -> matching-engine -> redis flow.
apps/candle-service/internal/kafka/producerClient.go (1)

56-71: ⚠️ Potential issue | 🟠 Major
Protect the publish path from nil-producer panics.

Line 70 assumes producer initialization always succeeded. A nil/closed producer here will panic and crash the candle path.

Proposed fix
```diff
 func PublishCandleEventToKafka(symbol string, timeframe string, candle *pbAggegration.Candle) {
+	if KafkaProducerClient == nil {
+		log.Println("kafka producer is not initialized")
+		return
+	}
 	value, err := proto.Marshal(candle)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/kafka/producerClient.go` around lines 56-71: PublishCandleEventToKafka currently assumes KafkaProducerClient is initialized and calls KafkaProducerClient.Input() <- msg, which will panic if the producer is nil or closed; before sending, check that KafkaProducerClient != nil (and optionally that KafkaProducerClient.Input() is usable) and if it is nil/closed log an error and return early to avoid panics. Update the PublishCandleEventToKafka function to guard the send with a nil-check of KafkaProducerClient (and handle the closed/invalid producer case by logging and returning) so calls to KafkaProducerClient.Input() are only made when the producer is valid.
packages/proto-defs/proto/aggeration/v1/indicators.proto (1)

50-52: ⚠️ Potential issue | 🟠 Major
Use the shared `Timeframe` enum instead of a free-form string.

At line 51, `timeframe` is a `string`, while the candle APIs use the typed `Timeframe`. This allows invalid inputs and weakens cross-service contracts.

Proposed fix
```diff
 message IndicatorInstance {
   string instance_id = 1; // unique per user/chart
   string indicator_id = 2;
   string symbol = 3;
-  string timeframe = 4;
+  Timeframe timeframe = 4;
   repeated IndicatorParam params = 5;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/proto-defs/proto/aggeration/v1/indicators.proto` around lines 50 - 52, The timeframe field is defined as a free-form string which weakens cross-service contracts; update the field in indicators.proto by changing the type of the field named timeframe (field number 4) from string to the shared Timeframe enum used by candle APIs, add the appropriate import for the proto that defines Timeframe, and keep the field tag (4) unchanged so wire compatibility is preserved; after updating the proto, run the proto generation step to regenerate the related language bindings (look for references to timeframe in any usages of the Indicators message to ensure callers are updated to use the Timeframe enum values).
🟡 Minor comments (2)
apps/candle-service/internal/utils/utils.go (1)

7-11: ⚠️ Potential issue | 🟡 Minor
Rename the exported response type.

`APIReponse` is misspelled, and that typo will leak into every caller of this internal package.

🛠️ Proposed fix
```diff
-type APIReponse struct {
+type APIResponse struct {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/utils/utils.go` around lines 7-11: the exported struct name APIReponse is misspelled; rename it to APIResponse and update all references/usages across the codebase (e.g., functions, tests, unmarshalling/typing) to use APIResponse so the typo doesn't propagate; preserve the struct fields and JSON tags (Success, Data, Error) and then run go vet/gofmt to ensure imports and build breakages are resolved.
apps/matching-engine/SYSTEM_DESIGN.md (1)

37-39: ⚠️ Potential issue | 🟡 Minor
Use full `host:port` addresses for every broker.

The table currently mixes `localhost:19092` with bare ports `19093` and `19094`, which makes the endpoint list ambiguous.

✏️ Suggested text fix
```diff
-| Kafka brokers | `localhost:19092`, `19093`, `19094` |
+| Kafka brokers | `localhost:19092`, `localhost:19093`, `localhost:19094` |
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/matching-engine/SYSTEM_DESIGN.md` around lines 37 - 39, Update the Kafka brokers entry in the table so all brokers use full host:port notation (replace the bare ports `19093`, `19094` with `localhost:19093`, `localhost:19094`) under the "Kafka brokers" row to make the endpoint list unambiguous; keep the gRPC port line (`gRPC port | localhost:50052`) unchanged.
🧹 Nitpick comments (3)
apps/candle-service/internal/server.go (1)
72-115: Remove legacy commented HTTP handlers from the gRPC server file.

This dead block adds noise and raises maintenance cost in a new service baseline.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/server.go` around lines 72-115: delete the entire legacy commented HTTP handler block (the commented functions HealthCheck and (s *Server) GetCandles) from the file to remove dead-code noise; specifically remove references to HealthCheck, GetCandles, pbAggeration.Timeframe_value, pbAggeration.Timeframe, s.router.GetCandles and utils.APIReponse in that commented section so the gRPC server file only contains active gRPC-related code.
apps/candle-service/cmd/candle-service/main.go (1)

39-40: Move brokers/topic/port to configuration inputs.

Hardcoded network endpoints and topic names make multi-env deploys brittle.

Suggested direction
```diff
-	brokerAddresses := []string{"localhost:19092", "localhost:19093", "localhost:19094"}
+	brokerAddresses := strings.Split(getEnv("KAFKA_BROKERS", "localhost:19092,localhost:19093,localhost:19094"), ",")

-	topics := []string{
-		"trades",
-	}
+	topics := []string{getEnv("TRADES_TOPIC", "trades")}

-	PORT := "50054"
+	port := getEnv("GRPC_PORT", "50054")

-	listener, err := net.Listen("tcp", ":"+PORT)
+	listener, err := net.Listen("tcp", ":"+port)
```

Also applies to: 57-59, 65-67
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/cmd/candle-service/main.go` around lines 39-40: replace hardcoded endpoints and topic strings with configurable inputs: read brokerAddresses, broker ports, and topic names from configuration (environment variables, command-line flags, or a config file) instead of the literal slice and string constants; for example, replace the brokerAddresses slice and any topic constants referenced near it with values loaded via os.Getenv or flag.* (with sensible defaults) and parse a comma-separated broker list into a slice so deployments can override endpoints per environment; update the variables where they are used (brokerAddresses and the topic name identifiers found later in main.go) to use the new config values.
apps/candle-service/makefile (1)

6-21: Add standard phony/default targets to satisfy make tooling.

`all`, `clean`, and `test` are missing, and `.PHONY` declarations are absent. This is low-risk but improves CI/tool compatibility.

Suggested Makefile patch
```diff
+.PHONY: all clean test post-install dev build tidy
+
+all: build
+
+clean:
+	@echo "➡ Cleaning build artifacts..."
+	rm -rf $(BUILD_DIR)
+
+test:
+	@echo "➡ Running tests..."
+	go test ./...
+
 post-install:
 	@echo "➡ Generating go.mod for proto-generated code..."
 	@mkdir -p $(ROOT_DIR)/packages/proto-defs/go/generated
 	@printf "module github.com/sameerkrdev/nerve/packages/proto-defs/go/generated\n\ngo 1.21\n" > \
 		$(ROOT_DIR)/packages/proto-defs/go/generated/go.mod
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/makefile` around lines 6 - 21, Add standard Makefile targets and .PHONY declarations: create a default all target that depends on build (so running make builds the service), add a clean target that removes the built binary and build artifacts using BUILD_DIR and APP_NAME, add a test target that runs "go test ./..." for the module, and append a .PHONY line listing all phony targets (all, build, clean, dev, post-install, tidy, test) so Make treats them as phony; reference existing targets like post-install, dev, build and variables APP_NAME, BUILD_DIR, CMD_PATH when adding these entries.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 07edf1dc-7eec-495c-b919-a9cb2c796e93
⛔ Files ignored due to path filters (8)

- `apps/candle-service/go.sum` is excluded by `!**/*.sum`
- `apps/matching-engine/go.sum` is excluded by `!**/*.sum`
- `apps/matching-engine/wal/BTCUSD/0.log` is excluded by `!**/*.log`
- `apps/matching-engine/wal/SOLUSD/0.log` is excluded by `!**/*.log`
- `apps/websocket-server/go.sum` is excluded by `!**/*.sum`
- `go.work` is excluded by `!**/*.work`
- `go.work.sum` is excluded by `!**/*.sum`
- `pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (39)

- `apps/api-gateway/package.json`
- `apps/api-gateway/src/app.ts`
- `apps/candle-service/.air.toml`
- `apps/candle-service/cmd/candle-service/main.go`
- `apps/candle-service/go.mod`
- `apps/candle-service/internal/engine/router.go`
- `apps/candle-service/internal/engine/worker.go`
- `apps/candle-service/internal/kafka/consumerClient.go`
- `apps/candle-service/internal/kafka/consumerHandler.go`
- `apps/candle-service/internal/kafka/producerClient.go`
- `apps/candle-service/internal/memoryStore/candleStore.go`
- `apps/candle-service/internal/memoryStore/redis.go`
- `apps/candle-service/internal/server.go`
- `apps/candle-service/internal/utils/utils.go`
- `apps/candle-service/makefile`
- `apps/candle-service/package.json`
- `apps/matching-engine/SYSTEM_DESIGN.md`
- `apps/matching-engine/go.mod`
- `apps/matching-engine/wal/BTCUSD/checkpoint.meta`
- `apps/matching-engine/wal/SOLUSD/checkpoint.meta`
- `apps/order-service/src/config/dotenv.ts`
- `apps/order-trade-store-service/.env.example`
- `apps/order-trade-store-service/README.md`
- `apps/order-trade-store-service/eslint.config.mjs`
- `apps/order-trade-store-service/nodemon.json`
- `apps/order-trade-store-service/package.json`
- `apps/order-trade-store-service/src/config/dotenv.ts`
- `apps/order-trade-store-service/src/constants.ts`
- `apps/order-trade-store-service/src/controllers/order.controller.ts`
- `apps/order-trade-store-service/src/controllers/trade.controller.ts`
- `apps/order-trade-store-service/src/index.ts`
- `apps/order-trade-store-service/src/kafka.consumer.ts`
- `apps/order-trade-store-service/tsconfig.json`
- `apps/websocket-server/go.mod`
- `docs/data-service.html`
- `docs/data-service.md`
- `packages/proto-defs/proto/aggeration/v1/candles.proto`
- `packages/proto-defs/proto/aggeration/v1/common.proto`
- `packages/proto-defs/proto/aggeration/v1/indicators.proto`
💤 Files with no reviewable changes (2)
- apps/api-gateway/package.json
- apps/order-service/src/config/dotenv.ts
apps/candle-service/internal/engine/router.go (lines 15-44):

```go
func NewWorkerRouter(workerCount int, onCandleClosed memorystore.OnCandleClosedFn) *WorkerRouter {
	var workers []*Worker

	for i := range workerCount {
		candleCache := memorystore.NewCandleStore(onCandleClosed)
		worker := NewWorker(i, candleCache)

		go worker.Process()

		workers = append(workers, worker)
	}

	return &WorkerRouter{
		workers: workers,
		count:   workerCount,
	}
}

func (wr *WorkerRouter) Route(event *pb.EngineEvent) {
	workerIndex := int(utils.Hash(event.Symbol) % uint32(wr.count))

	worker := wr.workers[workerIndex]

	worker.eventQueue <- event
}

func (wr *WorkerRouter) GetCandles(symbol string, timeframe pbAggeration.Timeframe) ([]*pbAggeration.Candle, error) {
	workerIndex := int(utils.Hash(symbol) % uint32(wr.count))
	return wr.workers[workerIndex].candleCache.GetCandles(symbol, timeframe)
}
```
Guard against zero/negative worker counts.
At Line 34 and Line 42, modulo uses wr.count. If workerCount <= 0, routing/reads panic.
Proposed fix
```diff
 func NewWorkerRouter(workerCount int, onCandleClosed memorystore.OnCandleClosedFn) *WorkerRouter {
+	if workerCount <= 0 {
+		panic("workerCount must be > 0")
+	}
 	var workers []*Worker
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/engine/router.go` around lines 15 - 44,
NewWorkerRouter currently allows workerCount <= 0 which leads to divide-by-zero
panics in Route and GetCandles; fix by normalizing the count on construction
(e.g., if workerCount <= 0 set workerCount = 1), ensure you allocate at least
one Worker and append to WorkerRouter.workers accordingly, and also add
defensive guards in Route and GetCandles to handle a wrapped router with no
workers (return early or no-op / an error) referencing NewWorkerRouter,
WorkerRouter.workers, WorkerRouter.count, Route, and GetCandles so modulo
operations never use zero.
apps/candle-service/internal/kafka/consumerHandler.go (lines 36-45):

```go
		event := &pb.EngineEvent{}
		proto.Unmarshal(msg.Value, event)

		if event.EventType == common.EventType_TRADE_EXECUTED {
			ch.router.Route(event)
		}

		// mark message as processed
		session.MarkMessage(msg, "")
	}
```
Handle protobuf unmarshal failures before acking Kafka messages.
At Line 37, proto.Unmarshal errors are ignored, and at Line 44 the message is always acknowledged. This can silently drop bad payloads and hide schema/producer issues.
Proposed fix
```diff
 for msg := range claim.Messages() {
 	event := &pb.EngineEvent{}
-	proto.Unmarshal(msg.Value, event)
+	if err := proto.Unmarshal(msg.Value, event); err != nil {
+		log.Printf("failed to unmarshal engine event (topic=%s partition=%d offset=%d): %v",
+			msg.Topic, msg.Partition, msg.Offset, err)
+		// Either mark and continue (skip poison message), or route to DLQ before marking.
+		session.MarkMessage(msg, "unmarshal_error")
+		continue
+	}
 	if event.EventType == common.EventType_TRADE_EXECUTED {
 		ch.router.Route(event)
 	}
 	// mark message as processed
 	session.MarkMessage(msg, "")
 }
```
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/kafka/consumerHandler.go` around lines 36 - 45,
proto.Unmarshal errors are currently ignored which causes malformed messages to
be acked; update the message handling loop so that the return value of
proto.Unmarshal(msg.Value, event) is checked, and on error log the failure
(include the raw error and message metadata) and skip calling
session.MarkMessage for that message; only call session.MarkMessage(msg, "")
after a successful unmarshal and after any subsequent processing such as
ch.router.Route(event) for common.EventType_TRADE_EXECUTED so bad payloads are
not silently dropped.
apps/candle-service/internal/memoryStore/candleStore.go:

```go
func (cache *CandleStore) getOrCreateSymbol(symbol string) *SymbolStore {
	if store, exists := cache.store[symbol]; exists {
		return store
	}

	store := &SymbolStore{
		current: make(map[string]*pb.Candle),
		history: make(map[string][]*pb.Candle),
	}

	cache.store[symbol] = store

	return store
}

func (cache *CandleStore) AddNewCandle(
	symbol string,
	tradeData *matchingEnigne.TradeEvent,
) {
	store := cache.getOrCreateSymbol(symbol)

	timestamp := tradeData.Timestamp

	for tfName, tfSeconds := range pb.Timeframe_value {
		activeCandle, exists := store.current[tfName]

		openTimeBucket := (timestamp.Seconds / int64(tfSeconds)) * int64(tfSeconds)

		if !exists {
			store.current[tfName] = cache.newCandle(tradeData, tfName, tfSeconds, openTimeBucket)

			continue
		}

		if activeCandle.OpenTime == openTimeBucket {
			cache.updateCandle(activeCandle, tradeData)
			PublishCandleEventToRedis(symbol, tfName, store.current[tfName])
			continue
		}

		// close the current candle if current trade passes the bucket time
		activeCandle.IsClosed = true
		store.history[tfName] = append(store.history[tfName], activeCandle)

		store.current[tfName] = cache.newCandle(tradeData, tfName, tfSeconds, openTimeBucket)
		store.history[tfName] = trim(store.history[tfName])

		if cache.onCandleClosed != nil {
			cache.onCandleClosed(symbol, tfName, activeCandle)
		}
	}
}

func (cache *CandleStore) updateCandle(candle *pb.Candle, trade *matchingEnigne.TradeEvent) {
	price := float64(trade.Price / 100)

	if candle.H < price {
		candle.H = price
	}

	if candle.L > price {
		candle.L = price
	}

	candle.C = price
	candle.V += trade.Quantity
}

func (cache *CandleStore) newCandle(trade *matchingEnigne.TradeEvent, _ string, _ int32, openTimeBucket int64) *pb.Candle {
	candle := &pb.Candle{
		O:        float64(trade.Price / 100),
		H:        float64(trade.Price / 100),
		L:        float64(trade.Price / 100),
		C:        float64(trade.Price / 100),
		V:        trade.Quantity,
		OpenTime: openTimeBucket,
	}

	return candle
}

func trim(candles []*pb.Candle) []*pb.Candle {
	if len(candles) > 1000 {
		return candles[1:]
	}

	return candles
}

func (cache *CandleStore) GetCandles(
	symbol string,
	timeframe pb.Timeframe,
) ([]*pb.Candle, error) {
	store := cache.getOrCreateSymbol(symbol)

	tfName, ok := pb.Timeframe_name[int32(timeframe)]
	if !ok {
		return nil, fmt.Errorf("invalid interval")
	}

	candles := store.history[tfName]

	return candles, nil
}
```
Protect store/history from concurrent read-write races.
AddNewCandle mutates maps/slices while GetCandles reads them, and router reads can happen concurrently with worker writes. This can cause data races and map access panics.
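No fix is suggested inline; one common shape is an RWMutex guarding both maps, sketched here with stand-in types rather than the service's actual structs:

```go
package memorystore

import "sync"

// Candle is a stand-in for the generated pb.Candle.
type Candle struct{ OpenTime int64 }

// SymbolStore guards current/history with an RWMutex so shard-worker writes
// and gRPC reads can safely interleave.
type SymbolStore struct {
	mu      sync.RWMutex
	current map[string]*Candle
	history map[string][]*Candle
}

func (s *SymbolStore) AppendHistory(tf string, c *Candle) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.history[tf] = append(s.history[tf], c)
}

func (s *SymbolStore) Candles(tf string) []*Candle {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]*Candle, len(s.history[tf]))
	copy(out, s.history[tf]) // copy so callers don't alias the guarded slice
	return out
}
```

Copying in the read path keeps callers from aliasing the guarded slice; per-timeframe locks or a sync.Map are alternatives if contention shows up.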
```go
	for tfName, tfSeconds := range pb.Timeframe_value {
		activeCandle, exists := store.current[tfName]

		openTimeBucket := (timestamp.Seconds / int64(tfSeconds)) * int64(tfSeconds)
```
Skip TIMEFRAME_UNSPECIFIED to avoid divide-by-zero panics.
Line 76 divides by tfSeconds; TIMEFRAME_UNSPECIFIED is 0, which panics at runtime.
Proposed fix
```diff
 for tfName, tfSeconds := range pb.Timeframe_value {
+	if tfSeconds <= 0 {
+		continue
+	}
 	activeCandle, exists := store.current[tfName]

 	openTimeBucket := (timestamp.Seconds / int64(tfSeconds)) * int64(tfSeconds)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	for tfName, tfSeconds := range pb.Timeframe_value {
		if tfSeconds <= 0 {
			continue
		}
		activeCandle, exists := store.current[tfName]

		openTimeBucket := (timestamp.Seconds / int64(tfSeconds)) * int64(tfSeconds)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/memoryStore/candleStore.go` around lines 73 -
77, The loop over pb.Timeframe_value can divide by zero because
TIMEFRAME_UNSPECIFIED maps to 0; inside the for tfName, tfSeconds := range
pb.Timeframe_value loop (before computing openTimeBucket and before using
store.current), add a guard to skip entries with tfSeconds == 0 (or tfName ==
"TIMEFRAME_UNSPECIFIED") by continuing the loop so you never perform the
division; update any logic that assumes an entry was processed (e.g., handling
of activeCandle/store.current) accordingly.
```go
	price := float64(trade.Price / 100)

	if candle.H < price {
		candle.H = price
	}

	if candle.L > price {
		candle.L = price
	}

	candle.C = price
	candle.V += trade.Quantity
}

func (cache *CandleStore) newCandle(trade *matchingEnigne.TradeEvent, _ string, _ int32, openTimeBucket int64) *pb.Candle {
	candle := &pb.Candle{
		O: float64(trade.Price / 100),
		H: float64(trade.Price / 100),
		L: float64(trade.Price / 100),
		C: float64(trade.Price / 100),
```
Price conversion currently truncates cents due to integer division.
At Line 104 and Lines 120-123, trade.Price / 100 is integer math before casting, so fractional values are lost.
Proposed fix
```diff
-	price := float64(trade.Price / 100)
+	price := float64(trade.Price) / 100.0
@@
-		O: float64(trade.Price / 100),
-		H: float64(trade.Price / 100),
-		L: float64(trade.Price / 100),
-		C: float64(trade.Price / 100),
+		O: float64(trade.Price) / 100.0,
+		H: float64(trade.Price) / 100.0,
+		L: float64(trade.Price) / 100.0,
+		C: float64(trade.Price) / 100.0,
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	price := float64(trade.Price) / 100.0

	if candle.H < price {
		candle.H = price
	}

	if candle.L > price {
		candle.L = price
	}

	candle.C = price
	candle.V += trade.Quantity
}

func (cache *CandleStore) newCandle(trade *matchingEnigne.TradeEvent, _ string, _ int32, openTimeBucket int64) *pb.Candle {
	candle := &pb.Candle{
		O: float64(trade.Price) / 100.0,
		H: float64(trade.Price) / 100.0,
		L: float64(trade.Price) / 100.0,
		C: float64(trade.Price) / 100.0,
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/memoryStore/candleStore.go` around lines 104 -
123, The price conversion uses integer division causing truncation; change the
conversions so you cast trade.Price to float64 before dividing (fix the price :=
... assignment and the O/H/L/C initializations in newCandle) so you compute the
true decimal price (e.g., cast trade.Price to float64 then divide by 100) and
update all usages (price, O, H, L, C) accordingly; keep trade.Quantity handling
unchanged.
apps/candle-service/internal/memoryStore/redis.go:

```go
func InitRedis() error {
	once.Do(func() {
		opt, err := redis.ParseURL("rediss://default:gQAAAAAAATZNAAIgcDEyNDllZDYyYWM5ZTk0NTgo:6379")
```
Remove hardcoded Redis credentials from source.
Line 24 commits a live credential-like Redis URL directly in code. This is a secret leakage and rotation/compliance risk.
Proposed fix

```diff
 import (
 	"context"
+	"errors"
 	"fmt"
 	"log/slog"
+	"os"
 	"strings"
 	"sync"
@@
 	once.Do(func() {
-		opt, err := redis.ParseURL("rediss://default:gQAAAAAAATZNAAIgcDEyNDllZDYyYWM5ZTk0NTgo:6379")
+		redisURL := os.Getenv("REDIS_URL")
+		if redisURL == "" {
+			initErr = errors.New("REDIS_URL is empty")
+			return
+		}
+		opt, err := redis.ParseURL(redisURL)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"strings"
	"sync"
	// ... other imports
)

// In the initialization function:
once.Do(func() {
	redisURL := os.Getenv("REDIS_URL")
	if redisURL == "" {
		initErr = errors.New("REDIS_URL is empty")
		return
	}
	opt, err := redis.ParseURL(redisURL)
	// ... rest of initialization
})
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/memoryStore/redis.go` at line 24, The Redis URL
is hardcoded in the call to redis.ParseURL (opt, err := redis.ParseURL(...))
which leaks credentials; replace this by reading the Redis connection URL from
configuration or an environment variable (e.g., REDIS_URL) and pass that value
into redis.ParseURL, validate that the env/config value exists and return/log an
error if missing, and remove the literal credential string from the source so
secrets are not committed.
```go
func PublishCandleEventToRedis(symbol string, timeframe string, candle *pb.Candle) {
	if err := RedisClient.Publish(ctx, candleKey(symbol, timeframe), candle).Err(); err != nil {
		slog.Error("failed to pulish candle event", "error", err)
```
🧩 Analysis chain
🌐 Web query:
In go-redis/v9, when calling `Client.Publish(ctx, channel, value)` with a Go struct (e.g., a protobuf message), how is `value` serialized? Is explicit `proto.Marshal` to `[]byte` required for interoperable binary payloads?
💡 Result:
In go-redis/v9, `Client.Publish(ctx, channel, value)` ultimately sends the Redis PUB/SUB "message" payload as the command argument corresponding to `value`. go-redis does not have any special Protobuf-aware serialization for structs passed as `value`.

1) What happens when you pass a Go struct?
- Publish's signature is `Publish(ctx, channel string, message interface{}) *IntCmd`, so it accepts any Go value.
- That value is passed as a Redis command argument to the internal command builder (`NewIntCmd(ctx, "publish", channel, message)`).
- During command encoding, go-redis converts non-string basic types to their protocol representation; for arbitrary structs it uses reflection/generic formatting rules rather than "protobuf binary". This means your struct is not automatically converted to protobuf wire-format bytes.

2) What does a subscriber receive?
- Redis Pub/Sub transports the payload as a Redis bulk string (RESP). On the receiving side, go-redis's Message payload is exposed as either `string` or `[]string`, i.e., it expects payloads to be text-like bulk strings.
- The Redis server itself treats the PUBLISH payload as an opaque byte sequence for the receiving client, but go-redis's Pub/Sub receive API maps it to strings.

3) Is explicit `proto.Marshal` required for interoperable binary payloads?
- Yes. If you want interoperable protobuf binary payloads (the exact protobuf wire format), you should explicitly marshal your protobuf message to `[]byte` using `proto.Marshal` and publish those bytes. Passing the struct directly will not produce a standard protobuf wire-format byte stream that other languages/systems can reliably decode.

Practical recommendation: for protobuf interoperability across services/languages, do `protoBytes, _ := proto.Marshal(msg); rdb.Publish(ctx, "channel", protoBytes)`, and have subscribers do the reverse with `proto.Unmarshal(receivedBytes, &out)`. If you publish protobuf-marshaled bytes, make sure your subscriber reads the payload as raw bytes (go-redis's Pub/Sub API currently exposes the payload as `string`/`[]string` in Message, so keep encoding/decoding consistent).
Citations:
- 1: https://github.com/redis/go-redis/blob/master/pubsub_commands.go
- 2: https://github.com/redis/go-redis/blob/v9.6.1/pubsub.go
- 3: https://redis.io/docs/latest/develop/pubsub/
🏁 Script executed:

```shell
# First, explore the repo structure
find apps/candle-service -type f -name "*.go" | head -20
```

🏁 Script executed:

```shell
# Search for Redis Subscribe calls or consumers of the candleKey
rg "Subscribe|SUBSCRIBE|candleKey" apps/candle-service --type go -A 3 -B 1
```

🏁 Script executed:

```shell
# Check how candle events are consumed - look for unmarshaling or deserialization
rg "proto\.Unmarshal|json\.Unmarshal|candle|Candle" apps/candle-service --type go -A 2 -B 1 | head -100
```

🏁 Script executed:

```shell
# Look at the full PublishCandleEventToRedis function and surrounding code
cat apps/candle-service/internal/memoryStore/redis.go
```

🏁 Script executed:

```shell
# Search for calls to PublishCandleEventToRedis
rg "PublishCandleEventToRedis" apps/candle-service --type go -B 2 -A 2
```

🏁 Script executed:

```shell
# Search for Redis Pub/Sub subscription patterns
rg "Subscribe|PSubscribe" apps/candle-service --type go -B 2 -A 5
```

🏁 Script executed:

```shell
# Check worker.go for how candles are processed and published
cat apps/candle-service/internal/engine/worker.go
```

🏁 Script executed:

```shell
# Search entire repo for Redis Subscribe patterns
rg "Subscribe.*candle|candleKey|candles:" --type go -B 2 -A 3
```

🏁 Script executed:

```shell
# Look for any websocket server files that might consume Redis Pub/Sub
find . -type f -name "*.go" -path "*/websocket*" -o -path "*/ws*" | head -10
```

🏁 Script executed:

```shell
# Search for any Redis Pub/Sub subscription in entire repo
rg "PSubscribe|\.Subscribe\(" --type go --max-count 20
```

🏁 Script executed:

```shell
# Check websocket server for Redis subscriptions
cat apps/websocket-server/internal/gateway.go
```

🏁 Script executed:

```shell
# Also check the websocket main file
cat apps/websocket-server/cmd/websocket-server/main.go
```

🏁 Script executed:

```shell
# Search for Subscribe in websocket server
rg "Subscribe|Publish|candle" apps/websocket-server --type go -B 2 -A 3
```
Marshal protobuf messages before publishing to Redis.
The candle struct passed to Publish at line 92 is not automatically serialized to protobuf wire format. This is inconsistent with PushCandle (which correctly uses proto.Marshal) and breaks interoperability with any subscribers expecting binary protobuf payloads. Explicit serialization via proto.Marshal(candle) is required.
Proposed fix

```diff
 func PublishCandleEventToRedis(symbol string, timeframe string, candle *pb.Candle) {
-	if err := RedisClient.Publish(ctx, candleKey(symbol, timeframe), candle).Err(); err != nil {
+	data, err := proto.Marshal(candle)
+	if err != nil {
+		slog.Error("failed to marshal candle event", "error", err)
+		return
+	}
+	if err := RedisClient.Publish(ctx, candleKey(symbol, timeframe), data).Err(); err != nil {
 		slog.Error("failed to pulish candle event", "error", err)
 	}
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func PublishCandleEventToRedis(symbol string, timeframe string, candle *pb.Candle) {
	data, err := proto.Marshal(candle)
	if err != nil {
		slog.Error("failed to marshal candle event", "error", err)
		return
	}
	if err := RedisClient.Publish(ctx, candleKey(symbol, timeframe), data).Err(); err != nil {
		slog.Error("failed to pulish candle event", "error", err)
	}
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/memoryStore/redis.go` around lines 91 - 93,
PublishCandleEventToRedis is publishing the protobuf struct directly
(RedisClient.Publish with candle) instead of a marshaled byte slice; change it
to marshal the pb.Candle using proto.Marshal(candle), handle the marshaling
error, and pass the resulting []byte to RedisClient.Publish (similar to
PushCandle), ensuring error paths log the marshal error and the Publish error
using the same slog.Error pattern.
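For the consuming side, a hedged sketch of decoding such payloads, assuming the publisher marshals as above; the symbol/timeframe values are placeholders:

```go
sub := RedisClient.Subscribe(ctx, candleKey("BTC-USD", "1m"))
defer sub.Close()

for msg := range sub.Channel() {
	var candle pb.Candle
	// go-redis delivers the payload as a string; convert back to bytes for protobuf decoding.
	if err := proto.Unmarshal([]byte(msg.Payload), &candle); err != nil {
		slog.Error("failed to unmarshal candle event", "error", err)
		continue
	}
	slog.Info("candle update", "close", candle.C, "volume", candle.V)
}
```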
…ice (trade-ingestor-service) to store the trades data into clickhouse
Actionable comments posted: 17
🧹 Nitpick comments (5)
apps/candle-service/cmd/candle-service/main.go (1)
39-67: Move broker/port settings to runtime config (env/flags). Hardcoded infra endpoints (Lines 39 and 65) make this service brittle across dev/staging/prod and harder to operate safely. Please load these from env vars (with sane defaults) and validate at startup.
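One possible shape for that wiring, as a sketch; the `KAFKA_BROKERS`/`GRPC_PORT` variable names and defaults are suggestions, not existing config:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// loadConfig reads broker and port settings from the environment with defaults.
func loadConfig() (brokers []string, port string, err error) {
	brokers = strings.Split(getEnv("KAFKA_BROKERS", "localhost:9092"), ",")
	for _, b := range brokers {
		if strings.TrimSpace(b) == "" {
			return nil, "", fmt.Errorf("KAFKA_BROKERS contains an empty address")
		}
	}
	port = getEnv("GRPC_PORT", "50054")
	if _, err := strconv.Atoi(port); err != nil {
		return nil, "", fmt.Errorf("GRPC_PORT must be numeric: %w", err)
	}
	return brokers, port, nil
}

func getEnv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	brokers, port, err := loadConfig()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(brokers, port)
}
```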
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/cmd/candle-service/main.go` around lines 39 - 67, Replace hardcoded brokerAddresses and PORT with runtime-configured values: read broker list from an env var or flag (e.g., KAFKA_BROKERS, comma-separated) and the gRPC port from an env var or flag (e.g., PORT or GRPC_PORT) with sensible defaults; parse and validate the broker addresses before calling kafka.InitKafkaProducer and kafka.NewKafkaConsumerClient and validate the port is numeric/non-empty before calling net.Listen; update initialization sites (brokerAddresses variable, calls to kafka.InitKafkaProducer and kafka.NewKafkaConsumerClient, and the PORT variable used with net.Listen) to use the parsed config and return clear startup errors if validation fails.

apps/candle-service/internal/clickhouse/candleRepository.go (1)
3-5: Avoid exporting a silent no-op repository method. Line 3 currently exposes `FetchCandles` with no return value and no behavior. If this gets wired accidentally, failures are silent. Please give it a real contract (`(..., error)`) and return an explicit "not implemented" error until ready.

Proposed change

```diff
+import "errors"
+
-func FetchCandles() {
-
-}
+func FetchCandles() error {
+	return errors.New("clickhouse FetchCandles not implemented")
+}
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@apps/candle-service/internal/clickhouse/candleRepository.go` around lines 3 - 5, The exported no-op FetchCandles function must be given a real contract and return an explicit "not implemented" error; update the function signature for FetchCandles to include an error return (e.g., FetchCandles(...) error), import/use the standard errors or fmt package, and have the body immediately return a descriptive not-implemented error (for example errors.New("FetchCandles: not implemented")). Also update any callers to handle the returned error so failures are not silent.

apps/trade-ingestor-service/makefile (1)
6-21: Consider declaring phony targets. These are command targets, not files. Declaring `.PHONY` avoids accidental no-op behavior when same-named files exist.

Suggested fix

```diff
+.PHONY: post-install dev build tidy
+
 post-install:
 	@echo "➡ Generating go.mod for proto-generated code..."
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@apps/trade-ingestor-service/makefile` around lines 6 - 21, The Makefile defines command targets (post-install, dev, build, tidy) but doesn't declare them as phony; add a .PHONY declaration listing post-install dev build tidy (and any other non-file targets) so make won't confuse them with same-named files and the rules (post-install, dev, build, tidy) always run as commands.

apps/trade-ingestor-service/internal/clickhouse/tradeRepository.go (1)
23-24: Remove pointers from interface types in struct fields. `sarama.ConsumerGroupSession` and `driver.Conn` are both interfaces. In Go, interface values should be used directly rather than as pointers; pointers to interfaces add unnecessary indirection and are non-idiomatic. Change the field declarations to use the interface types directly:

```diff
 type TradeBatcher struct {
 	ch        chan BatchItem
 	buffer    []BatchItem
 	maxSize   int
 	flushTime time.Duration
-	kafkaClient      *sarama.ConsumerGroupSession
-	clickhouseClient *driver.Conn
+	kafkaClient      sarama.ConsumerGroupSession
+	clickhouseClient driver.Conn
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@apps/trade-ingestor-service/internal/clickhouse/tradeRepository.go` around lines 23 - 24, The struct fields kafkaClient and clickhouseClient are declared as pointer-to-interface types (*sarama.ConsumerGroupSession and *driver.Conn) which is non-idiomatic; change their declarations to the interface types directly (sarama.ConsumerGroupSession and driver.Conn) and update any assignments, method receivers, constructors (e.g., NewTradeRepository or functions that set kafkaClient/clickhouseClient) and usages to pass/store the interface value rather than a pointer to it so types match throughout the codebase.

apps/trade-ingestor-service/internal/kafka/kafka.go (1)
18-39: Return `sarama.ConsumerGroup` directly instead of `*sarama.ConsumerGroup`. Pointer-to-interface is non-idiomatic in Go and adds unnecessary indirection. Since `sarama.ConsumerGroup` is an interface type, it is already a reference type. The only caller at line 28 of `main.go` discards the return value, so this change is safe and improves code clarity.

Suggested fix

```diff
-func InitKafkaConsumerClient(brokers []string) (*sarama.ConsumerGroup, error) {
+func InitKafkaConsumerClient(brokers []string) (sarama.ConsumerGroup, error) {
@@
-	return &KafkaConsumerClient, initErr
+	return KafkaConsumerClient, initErr
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@apps/trade-ingestor-service/internal/kafka/kafka.go` around lines 18 - 39, The function InitKafkaConsumerClient currently returns *sarama.ConsumerGroup and assigns a pointer to KafkaConsumerClient; change the API to use sarama.ConsumerGroup (non-pointer) everywhere: update the KafkaConsumerClient variable type to sarama.ConsumerGroup, have InitKafkaConsumerClient return (sarama.ConsumerGroup, error) and assign conn (the sarama.NewConsumerGroup result) directly to KafkaConsumerClient, and return KafkaConsumerClient and initErr; update any callers (e.g., the call in main.go) to accept the interface return rather than a pointer.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/candle-service/cmd/candle-service/main.go`:
- Around line 75-76: The gRPC helper NewGrpcServer currently hard-exits on Serve
failure; change it to return a server and a serve-starting function/error
channel (e.g., return *grpc.Server and net.Listener or a Start/Serve method that
returns error) instead of calling log.Fatalf inside internal.NewGrpcServer (or
its Serve call), and remove any direct os.Exit/log.Fatalf; then update main to
call the returned Serve (or listen on the error channel) and handle errors
through main's coordinated shutdown logic (invoke graceful stop on the returned
*grpc.Server and propagate/log the error centrally).
- Around line 61-87: The graceful shutdown currently cancels the context but
does not wait for the Kafka consumer goroutine or explicitly shut down any Kafka
producer/flush path, so in-flight messages can be lost; change the shutdown to
create and track the consumer goroutine with a sync.WaitGroup (or a returned
done channel) so the main goroutine waits for kafkaConsumerClient.Consume(ctx,
topics, kafkaConsumerHandler) to return before calling
grpcServer.GracefulStop(), and add explicit shutdown/flush calls for your Kafka
client (e.g., kafkaConsumerClient.Close() or producer.Flush/Close) after
cancel() and before exiting; ensure kafkaConsumerClient.Consume,
kafkaConsumerClient.Close (or the producer Flush/Close) are invoked in that
order and the WaitGroup/done is awaited.
In `@apps/candle-service/internal/clickhouse/clickhouse.go`:
- Around line 52-54: InsertRawTrade is exported but currently a silent no-op;
change its signature to return an error (e.g., func InsertRawTrade(...) error)
and have the body immediately return a clear not-implemented error (for example
fmt.Errorf("InsertRawTrade: not implemented")) so callers cannot accidentally
drop writes; update any callers of InsertRawTrade to handle the returned error
(propagate or log) until a real implementation is provided.
- Around line 24-33: The code currently hard-codes the ClickHouse address and
credentials inside the clickhouse.Open call (the Addr slice and clickhouse.Auth
with Username/Password) and even sets TLS.InsecureSkipVerify; replace these
literal values by reading them from configuration or environment variables
(e.g., CLICKHOUSE_ADDR, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD,
CLICKHOUSE_TLS_INSECURE flag) and wire them into the clickhouse.Open parameters;
validate that required config values exist at startup and return/fail fast with
a clear error if any are missing, and avoid defaulting to
InsecureSkipVerify=true unless explicitly configured.
- Around line 26-28: The TLS config currently sets InsecureSkipVerify: true
which disables certificate validation; update the TLS configuration (the TLS:
&tls.Config{...} block) to set InsecureSkipVerify: false (or remove it) and add
MinVersion: tls.VersionTLS13 to enforce TLS 1.3; ensure this change is applied
where the ClickHouse connection options are built (the TLS: &tls.Config literal
in clickhouse.go) and, if necessary, load/verify the system or provided RootCAs
so certificate validation succeeds.
- Around line 19-20: The NewClickhouseClient function currently uses the
package-level once (sync.Once) which permanently caches initialization
success/failure; either remove this unused dead function entirely or refactor it
so initialization errors are not permanently cached: replace the sync.Once
pattern in NewClickhouseClient with a guarded lazy-init that stores the active
driver.Conn and any lastErr behind a mutex (e.g., check if conn != nil return
it; if not, lock and attempt to create a new connection, updating conn and
lastErr so subsequent calls can retry on failure), and remove/stop referencing
the once variable; locate symbols NewClickhouseClient and once to apply the
change.
In `@apps/trade-ingestor-service/cmd/trade-ingestor-service/main.go`:
- Around line 27-28: Replace the hardcoded brokerAddresses slice with a
configurable source: read broker list from environment or config and pass it to
kafka.InitKafkaConsumerClient; specifically update the variable brokerAddresses
(and any call sites using kafka.InitKafkaConsumerClient) to load a
comma-separated KAFKA_BROKERS env var (or from your config loader) and
split/validate it before calling kafka.InitKafkaConsumerClient so non-local
deployments can supply real broker endpoints.
- Around line 40-47: The main function currently launches kafka.Consume(ctx,
topics, consumerHandler) in a goroutine and immediately exits after receiving a
signal, which can terminate cleanup prematurely; wrap the call in a waiter
(e.g., sync.WaitGroup or a done channel) and ensure you cancel the context and
wait for the consumer goroutine to finish before exiting. Concretely: create a
WaitGroup or done chan, increment before starting the goroutine that calls
kafka.Consume(ctx, topics, consumerHandler), defer or signal completion when
kafka.Consume returns, call cancel() on the context after receiving from quit,
then call wg.Wait() or block on done before logging "shutting down..." and
exiting.
In `@apps/trade-ingestor-service/internal/clickhouse/clickhouse.go`:
- Around line 39-41: The fmt.Printf call that prints ClickHouse exception stack
traces (the err handling branch checking err.(*clickhouse.Exception) and using
fmt.Printf) should be replaced with structured logging that does not emit the
raw StackTrace; update the error handling in clickhouse.go to log only sanitized
fields (e.g., exception.Code and exception.Message) via the service's logger
(avoid printing exception.StackTrace to stdout), and ensure the code returns or
wraps the error instead of leaking internals to fmt.Printf.
- Around line 24-31: The code passes possibly empty CLICKHOUSE_ADDR,
CLICKHOUSE_USER, and CLICKHOUSE_PASSWORD into clickhouse.Open (creating conn and
initErr) and defers failure to ping/open; add upfront validation that
os.Getenv("CLICKHOUSE_ADDR"), os.Getenv("CLICKHOUSE_USER"), and
os.Getenv("CLICKHOUSE_PASSWORD") are non-empty and return a clear configuration
error before calling clickhouse.Open; modify the function that constructs the
client (the block that sets conn, initErr and calls clickhouse.Open) to check
these env values, return or set initErr with a descriptive message like "missing
clickhouse config: CLICKHOUSE_ADDR/USER/PASSWORD" when any are empty, and only
call clickhouse.Open when validation passes.
- Line 27: The TLS configuration uses an empty tls.Config{} which allows older
TLS versions; update the tls.Config in the ClickHouse client setup (the struct
literal where TLS is set) to include MinVersion: tls.VersionTLS13 so the client
enforces TLS 1.3 minimum (i.e., replace tls.Config{} with a tls.Config that sets
MinVersion to tls.VersionTLS13).
In `@apps/trade-ingestor-service/internal/clickhouse/tradeRepository.go`:
- Around line 27-29: Replace the empty silent Insert() with a real skeleton:
change signature to Insert(ctx context.Context, items []BatchItem) error,
validate that ctx is non-nil and items is non-empty (return a wrapped error like
fmt.Errorf("invalid args") or specific errors for those cases), and for the
unimplemented body return a sentinel ErrNotImplemented (define var
ErrNotImplemented = errors.New("Insert: not implemented") near the top of the
package); reference the Insert function and BatchItem type so callers get an
error instead of silently dropping work.
In `@apps/trade-ingestor-service/internal/kafka/consumerHandler.go`:
- Line 27: Logging the full message payload with slog.Info("message consumed",
..., "value", string(msg.Value)) risks leaking sensitive trade data and bloating
logs; change the consumerHandler.go usage of slog.Info to omit or redact
msg.Value (e.g., log only metadata like msg.Topic, msg.Partition, msg.Offset and
either a boolean/hashed indicator or truncated/obfuscated payload length) so the
call to slog.Info no longer includes string(msg.Value) but instead a safe field
such as "payload_present": true, "payload_size": len(msg.Value) or a
hashed/trimmed token.
- Around line 29-30: ConsumeClaim currently calls session.MarkMessage(msg, "")
before persisting; move the call so offsets are marked only after a successful
write to ClickHouse. Update the ConsumeClaim handler to: 1) deserialize the
message and call the trade persistence routine (e.g., your ClickHouse insert
function), 2) handle and retry transient failures (or return an error) and only
call session.MarkMessage(msg, "") after the insert succeeds, and 3) log and
avoid marking on permanent failures so the message can be retried; ensure the
insert routine is idempotent or deduplicates to prevent double-processing on
retries.
In `@apps/trade-ingestor-service/internal/kafka/kafka.go`:
- Around line 54-57: The consumer loop currently immediately retries on Consume
errors causing hot-looping; modify the loop around
KafkaConsumerClient.Consume(ctx, topics, handler) to implement an exponential
backoff with a cap (e.g., start 500ms, double up to ~30s) and jitter, reset the
backoff when Consume returns nil, and respect ctx cancellation so Sleep can be
aborted; update the slog.Error to include the retry delay; reference
KafkaConsumerClient.Consume, ctx, topics, and handler when applying the backoff
logic.
- Around line 48-52: The consumer Error handler goroutine reads from
KafkaConsumerClient.Errors() but the Sarama consumer config never enables error
emission; set config.Consumer.Return.Errors = true on the Sarama Config instance
used to create the consumer (the same config passed when building the
ConsumerGroup/KafkaConsumerClient) so the Errors() channel will actually emit
errors to be handled by the goroutine; update the config initialization where
the consumer is created to set this flag before creating KafkaConsumerClient.
In `@apps/trade-ingestor-service/makefile`:
- Around line 15-17: The Makefile's build target fails on clean environments
because $(BUILD_DIR) may not exist; modify the build target (target name
"build") to ensure the directory $(BUILD_DIR) is created before running go build
(e.g., invoke mkdir -p $(BUILD_DIR) or equivalent) so that the subsequent go
build -o $(BUILD_DIR)/$(APP_NAME) $(CMD_PATH) succeeds; keep the existing
variables APP_NAME and CMD_PATH and only add the directory creation step
immediately before the go build invocation.
---
Nitpick comments:
In `@apps/candle-service/cmd/candle-service/main.go`:
- Around line 39-67: Replace hardcoded brokerAddresses and PORT with
runtime-configured values: read broker list from an env var or flag (e.g.,
KAFKA_BROKERS, comma-separated) and the gRPC port from an env var or flag (e.g.,
PORT or GRPC_PORT) with sensible defaults; parse and validate the broker
addresses before calling kafka.InitKafkaProducer and
kafka.NewKafkaConsumerClient and validate the port is numeric/non-empty before
calling net.Listen; update initialization sites (brokerAddresses variable, calls
to kafka.InitKafkaProducer and kafka.NewKafkaConsumerClient, and the PORT
variable used with net.Listen) to use the parsed config and return clear startup
errors if validation fails.
In `@apps/candle-service/internal/clickhouse/candleRepository.go`:
- Around line 3-5: The exported no-op FetchCandles function must be given a real
contract and return an explicit "not implemented" error; update the function
signature for FetchCandles to include an error return (e.g., FetchCandles(...)
error), import/use the standard errors or fmt package, and have the body
immediately return a descriptive not-implemented error (for example
errors.New("FetchCandles: not implemented")). Also update any callers to handle
the returned error so failures are not silent.
In `@apps/trade-ingestor-service/internal/clickhouse/tradeRepository.go`:
- Around line 23-24: The struct fields kafkaClient and clickhouseClient are
declared as pointer-to-interface types (*sarama.ConsumerGroupSession and
*driver.Conn) which is non-idiomatic; change their declarations to the interface
types directly (sarama.ConsumerGroupSession and driver.Conn) and update any
assignments, method receivers, constructors (e.g., NewTradeRepository or
functions that set kafkaClient/clickhouseClient) and usages to pass/store the
interface value rather than a pointer to it so types match throughout the
codebase.
In `@apps/trade-ingestor-service/internal/kafka/kafka.go`:
- Around line 18-39: The function InitKafkaConsumerClient currently returns
*sarama.ConsumerGroup and assigns a pointer to KafkaConsumerClient; change the
API to use sarama.ConsumerGroup (non-pointer) everywhere: update the
KafkaConsumerClient variable type to sarama.ConsumerGroup, have
InitKafkaConsumerClient return (sarama.ConsumerGroup, error) and assign conn
(the sarama.NewConsumerGroup result) directly to KafkaConsumerClient, and return
KafkaConsumerClient and initErr; update any callers (e.g., the call in main.go)
to accept the interface return rather than a pointer.
In `@apps/trade-ingestor-service/makefile`:
- Around line 6-21: The Makefile defines command targets (post-install, dev,
build, tidy) but doesn't declare them as phony; add a .PHONY declaration listing
post-install dev build tidy (and any other non-file targets) so make won't
confuse them with same-named files and the rules (post-install, dev, build,
tidy) always run as commands.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: f70396ac-4818-4a21-a21c-0e0064c4ff64
⛔ Files ignored due to path filters (6)
- `apps/candle-service/go.sum` is excluded by `!**/*.sum`
- `apps/matching-engine/go.sum` is excluded by `!**/*.sum`
- `apps/trade-ingestor-service/go.sum` is excluded by `!**/*.sum`
- `apps/websocket-server/go.sum` is excluded by `!**/*.sum`
- `go.work` is excluded by `!**/*.work`
- `go.work.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (17)
- apps/candle-service/cmd/candle-service/main.go
- apps/candle-service/go.mod
- apps/candle-service/internal/clickhouse/candleRepository.go
- apps/candle-service/internal/clickhouse/clickhouse.go
- apps/matching-engine/go.mod
- apps/order-trade-store-service/package.json
- apps/trade-ingestor-service/.air.toml
- apps/trade-ingestor-service/.env.example
- apps/trade-ingestor-service/cmd/trade-ingestor-service/main.go
- apps/trade-ingestor-service/go.mod
- apps/trade-ingestor-service/internal/clickhouse/clickhouse.go
- apps/trade-ingestor-service/internal/clickhouse/tradeRepository.go
- apps/trade-ingestor-service/internal/kafka/consumerHandler.go
- apps/trade-ingestor-service/internal/kafka/kafka.go
- apps/trade-ingestor-service/makefile
- apps/trade-ingestor-service/package.json
- apps/websocket-server/go.mod
✅ Files skipped from review due to trivial changes (5)
- apps/trade-ingestor-service/package.json
- apps/trade-ingestor-service/.air.toml
- apps/candle-service/go.mod
- apps/trade-ingestor-service/go.mod
- apps/order-trade-store-service/package.json
🚧 Files skipped from review as they are similar to previous changes (2)
- apps/websocket-server/go.mod
- apps/matching-engine/go.mod
```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)

PORT := "50054"

listener, err := net.Listen("tcp", ":"+PORT)
if err != nil {
	slog.Error("net server failed", "error", err)
	os.Exit(1)
}

slog.Info("Net server listening", "port", PORT)

grpcServer := internal.NewGrpcServer(workerRouter, listener)

// graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

<-quit

slog.Info("shutting down...")

cancel()
grpcServer.GracefulStop()
}
```
Graceful shutdown is incomplete for async Kafka flow.
Line 63 starts consumption in a goroutine, but Lines 85-87 exit without waiting for that goroutine and without an explicit producer shutdown/flush path. This can lose in-flight events during SIGTERM.
Suggested direction
```diff
+import "sync"
 ...
-ctx, cancel := context.WithCancel(context.Background())
+ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
-go kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)
+var consumerWG sync.WaitGroup
+consumerWG.Add(1)
+go func() {
+	defer consumerWG.Done()
+	kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)
+}()
 ...
 cancel()
+consumerWG.Wait()
+// call kafka producer shutdown helper here (flush/close) before process exits
 grpcServer.GracefulStop()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var consumerWG sync.WaitGroup
consumerWG.Add(1)
go func() {
	defer consumerWG.Done()
	kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)
}()

PORT := "50054"

listener, err := net.Listen("tcp", ":"+PORT)
if err != nil {
	slog.Error("net server failed", "error", err)
	os.Exit(1)
}

slog.Info("Net server listening", "port", PORT)

grpcServer := internal.NewGrpcServer(workerRouter, listener)

// graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

<-quit

slog.Info("shutting down...")

cancel()
consumerWG.Wait()
// call kafka producer shutdown helper here (flush/close) before process exits
grpcServer.GracefulStop()
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/cmd/candle-service/main.go` around lines 61 - 87, The
graceful shutdown currently cancels the context but does not wait for the Kafka
consumer goroutine or explicitly shut down any Kafka producer/flush path, so
in-flight messages can be lost; change the shutdown to create and track the
consumer goroutine with a sync.WaitGroup (or a returned done channel) so the
main goroutine waits for kafkaConsumerClient.Consume(ctx, topics,
kafkaConsumerHandler) to return before calling grpcServer.GracefulStop(), and
add explicit shutdown/flush calls for your Kafka client (e.g.,
kafkaConsumerClient.Close() or producer.Flush/Close) after cancel() and before
exiting; ensure kafkaConsumerClient.Consume, kafkaConsumerClient.Close (or the
producer Flush/Close) are invoked in that order and the WaitGroup/done is
awaited.
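For the producer side of that ordering, a sketch under the assumption that the service keeps a package-level sarama producer; the `KafkaProducer` variable and `CloseKafkaProducer` helper are hypothetical names:

```go
// CloseKafkaProducer closes the shared producer; call it after cancel() and
// after the consumer goroutine has been awaited, before grpcServer.GracefulStop().
func CloseKafkaProducer() {
	if KafkaProducer == nil {
		return
	}
	// sarama's Close blocks until the producer has shut down cleanly.
	if err := KafkaProducer.Close(); err != nil {
		slog.Error("failed to close kafka producer", "error", err)
	}
}
```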
```go
grpcServer := internal.NewGrpcServer(workerRouter, listener)
```
gRPC serve errors are not surfaced to main for controlled handling.
At Line 75, server start is delegated to internal.NewGrpcServer(...), but that helper currently hard-exits via log.Fatalf on serve failure (apps/candle-service/internal/server.go:31). This bypasses coordinated shutdown logic in main.
Prefer returning serve errors (e.g., via channel or explicit Serve call in main) and handling termination policy centrally.
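A hedged sketch of that shape, assuming `internal.NewGrpcServer` is refactored to only construct the server and `main` owns the `Serve` call:

```go
serveErr := make(chan error, 1)
go func() {
	// Serve returns nil after Stop/GracefulStop, so only real failures land here.
	serveErr <- grpcServer.Serve(listener)
}()

select {
case <-quit:
	slog.Info("shutting down...")
	cancel()
	grpcServer.GracefulStop()
case err := <-serveErr:
	slog.Error("grpc serve failed", "error", err)
	cancel()
}
```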
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/cmd/candle-service/main.go` around lines 75 - 76, The
gRPC helper NewGrpcServer currently hard-exits on Serve failure; change it to
return a server and a serve-starting function/error channel (e.g., return
*grpc.Server and net.Listener or a Start/Serve method that returns error)
instead of calling log.Fatalf inside internal.NewGrpcServer (or its Serve call),
and remove any direct os.Exit/log.Fatalf; then update main to call the returned
Serve (or listen on the error channel) and handle errors through main's
coordinated shutdown logic (invoke graceful stop on the returned *grpc.Server
and propagate/log the error centrally).
```go
func NewClickhouseClient(ctx context.Context) (driver.Conn, error) {
	once.Do(func() {
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail
# Locate all call sites and surrounding context
rg -nP --type go -C3 '\bNewClickhouseClient\s*\('
# Show whether request-scoped contexts are forwarded from RPC handlers
rg -nP --type go -C3 'GetCandles\(|context\.Context|NewClickhouseClient\('
```

🏁 Script executed:

```shell
cat -n apps/candle-service/internal/clickhouse/clickhouse.go
```

🏁 Script executed:

```shell
rg -n 'NewClickhouseClient' apps/candle-service/ -B 5 -A 5
```

🏁 Script executed:

```shell
rg -n 'clickhouse\.' apps/candle-service/ --type go | head -20
```

🏁 Script executed:

```shell
rg -n 'import.*clickhouse|NewClickhouseClient' apps/candle-service/ --type go
```

🏁 Script executed:

```shell
cat -n apps/trade-ingestor-service/cmd/trade-ingestor-service/main.go | head -30
```

🏁 Script executed:

```shell
cat -n apps/trade-ingestor-service/internal/clickhouse/clickhouse.go
```

🏁 Script executed:

```shell
rg -n 'from.*clickhouse|internal/clickhouse' apps/candle-service/ --type go
```
sync.Once caches initialization errors permanently with no retry path.
While the sync.Once pattern technically caches errors, the actual risk is lower than a typical transient failure scenario. NewClickhouseClient is called during app startup with an app-level context (not a request-scoped one), and failure triggers immediate exit. However, this pattern can become problematic if the function is later called from request handlers or used in contexts where retries are needed. Additionally, NewClickhouseClient in candle-service is not currently used anywhere in the codebase—it appears to be dead code copied from trade-ingestor-service.
Consider either removing the unused function or refactoring the initialization to support retries if future usage requires it.
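If the retry route is taken, a minimal sketch of the mutex-guarded lazy init (replacing `once`; `openConn` is a hypothetical helper wrapping the existing `clickhouse.Open` plus ping logic):

```go
var (
	mu   sync.Mutex
	conn driver.Conn
)

func NewClickhouseClient(ctx context.Context) (driver.Conn, error) {
	mu.Lock()
	defer mu.Unlock()
	if conn != nil {
		return conn, nil
	}
	c, err := openConn(ctx) // hypothetical: wraps clickhouse.Open and Ping
	if err != nil {
		return nil, err // not cached, so the next call retries
	}
	conn = c
	return conn, nil
}
```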
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/clickhouse/clickhouse.go` around lines 19 - 20,
The NewClickhouseClient function currently uses the package-level once
(sync.Once) which permanently caches initialization success/failure; either
remove this unused dead function entirely or refactor it so initialization
errors are not permanently cached: replace the sync.Once pattern in
NewClickhouseClient with a guarded lazy-init that stores the active driver.Conn
and any lastErr behind a mutex (e.g., check if conn != nil return it; if not,
lock and attempt to create a new connection, updating conn and lastErr so
subsequent calls can retry on failure), and remove/stop referencing the once
variable; locate symbols NewClickhouseClient and once to apply the change.
```go
Addr:     []string{"zjc5ukn020.eu-west-2.aws.clickhouse.cloud:9440"}, // 9440 is a secure native TCP port
Protocol: clickhouse.Native,
TLS: &tls.Config{
	InsecureSkipVerify: true,
}, // enable secure TLS
Auth: clickhouse.Auth{
	Username: "default",
	Password: "9FzzlTeRu~V5V",
},
})
```
Remove hard-coded ClickHouse endpoint and credentials immediately.
Line 24 and Lines 30-31 embed production connection details and password in source. This is a critical secret-leak and key-rotation blocker. Load these from environment/config and fail fast when missing.
Proposed change (validation placed before the `Open` call so missing config fails fast):

```diff
 import (
 	"context"
 	"crypto/tls"
+	"errors"
 	"fmt"
+	"os"
 	"sync"
@@
+	if os.Getenv("CLICKHOUSE_ADDR") == "" || os.Getenv("CLICKHOUSE_USERNAME") == "" || os.Getenv("CLICKHOUSE_PASSWORD") == "" {
+		initErr = errors.New("missing clickhouse configuration")
+		return
+	}
 	conn, initErr = clickhouse.Open(&clickhouse.Options{
-		Addr: []string{"zjc5ukn020.eu-west-2.aws.clickhouse.cloud:9440"}, // 9440 is a secure native TCP port
+		Addr: []string{os.Getenv("CLICKHOUSE_ADDR")},
 		Protocol: clickhouse.Native,
@@
 		Auth: clickhouse.Auth{
-			Username: "default",
-			Password: "9FzzlTeRu~V5V",
+			Username: os.Getenv("CLICKHOUSE_USERNAME"),
+			Password: os.Getenv("CLICKHOUSE_PASSWORD"),
 		},
 	})
```
🧰 Tools
🪛 ast-grep (0.42.1)

[warning] 25-27: `MinVersion` is missing from this TLS configuration. By default, TLS 1.2 is currently used as the minimum when acting as a client, and TLS 1.0 when acting as a server. General purpose web applications should default to TLS 1.3 with all other protocols disabled. Only where it is known that a web server must support legacy clients with unsupported and insecure browsers (such as Internet Explorer 10) may it be necessary to enable TLS 1.0. Add `MinVersion: tls.VersionTLS13` to the TLS configuration to bump the minimum version to TLS 1.3.

Context:

```go
tls.Config{
	InsecureSkipVerify: true,
}
```

Note: [CWE-327]: Use of a Broken or Risky Cryptographic Algorithm [OWASP A03:2017]: Sensitive Data Exposure [OWASP A02:2021]: Cryptographic Failures [REFERENCES]
https://owasp.org/Top10/A02_2021-Cryptographic_Failures
(missing-ssl-minversion-go)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/clickhouse/clickhouse.go` around lines 24 - 33,
The code currently hard-codes the ClickHouse address and credentials inside the
clickhouse.Open call (the Addr slice and clickhouse.Auth with Username/Password)
and even sets TLS.InsecureSkipVerify; replace these literal values by reading
them from configuration or environment variables (e.g., CLICKHOUSE_ADDR,
CLICKHOUSE_USER, CLICKHOUSE_PASSWORD, CLICKHOUSE_TLS_INSECURE flag) and wire
them into the clickhouse.Open parameters; validate that required config values
exist at startup and return/fail fast with a clear error if any are missing, and
avoid defaulting to InsecureSkipVerify=true unless explicitly configured.
```go
TLS: &tls.Config{
	InsecureSkipVerify: true,
}, // enable secure TLS
```
🧩 Analysis chain
🏁 Script executed:

```shell
find . -type f -name "clickhouse.go" | head -20
```

🏁 Script executed:

```shell
cat -n ./apps/candle-service/internal/clickhouse/clickhouse.go | head -50
```

🏁 Script executed:

```shell
cat -n ./apps/trade-ingestor-service/internal/clickhouse/clickhouse.go | head -50
```
Disable InsecureSkipVerify for ClickHouse TLS connection.
The TLS configuration at line 27 sets InsecureSkipVerify: true, which disables certificate verification and permits MITM interception. Since this connects to a cloud-hosted ClickHouse service, certificate verification must be enforced. Additionally, set a minimum TLS version to TLS 1.3 for defense-in-depth.
Proposed change

```diff
 TLS: &tls.Config{
-	InsecureSkipVerify: true,
+	MinVersion:         tls.VersionTLS13,
+	InsecureSkipVerify: false,
 }, // enable secure TLS
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
TLS: &tls.Config{
	MinVersion:         tls.VersionTLS13,
	InsecureSkipVerify: false,
}, // enable secure TLS
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/candle-service/internal/clickhouse/clickhouse.go` around lines 26 - 28,
The TLS config currently sets InsecureSkipVerify: true which disables
certificate validation; update the TLS configuration (the TLS: &tls.Config{...}
block) to set InsecureSkipVerify: false (or remove it) and add MinVersion:
tls.VersionTLS13 to enforce TLS 1.3; ensure this change is applied where the
ClickHouse connection options are built (the TLS: &tls.Config literal in
clickhouse.go) and, if necessary, load/verify the system or provided RootCAs so
certificate validation succeeds.
```go
func (h *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		slog.Info("message consumed", "topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "value", string(msg.Value))
```
Do not log full message payloads in info logs.
At Line 27, logging string(msg.Value) risks leaking sensitive trade content and increases log volume/cost.
Suggested fix

```diff
-slog.Info("message consumed", "topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "value", string(msg.Value))
+slog.Info("message consumed", "topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "payload_bytes", len(msg.Value))
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
slog.Info("message consumed", "topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "payload_bytes", len(msg.Value))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/trade-ingestor-service/internal/kafka/consumerHandler.go` at line 27,
Logging the full message payload with slog.Info("message consumed", ...,
"value", string(msg.Value)) risks leaking sensitive trade data and bloating
logs; change the consumerHandler.go usage of slog.Info to omit or redact
msg.Value (e.g., log only metadata like msg.Topic, msg.Partition, msg.Offset and
either a boolean/hashed indicator or truncated/obfuscated payload length) so the
call to slog.Info no longer includes string(msg.Value) but instead a safe field
such as "payload_present": true, "payload_size": len(msg.Value) or a
hashed/trimmed token.
```go
// mark message as processed
session.MarkMessage(msg, "")
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Verify ack-before-persist behavior in handler implementation.
rg -nP --type=go '\bMarkMessage\s*\(' apps/trade-ingestor-service/internal/kafka/consumerHandler.go -C3
rg -nP --type=go 'Insert|clickhouse' apps/trade-ingestor-service/internal/kafka/consumerHandler.go -C2
```

🏁 Script executed:

```shell
cat -n apps/trade-ingestor-service/internal/kafka/consumerHandler.go | head -50
```

🏁 Script executed:

```shell
# Check if ConsumerHandler has any fields or dependencies
rg -nP --type=go 'type ConsumerHandler' apps/trade-ingestor-service/internal/kafka/ -A 10
# Search for any ClickHouse writes in the kafka package
rg -nP --type=go 'clickhouse|Insert|Write|client' apps/trade-ingestor-service/internal/kafka/
# Search in entire trade-ingestor-service for ClickHouse operations
rg -nP --type=go 'clickhouse|Insert' apps/trade-ingestor-service/ --max-count=20
```

🏁 Script executed:

```shell
# Check the Insert function signature and implementation
rg -nP --type=go 'func Insert' apps/trade-ingestor-service/internal/clickhouse/ -A 5
# Check InsertRawTrade function
rg -nP --type=go 'func InsertRawTrade' apps/trade-ingestor-service/internal/clickhouse/ -A 5
# Check the full main.go setup
head -40 apps/trade-ingestor-service/cmd/trade-ingestor-service/main.go
# Search for where ConsumerHandler is used
rg -nP --type=go 'ConsumerHandler' apps/trade-ingestor-service/
```
MarkMessage marks offset before message processing, causing permanent data loss on restart.
The ConsumeClaim handler immediately marks messages as consumed (line 30) without writing to ClickHouse. Messages are logged but never persisted. On restart or rebalance, these messages are permanently skipped, resulting in data loss. The handler requires actual trade insertion logic before MarkMessage is called. Main.go confirms this is unfinished work (TODOs on lines 15–16).
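A sketch of the persist-then-mark ordering; `clickhouse.InsertTrade` is a placeholder for the insert routine that does not exist yet:

```go
func (h *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		if err := clickhouse.InsertTrade(session.Context(), msg.Value); err != nil {
			slog.Error("insert failed; offset left unmarked for retry", "offset", msg.Offset, "error", err)
			continue // or return err, depending on the retry policy
		}
		session.MarkMessage(msg, "") // mark only after the write succeeded
	}
	return nil
}
```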
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/trade-ingestor-service/internal/kafka/consumerHandler.go` around lines
29 - 30, ConsumeClaim currently calls session.MarkMessage(msg, "") before
persisting; move the call so offsets are marked only after a successful write to
ClickHouse. Update the ConsumeClaim handler to: 1) deserialize the message and
call the trade persistence routine (e.g., your ClickHouse insert function), 2)
handle and retry transient failures (or return an error) and only call
session.MarkMessage(msg, "") after the insert succeeds, and 3) log and avoid
marking on permanent failures so the message can be retried; ensure the insert
routine is idempotent or deduplicates to prevent double-processing on retries.
```go
go func() {
	for err := range KafkaConsumerClient.Errors() {
		slog.Error("kafka consume error", "error", err)
	}
}()
```
🧩 Analysis chain
🌐 Web query:
In IBM Sarama consumer groups, do we need `config.Consumer.Return.Errors = true` for `ConsumerGroup.Errors()` to emit errors consistently?
💡 Result:
Yes. In Sarama IBM/Shopify consumer groups, you set config.Consumer.Return.Errors = true if you want ConsumerGroup.Errors to emit errors for your code to read; otherwise errors are logged internally and not returned over that channel consistently. Evidence from Sarama’s consumer_group.go: the Errors channel is only used when Consumer.Return.Errors is enabled; handleError checks this flag and either logs and returns (flag false) or sends the error to c.errors (flag true).
Citations:
🏁 Script executed:

```shell
cat -n apps/trade-ingestor-service/internal/kafka/kafka.go | head -70
```

🏁 Script executed:

```shell
rg "Consumer.Return.Errors" apps/trade-ingestor-service/
```

🏁 Script executed:

```shell
rg "NewConsumerGroup\|NewConfig" apps/trade-ingestor-service/ -A 5 -B 2
```
Add `config.Consumer.Return.Errors = true` to enable error emissions from the Sarama ConsumerGroup.
The code at lines 48-52 ranges over KafkaConsumerClient.Errors(), but the Sarama consumer configuration is missing Consumer.Return.Errors = true. Without this flag, the Errors channel will not emit errors consistently, causing the error handler goroutine to sit idle and errors to be silently logged internally instead of reaching this handler.
Suggested fix

```diff
 config.Consumer.Offsets.AutoCommit.Enable = true
 config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Second
 config.Consumer.MaxWaitTime = 500 * time.Millisecond
+config.Consumer.Return.Errors = true
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/trade-ingestor-service/internal/kafka/kafka.go` around lines 48 - 52,
The consumer Error handler goroutine reads from KafkaConsumerClient.Errors() but
the Sarama consumer config never enables error emission; set
config.Consumer.Return.Errors = true on the Sarama Config instance used to
create the consumer (the same config passed when building the
ConsumerGroup/KafkaConsumerClient) so the Errors() channel will actually emit
errors to be handled by the goroutine; update the config initialization where
the consumer is created to set this flag before creating KafkaConsumerClient.
```go
for {
	if err := KafkaConsumerClient.Consume(ctx, topics, handler); err != nil {
		slog.Error("consumer error:", "error", err)
	}
```
Add retry backoff after consume errors.
At Line 55-57, persistent failures re-enter immediately, causing hot-loop retries and noisy logs.
Suggested fix

```diff
 for {
 	if err := KafkaConsumerClient.Consume(ctx, topics, handler); err != nil {
 		slog.Error("consumer error:", "error", err)
+		select {
+		case <-ctx.Done():
+			slog.Info("kafka consumer stopped")
+			return
+		case <-time.After(1 * time.Second):
+		}
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/trade-ingestor-service/internal/kafka/kafka.go` around lines 54 - 57,
The consumer loop currently immediately retries on Consume errors causing
hot-looping; modify the loop around KafkaConsumerClient.Consume(ctx, topics,
handler) to implement an exponential backoff with a cap (e.g., start 500ms,
double up to ~30s) and jitter, reset the backoff when Consume returns nil, and
respect ctx cancellation so Sleep can be aborted; update the slog.Error to
include the retry delay; reference KafkaConsumerClient.Consume, ctx, topics, and
handler when applying the backoff logic.
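A sketch of the fuller exponential backoff that prompt describes; the cap and jitter values are illustrative, and `rand` is `math/rand`:

```go
backoff := 500 * time.Millisecond
for {
	err := KafkaConsumerClient.Consume(ctx, topics, handler)
	if err == nil {
		backoff = 500 * time.Millisecond // clean return (e.g., rebalance): reset the delay
	} else {
		slog.Error("consumer error:", "error", err, "retry_in", backoff)
		jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
		select {
		case <-ctx.Done():
			slog.Info("kafka consumer stopped")
			return
		case <-time.After(backoff + jitter):
		}
		backoff *= 2
		if backoff > 30*time.Second {
			backoff = 30 * time.Second
		}
	}
	if ctx.Err() != nil {
		return
	}
}
```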
```makefile
build:
	@echo "➡ Building $(APP_NAME)..."
	go build -o $(BUILD_DIR)/$(APP_NAME) $(CMD_PATH)
```
Create $(BUILD_DIR) before go build.
At Line 17, build fails on clean environments when bin/ is absent.
Suggested fix

```diff
 build:
 	@echo "➡ Building $(APP_NAME)..."
+	@mkdir -p $(BUILD_DIR)
 	go build -o $(BUILD_DIR)/$(APP_NAME) $(CMD_PATH)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/trade-ingestor-service/makefile` around lines 15 - 17, The Makefile's
build target fails on clean environments because $(BUILD_DIR) may not exist;
modify the build target (target name "build") to ensure the directory
$(BUILD_DIR) is created before running go build (e.g., invoke mkdir -p
$(BUILD_DIR) or equivalent) so that the subsequent go build -o
$(BUILD_DIR)/$(APP_NAME) $(CMD_PATH) succeeds; keep the existing variables
APP_NAME and CMD_PATH and only add the directory creation step immediately
before the go build invocation.
Summary by CodeRabbit
New Features
API Updates
New APIs / Schemas
Documentation