diff --git a/binding.gyp b/binding.gyp index 3b650daf..c35d8e66 100644 --- a/binding.gyp +++ b/binding.gyp @@ -21,7 +21,8 @@ "bindings/binding.cc", "bindings/map-get.cc", "bindings/allocation-profile.cc", - "bindings/allocation-profile-node.cc" + "bindings/allocation-profile-node.cc", + "bindings/otel-thread-ctx.cc" ], "include_dirs": [ "bindings", @@ -46,7 +47,8 @@ "bindings/translate-time-profile.cc", "bindings/test/binding.cc", "bindings/allocation-profile.cc", - "bindings/allocation-profile-node.cc" + "bindings/allocation-profile-node.cc", + "bindings/otel-thread-ctx.cc" ], "include_dirs": [ "bindings", @@ -81,6 +83,15 @@ ["-Wno-deprecated-declarations"], "cflags_cc!": ["-std=gnu++14", "-std=gnu++1y", "-std=gnu++20" ], "cflags_cc": ["-std=gnu++2a"], + "conditions": [ + # -mtls-dialect=gnu2 forces TLSDESC on x86_64 so the + # otel_thread_ctx_nodejs_v1 symbol is reachable per the + # OTEP-4947 spec. On arm64 TLSDESC is the only dynamic + # model, so no flag is needed there. + ['target_arch == "x64"', { + "cflags": ["-mtls-dialect=gnu2"], + }], + ], } ], ["OS == 'mac'", diff --git a/bindings/binding.cc b/bindings/binding.cc index acbf99a0..68741dd8 100644 --- a/bindings/binding.cc +++ b/bindings/binding.cc @@ -19,6 +19,7 @@ #include #include "allocation-profile-node.hh" +#include "otel-thread-ctx.hh" #include "profilers/heap.hh" #include "profilers/wall.hh" #include "translate-time-profile.hh" @@ -53,5 +54,6 @@ NODE_MODULE_INIT(/* exports, module, context */) { dd::TimeProfileNodeView::Init(exports); dd::HeapProfiler::Init(exports); dd::WallProfiler::Init(exports); + dd::OtelThreadCtx::Init(exports); Nan::SetMethod(exports, "getNativeThreadId", GetNativeThreadId); } diff --git a/bindings/otel-thread-ctx.cc b/bindings/otel-thread-ctx.cc new file mode 100644 index 00000000..a5ab6844 --- /dev/null +++ b/bindings/otel-thread-ctx.cc @@ -0,0 +1,547 @@ +/* + * Copyright 2026 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Node.js writer for the OTEP-4947 Thread Local Context Record, adapted for +// the Node.js asynchronous context model. The record is wrapped in a JS +// object (CtxWrap) and stored in an AsyncLocalStorage instance; an +// out-of-process reader discovers it by walking the V8 isolate's +// ContinuationPreservedEmbedderData to the AsyncContextFrame (a JS Map), +// looking up the ALS instance as the key, reading the resulting CtxWrap, +// and finally the record it owns. + +#include "otel-thread-ctx.hh" + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +// Single thread-local read from outside the process via TLSDESC. It +// identifies, for the current V8 isolate's thread: +// +// - the address of the isolate's ContinuationPreservedEmbedderData slot +// (`cped_slot`), whose value V8 swaps as it switches between +// continuations. Reading `*cped_slot` yields the active +// AsyncContextFrame; no V8 internal symbol lookup is required on the +// reader side. +// - the AsyncLocalStorage instance the reader must look up inside that +// AsyncContextFrame map (`als_handle`), +// - that instance's JS identity hash (`als_identity_hash`), so the +// reader can restrict the lookup to a single hash bucket. +// - the (per-isolate) tagged address of the `undefined` singleton +// (`undefined_addr`). After looking up the value for our ALS key in +// the ACF map, the reader can compare against this to skip the +// JSObject / internal-field-0 dereference when no CtxWrap is +// currently attached. +// +// Layout is part of the reader ABI: see static_asserts below. +extern "C" { +using v8::Global; +using v8::Object; + +struct otel_thread_ctx_nodejs_v1_t { + v8::internal::Address *cped_slot; // offset 0 + Global als_handle; // offset sizeof(void*); 1 V8 ptr + int als_identity_hash; // offset 2 * sizeof(void*); 4 + 4 pad + v8::internal::Address undefined_addr; // offset 3 * sizeof(void*); tagged +}; + +__attribute__((visibility("default"))) +thread_local otel_thread_ctx_nodejs_v1_t otel_thread_ctx_nodejs_v1; +} + +static_assert(sizeof(v8::Global) == sizeof(void *), + "Global must be exactly one pointer wide"); +static_assert(offsetof(otel_thread_ctx_nodejs_v1_t, cped_slot) == 0, + "cped_slot must be at offset 0"); +static_assert(offsetof(otel_thread_ctx_nodejs_v1_t, als_handle) == + sizeof(void *), + "als_handle must immediately follow cped_slot"); +static_assert(offsetof(otel_thread_ctx_nodejs_v1_t, als_identity_hash) == + 2 * sizeof(void *), + "als_identity_hash must immediately follow als_handle"); +static_assert(offsetof(otel_thread_ctx_nodejs_v1_t, undefined_addr) == + 3 * sizeof(void *), + "undefined_addr must follow als_identity_hash + padding"); + +namespace dd { +namespace { + +using node::ObjectWrap; +using v8::Array; +using v8::Context; +using v8::Function; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::Global; +using v8::Integer; +using v8::Isolate; +using v8::Local; +using v8::Object; +using v8::String; +using v8::Uint8Array; +using v8::Value; + +// OTEP-4947 record. The trailing `attrs_data` is a C99 flexible array +// member: the writer allocates one contiguous block of size +// `sizeof(OtelThreadCtxRecord) + attrs_data_size`, and the FAM gives the +// reader of this struct definition the right intuition — "there's +// variable-length data after the header" — while sizeof / offsetof still +// see only the 28-byte header. Field offsets are statically verified. +struct OtelThreadCtxRecord { + uint8_t trace_id[16]; // offset 0 + uint8_t span_id[8]; // offset 16 + uint8_t valid; // offset 24 + uint8_t reserved; // offset 25 + uint16_t attrs_data_size; // offset 26 + uint8_t attrs_data[]; // offset 28; length is attrs_data_size +}; +static_assert(sizeof(OtelThreadCtxRecord) == 28, + "OTEP thread-ctx header must be exactly 28 bytes"); +static_assert(offsetof(OtelThreadCtxRecord, trace_id) == 0, "trace_id offset"); +static_assert(offsetof(OtelThreadCtxRecord, span_id) == 16, "span_id offset"); +static_assert(offsetof(OtelThreadCtxRecord, valid) == 24, "valid offset"); +static_assert(offsetof(OtelThreadCtxRecord, reserved) == 25, "reserved offset"); +static_assert(offsetof(OtelThreadCtxRecord, attrs_data_size) == 26, + "attrs_data_size offset"); +static_assert(offsetof(OtelThreadCtxRecord, attrs_data) == 28, + "attrs_data offset"); + +struct OtelThreadCtxRecordDeleter { + void operator()(OtelThreadCtxRecord *p) const noexcept { free(p); } +}; +using OwnedRecord = + std::unique_ptr; + +// Floor on the attrs_data capacity of a freshly allocated record. Sized so +// the total allocation is one 64-byte cache line — matching the OTEP-4947 +// "frugal writer" guidance — and giving small records some slack so the +// first few appends (if any) can be in-place. +constexpr size_t MIN_INITIAL_CAPACITY = 64 - sizeof(OtelThreadCtxRecord); + +// Upper bound on the attribute payload. Sized so the total record stays +// under the OTEP-4947 recommended 640 bytes, which is the read-buffer +// ceiling for typical eBPF readers. Attributes that would push past this +// are silently dropped (with `truncated_` set on the wrapper) rather than +// the writer throwing — the OTEP treats the cap as best-effort. +constexpr size_t MAX_ATTRS_DATA_SIZE = 640 - sizeof(OtelThreadCtxRecord); + +// Wraps a heap-allocated OtelThreadCtxRecord. Lifetime is managed by V8 +// GC: when no JS code (or AsyncLocalStorage entry) holds a reference, the +// record is freed. +// +// Layout note for the reader: `record_` is private to C++ but its byte +// position within CtxWrap is part of the reader contract. It is the first +// field after the node::ObjectWrap base subobject. `capacity_` and +// `truncated_` sit after `record_` purely for the writer's own +// bookkeeping — the reader never touches them. +class CtxWrap : public ObjectWrap { + public: + ~CtxWrap() override; + static void Init(Local exports); + + CtxWrap(const CtxWrap &) = delete; + CtxWrap &operator=(const CtxWrap &) = delete; + CtxWrap(CtxWrap &&) = delete; + CtxWrap &operator=(CtxWrap &&) noexcept = delete; + + private: + static void New(const FunctionCallbackInfo &args); + static void DebugBytes(const FunctionCallbackInfo &args); + static void Append(const FunctionCallbackInfo &args); + static void IsTruncated(const FunctionCallbackInfo &args); + + static bool EncodeAttrs(Isolate *isolate, Local context, + Local attrs_val, size_t existing_size, + std::vector *out, bool *out_truncated); + + CtxWrap(OtelThreadCtxRecord *record, size_t capacity, bool truncated); + + // The three fields are kept in one access section because C++ leaves + // the relative layout of fields in different access controls + // implementation-defined. `record_` must come first — its offset + // within CtxWrap is part of the reader contract (see the + // static_assert below) — and is therefore `public`. The bookkeeping + // fields after it would normally be private, but the access change + // would let a conforming compiler reorder them in front of `record_`; + // exposing them publicly keeps everything in one ordering-stable + // block. Readers never touch them. + public: + OtelThreadCtxRecord *record_; + // attrs_data capacity in bytes of the record_ allocation. The total + // allocation is `sizeof(OtelThreadCtxRecord) + capacity_`. Always + // `record_->attrs_data_size <= capacity_ <= MAX_ATTRS_DATA_SIZE`. + size_t capacity_; + // Set to true (once, never cleared) if at any point in this record's + // lifetime — during New() or any subsequent Append() — at least one + // attribute had to be dropped because it would have pushed attrs_data + // past MAX_ATTRS_DATA_SIZE. + bool truncated_; +}; + +// Pin the offset of `record_` — the field the reader walks to from the +// JSObject's internal field 0. We document it as "the first field after +// the node::ObjectWrap base subobject", so equality with +// sizeof(node::ObjectWrap) is the invariant. `offsetof` on a non- +// standard-layout type (CtxWrap has private fields and inherits from +// ObjectWrap) is conditionally supported per the standard but accepted +// by every compiler this addon targets; suppress -Winvalid-offsetof so +// the static_assert compiles cleanly under strict warning flags. +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Winvalid-offsetof" +static_assert(offsetof(CtxWrap, record_) == sizeof(node::ObjectWrap), + "record_ must be the first field after the ObjectWrap base " + "subobject"); +#pragma GCC diagnostic pop + +CtxWrap::~CtxWrap() { free(record_); } + +CtxWrap::CtxWrap(OtelThreadCtxRecord *record, size_t capacity, bool truncated) + : record_(record), capacity_(capacity), truncated_(truncated) {} + +// Copy exactly `expected_bytes` bytes out of a JS Uint8Array (or subclass +// such as Buffer) into `out`. Returns false if the value isn't a +// Uint8Array or its length doesn't match. +bool CopyBytes(Local value, size_t expected_bytes, uint8_t *out) { + if (!value->IsUint8Array()) return false; + Local arr = value.As(); + if (arr->ByteLength() != expected_bytes) return false; + uint8_t *base = + static_cast(arr->Buffer()->Data()) + arr->ByteOffset(); + memcpy(out, base, expected_bytes); + return true; +} + +bool CtxWrap::EncodeAttrs(Isolate *isolate, Local context, + Local attrs_val, size_t existing_size, + std::vector *out, bool *out_truncated) { + if (attrs_val->IsUndefined() || attrs_val->IsNull()) return true; + if (!attrs_val->IsArray()) { + isolate->ThrowError( + "attributes must be an array indexed by key, or undefined"); + return false; + } + Local attrs = attrs_val.As(); + uint32_t n = attrs->Length(); + if (n > 256) { + isolate->ThrowError("attributes array length must not exceed 256"); + return false; + } + out->reserve(out->size() + n * 4); + for (uint32_t i = 0; i < n; ++i) { + Local val_val; + if (!attrs->Get(context, i).ToLocal(&val_val)) return false; + if (val_val->IsUndefined() || val_val->IsNull()) continue; + + Local v; + if (!val_val->ToString(context).ToLocal(&v)) { + isolate->ThrowError("failed to coerce attribute value to string"); + return false; + } + int v_utf8_len = v->Utf8Length(isolate); + int v_budget = v_utf8_len > 255 ? 255 : v_utf8_len; + + const size_t needed = 2u + static_cast(v_budget); + if (existing_size + out->size() + needed > MAX_ATTRS_DATA_SIZE) { + *out_truncated = true; + continue; + } + + const size_t entry_off = out->size(); + out->resize(entry_off + needed); + (*out)[entry_off] = static_cast(i); + int v_written = v->WriteUtf8( + isolate, reinterpret_cast(&(*out)[entry_off + 2]), v_budget, + nullptr, String::NO_NULL_TERMINATION); + (*out)[entry_off + 1] = static_cast(v_written); + if (v_written < v_budget) { + out->resize(entry_off + 2u + static_cast(v_written)); + } + } + return true; +} + +void CtxWrap::New(const FunctionCallbackInfo &args) { + Isolate *isolate = args.GetIsolate(); + Local context = isolate->GetCurrentContext(); + + if (!args.IsConstructCall()) [[unlikely]] { + isolate->ThrowError("OtelThreadCtxWrap must be called with `new`"); + return; + } + if (args.Length() != 3) { + isolate->ThrowError( + "OtelThreadCtxWrap expects 3 arguments: traceId, spanId, attributes"); + return; + } + + uint8_t trace_id[16]; + uint8_t span_id[8]; + if (!CopyBytes(args[0], 16, trace_id)) { + isolate->ThrowError("traceId must be a 16-byte Uint8Array"); + return; + } + if (!CopyBytes(args[1], 8, span_id)) { + isolate->ThrowError("spanId must be an 8-byte Uint8Array"); + return; + } + + std::vector attrs_buf; + bool truncated = false; + if (!EncodeAttrs(isolate, context, args[2], 0, &attrs_buf, &truncated)) { + return; + } + + size_t capacity = std::max(attrs_buf.size(), MIN_INITIAL_CAPACITY); + const size_t total = sizeof(OtelThreadCtxRecord) + capacity; + OwnedRecord record(static_cast(calloc(1, total))); + if (!record) { + isolate->ThrowError("allocation failed"); + return; + } + memcpy(record->trace_id, trace_id, sizeof(trace_id)); + memcpy(record->span_id, span_id, sizeof(span_id)); + record->attrs_data_size = static_cast(attrs_buf.size()); + if (!attrs_buf.empty()) { + memcpy(record->attrs_data, attrs_buf.data(), attrs_buf.size()); + } + + // OTEP-4947 publication protocol: order the `valid = 1` store after every + // other field write, with an atomic_signal_fence + volatile store. + std::atomic_signal_fence(std::memory_order_release); + *reinterpret_cast(&record->valid) = 1; + + CtxWrap *self = new CtxWrap(record.release(), capacity, truncated); + self->Wrap(args.This()); + args.GetReturnValue().Set(args.This()); +} + +// Append entries to the active record. Either modifies the record in +// place (if the appended bytes fit in the current allocation's slack) or +// reallocates to a larger one (geometrically), keeping the invariant +// `record_->attrs_data_size <= capacity_`. +void CtxWrap::Append(const FunctionCallbackInfo &args) { + Isolate *isolate = args.GetIsolate(); + Local context = isolate->GetCurrentContext(); + + CtxWrap *self = ObjectWrap::Unwrap(args.This()); + if (!self) { + isolate->ThrowError("not an OtelThreadCtxWrap"); + return; + } + if (args.Length() != 1) { + isolate->ThrowError("append expects 1 argument: attributes"); + return; + } + + const size_t current_used = self->record_->attrs_data_size; + std::vector appended; + bool truncated = false; + if (!EncodeAttrs(isolate, context, args[0], current_used, &appended, + &truncated)) { + return; + } + if (truncated) self->truncated_ = true; + + if (appended.empty()) return; + + const size_t new_used = current_used + appended.size(); + + if (new_used <= self->capacity_) { + // In-place: write the new entries past the current attrs_data_size, + // then bump attrs_data_size with a release fence + volatile store. + // attrs_data_size is the publication boundary — bytes past it are + // not observable by the reader, so a reader firing mid-append sees + // either the old or new size, never a torn state. + memcpy(&self->record_->attrs_data[current_used], appended.data(), + appended.size()); + std::atomic_signal_fence(std::memory_order_release); + *reinterpret_cast(&self->record_->attrs_data_size) = + static_cast(new_used); + return; + } + + // Doesn't fit. Reallocate with geometric growth, capped. + size_t new_cap = std::min(std::max(self->capacity_ * 2, new_used), MAX_ATTRS_DATA_SIZE); + + const size_t total = sizeof(OtelThreadCtxRecord) + new_cap; + OwnedRecord new_rec(static_cast(calloc(1, total))); + if (!new_rec) { + isolate->ThrowError("allocation failed"); + return; + } + memcpy(new_rec.get(), self->record_, + sizeof(OtelThreadCtxRecord) + current_used); + memcpy(&new_rec->attrs_data[current_used], appended.data(), appended.size()); + new_rec->attrs_data_size = static_cast(new_used); + // The copy should've preserved valid=1 from the source record. + assert(new_rec->valid == 1); + + // Publish: the pointer swap is the atomic boundary the reader sees. The + // first fence keeps the new_rec content writes ordered before the pointer + // store from the compiler's perspective. The second fence prevents free() + // from being hoisted above the pointer swap — without it, a reader stopped + // between a reordered free() and the not-yet-completed swap would follow + // self->record_ into freed memory. OTEP signal-handler semantics (the + // writer is stopped during reads) take care of CPU-side ordering and make + // immediate freeing of the old record safe. + std::atomic_signal_fence(std::memory_order_release); + OtelThreadCtxRecord *old_rec = self->record_; + self->record_ = new_rec.release(); + self->capacity_ = new_cap; + std::atomic_signal_fence(std::memory_order_acq_rel); + free(old_rec); +} + +void CtxWrap::IsTruncated(const FunctionCallbackInfo &args) { + CtxWrap *self = ObjectWrap::Unwrap(args.This()); + if (!self) { + args.GetIsolate()->ThrowError("not an OtelThreadCtxWrap"); + return; + } + args.GetReturnValue().Set(self->truncated_); +} + +void CtxWrap::DebugBytes(const FunctionCallbackInfo &args) { + Isolate *isolate = args.GetIsolate(); + CtxWrap *self = ObjectWrap::Unwrap(args.This()); + if (!self) { + isolate->ThrowError("not an OtelThreadCtxWrap"); + return; + } + const size_t total = + sizeof(OtelThreadCtxRecord) + self->record_->attrs_data_size; + Local buf = v8::ArrayBuffer::New(isolate, total); + memcpy(buf->Data(), self->record_, total); + args.GetReturnValue().Set(Uint8Array::New(buf, 0, total)); +} + +void CtxWrap::Init(Local exports) { +#if NODE_MAJOR_VERSION >= 26 + Isolate *isolate = Isolate::GetCurrent(); +#else + Isolate *isolate = exports->GetIsolate(); +#endif + Local context = isolate->GetCurrentContext(); + + Local tpl = FunctionTemplate::New(isolate, New); + tpl->SetClassName(String::NewFromUtf8Literal(isolate, "OtelThreadCtxWrap")); + tpl->InstanceTemplate()->SetInternalFieldCount(1); + + tpl->PrototypeTemplate()->Set( + String::NewFromUtf8Literal(isolate, "debugBytes"), + FunctionTemplate::New(isolate, DebugBytes)); + tpl->PrototypeTemplate()->Set( + String::NewFromUtf8Literal(isolate, "append"), + FunctionTemplate::New(isolate, Append)); + tpl->PrototypeTemplate()->Set( + String::NewFromUtf8Literal(isolate, "isTruncated"), + FunctionTemplate::New(isolate, IsTruncated)); + + Local constructor = tpl->GetFunction(context).ToLocalChecked(); + exports + ->Set(context, String::NewFromUtf8Literal(isolate, "otelThreadCtxWrap"), + constructor) + .FromJust(); +} + +// Reset the Global and the cped_slot pointer before the isolate +// is torn down. The Global lives in thread-local storage and its +// destructor only runs at thread exit, which on the main thread happens +// after the isolate is already gone — causing a segfault. Registering +// this as a per-isolate cleanup hook the first time StoreAls is called +// keeps the handle safely scoped to the isolate. +void ResetDiscoveryStruct(void * /*arg*/) { + otel_thread_ctx_nodejs_v1.cped_slot = nullptr; + otel_thread_ctx_nodejs_v1.als_handle.Reset(); + otel_thread_ctx_nodejs_v1.als_identity_hash = 0; + otel_thread_ctx_nodejs_v1.undefined_addr = 0; +} + +void StoreAls(const FunctionCallbackInfo &args) { + static thread_local bool cleanup_registered = false; + + Isolate *isolate = args.GetIsolate(); + if (!args[0]->IsObject()) { + isolate->ThrowError("First argument must be the AsyncLocalStorage object."); + return; + } + Local obj = args[0].As(); + otel_thread_ctx_nodejs_v1.als_identity_hash = obj->GetIdentityHash(); + otel_thread_ctx_nodejs_v1.als_handle = Global(isolate, obj); + otel_thread_ctx_nodejs_v1.cped_slot = reinterpret_cast( + reinterpret_cast(isolate) + + v8::internal::Internals::kContinuationPreservedEmbedderDataOffset); + // Cache the per-isolate undefined singleton's tagged address. Undefined + // is a read-only-roots heap object, never moves, so a cached numeric + // address is fine — no Global<> tracking needed. + otel_thread_ctx_nodejs_v1.undefined_addr = + reinterpret_cast(*v8::Undefined(isolate)); + if (!cleanup_registered) { + node::AddEnvironmentCleanupHook(isolate, ResetDiscoveryStruct, nullptr); + cleanup_registered = true; + } +} + +// Without a function that explicitly reads the TLS variable, on x86 the +// linker may strip the symbol from the dynamic symbol table even though +// `nm` still reports it, breaking out-of-process discovery. +void GetStoredAlsHash(const FunctionCallbackInfo &args) { + Isolate *isolate = args.GetIsolate(); + args.GetReturnValue().Set( + Integer::New(isolate, otel_thread_ctx_nodejs_v1.als_identity_hash)); +} + +// V8 layout constants captured at addon-compile time from the same V8 +// headers Node bundles. Published via the discovery contract so an +// out-of-process reader can decode our wrapper / V8's internal hashmap +// layout without doing its own V8-internal-symbol lookups for the +// pointer-compression / sandbox state. +constexpr int WRAPPED_OBJECT_OFFSET = + v8::internal::Internals::kJSObjectHeaderSize + + v8::internal::Internals::kEmbedderDataSlotExternalPointerOffset; +constexpr int TAGGED_SIZE = v8::internal::kApiTaggedSize; + +} // namespace + +void OtelThreadCtx::Init(Local exports) { + CtxWrap::Init(exports); + NODE_SET_METHOD(exports, "otelThreadCtxStoreAls", StoreAls); + NODE_SET_METHOD(exports, "otelThreadCtxGetStoredAlsHash", GetStoredAlsHash); + + Isolate *isolate = exports->GetIsolate(); + Local ctx = isolate->GetCurrentContext(); + exports + ->Set(ctx, + String::NewFromUtf8Literal(isolate, "otelThreadCtxWrappedObjectOffset"), + Integer::New(isolate, WRAPPED_OBJECT_OFFSET)) + .FromJust(); + exports + ->Set(ctx, + String::NewFromUtf8Literal(isolate, "otelThreadCtxTaggedSize"), + Integer::New(isolate, TAGGED_SIZE)) + .FromJust(); +} + +} // namespace dd diff --git a/bindings/otel-thread-ctx.hh b/bindings/otel-thread-ctx.hh new file mode 100644 index 00000000..c8e7470b --- /dev/null +++ b/bindings/otel-thread-ctx.hh @@ -0,0 +1,26 @@ +/* + * Copyright 2026 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace dd { +class OtelThreadCtx { + public: + static void Init(v8::Local exports); +}; +} // namespace dd diff --git a/package.json b/package.json index 166d33ea..9bd678cc 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,8 @@ "test:js-tsan": "LD_PRELOAD=`gcc -print-file-name=libtsan.so` mocha out/test/test-*.js", "test:js-valgrind": "valgrind --leak-check=full mocha out/test/test-*.js", "test:js": "nyc mocha -r source-map-support/register out/test/test-*.js", - "test": "npm run test:js" + "test": "npm run test:js", + "test:docker": "./scripts/docker/run-in-docker.sh" }, "author": { "name": "Google Inc." diff --git a/scripts/docker/Dockerfile b/scripts/docker/Dockerfile new file mode 100644 index 00000000..b130849c --- /dev/null +++ b/scripts/docker/Dockerfile @@ -0,0 +1,14 @@ +# Image for running this project's test suite on Linux from a non-Linux dev +# machine. The native addon is built per-architecture inside the container; +# node-gyp needs python3 and a C++ toolchain. Node 24 is used so all of the +# OTEP-4947 thread-context tests run without needing to pass +# --experimental-async-context-frame. +FROM node:24-bookworm + +RUN apt-get update -qq \ + && apt-get install -y -qq --no-install-recommends \ + python3 \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /tmp/work diff --git a/scripts/docker/run-in-docker.sh b/scripts/docker/run-in-docker.sh new file mode 100755 index 00000000..099549da --- /dev/null +++ b/scripts/docker/run-in-docker.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# Build the test image (idempotent; cached after the first run) and run the +# project's test suite against the working tree inside it. The tree is mounted +# read-only and copied to a writable scratch dir inside the container, so the +# host repo is never modified (no stray node_modules/, build/, out/). + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" +IMAGE_TAG="pprof-nodejs-test:latest" + +if ! command -v docker >/dev/null 2>&1; then + echo "docker not found in PATH; install Docker Desktop / colima / podman-with-docker-alias" >&2 + exit 1 +fi + +if ! docker info >/dev/null 2>&1; then + echo "docker daemon not reachable; is it running?" >&2 + exit 1 +fi + +echo "==> building $IMAGE_TAG (cached after first run)" +docker build -q -t "$IMAGE_TAG" "$SCRIPT_DIR" >/dev/null + +echo "==> running tests" +exec docker run --rm \ + -v "$REPO_DIR":/work:ro \ + "$IMAGE_TAG" \ + bash -c ' + set -euo pipefail + cp -R /work/. /tmp/work/ + # Drop any host-built artifacts so we get a clean build inside. + rm -rf /tmp/work/node_modules /tmp/work/build /tmp/work/out + npm install --no-audit --no-fund + npm test + ' diff --git a/ts/src/otel-thread-ctx.ts b/ts/src/otel-thread-ctx.ts new file mode 100644 index 00000000..c6b516d8 --- /dev/null +++ b/ts/src/otel-thread-ctx.ts @@ -0,0 +1,347 @@ +/* + * Copyright 2026 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Node.js writer for the OpenTelemetry Thread Local Context Record +// (OTEP-4947), discoverable from an out-of-process reader via the +// `otel_thread_ctx_nodejs_v1` thread-local symbol exported by +// `dd_pprof.node`. +// +// Linux only; on other platforms the exported functions degrade to no-ops. + +import {join} from 'path'; +import {AsyncLocalStorage} from 'node:async_hooks'; + +/** + * Inputs to {@link runWithContext} and {@link enterWithContext}. + * + * `traceId` and `spanId` are passed as raw bytes (a `Uint8Array` of length + * 16 and 8 respectively; `Buffer` is acceptable as a subclass). + * + * `attributes`, if present, is positional: index N in the array is the value + * for uint8 key index N on the wire. Slots that are `null`, `undefined`, or + * absent (array holes) are skipped. Non-string values are coerced via + * `toString`. Values longer than 255 UTF-8 bytes are silently truncated and + * attributes that would overflow the 612-byte payload budget are silently + * dropped — see {@link isContextTruncated} for how to detect that. Array + * length must not exceed 256. + */ +export interface ContextOptions { + traceId: Uint8Array; + spanId: Uint8Array; + attributes?: Array; +} + +/** + * Inputs to the methods returned by {@link makeNamedContext}. Same as + * {@link ContextOptions} but attributes are addressed by name; names are + * resolved to uint8 key indexes using the array passed to + * {@link makeNamedContext}. + */ +export interface NamedContextOptions { + traceId: Uint8Array; + spanId: Uint8Array; + namedAttributes?: + | Record + | Map + | Array<[string, unknown]>; +} + +/** + * OTEP-4719 process-context attributes corresponding to a particular + * {@link NamedContext}. Spread this into whatever attribute map the + * application hands to its OTEP-4719 process-context publisher. + */ +export interface ProcessContextAttributes { + readonly 'threadlocal.schema_version': 'nodejs_v1'; + readonly 'threadlocal.attribute_key_map': readonly string[]; + readonly 'threadlocal.nodejs_v1.wrapped_object_offset': number; + readonly 'threadlocal.nodejs_v1.tagged_size': number; +} + +/** + * Object returned by {@link makeNamedContext}. + */ +export interface NamedContext { + runWithContext(fn: () => T, opts: NamedContextOptions): T; + enterWithContext(opts: NamedContextOptions): void; + clearContext(): void; + appendAttributes( + namedAttributes: + | Record + | Map + | Array<[string, unknown]>, + ): void; + isContextTruncated(): boolean; + readonly processContextAttributes: ProcessContextAttributes; +} + +interface CtxWrap { + debugBytes(): Uint8Array; + append(attributes: Array | undefined): void; + isTruncated(): boolean; +} + +interface Addon { + otelThreadCtxWrap: new ( + traceId: Uint8Array, + spanId: Uint8Array, + attributes: Array | undefined, + ) => CtxWrap; + otelThreadCtxStoreAls(als: AsyncLocalStorage): void; + otelThreadCtxGetStoredAlsHash(): number; + otelThreadCtxWrappedObjectOffset: number; + otelThreadCtxTaggedSize: number; +} + +const SCHEMA_VERSION = 'nodejs_v1'; + +// V8 layout constants the addon captured from the V8 headers Node bundles. +// On non-Linux these fall back to values matching Node's standard build +// (no V8 pointer compression, no sandbox); the reader is Linux-only per +// the OTEP anyway, so the fallbacks just keep processContextAttributes +// consistent in shape. +let WRAPPED_OBJECT_OFFSET = 24; +let TAGGED_SIZE = 8; + +export let runWithContext: (fn: () => T, opts: ContextOptions) => T; +export let enterWithContext: (opts: ContextOptions) => void; +/** + * Detach any thread-context record from the current asynchronous scope. + * Subsequent reads in the same scope (until a new + * {@link runWithContext}/{@link enterWithContext} attaches one) see no + * active context. Idempotent. On non-Linux platforms this is a no-op. + */ +export let clearContext: () => void; +export let appendAttributes: ( + attributes: Array, +) => void; +export let isContextTruncated: () => boolean; + +// Debug accessor (not part of the stable API; for tests / reader dev). +export let _currentRecordBytes: () => Uint8Array | undefined = () => undefined; + +if (process.platform === 'linux') { + // eslint-disable-next-line @typescript-eslint/no-require-imports + const findBinding = require('node-gyp-build'); + const addon: Addon = findBinding(join(__dirname, '..', '..')); + WRAPPED_OBJECT_OFFSET = addon.otelThreadCtxWrappedObjectOffset; + TAGGED_SIZE = addon.otelThreadCtxTaggedSize; + + let als: AsyncLocalStorage | undefined; + + function asyncContextFrameError(): string | undefined { + const [major] = process.versions.node.split('.').map(Number); + if (process.execArgv.includes('--no-async-context-frame')) { + return 'Node explicitly launched with --no-async-context-frame'; + } + if (major >= 24) return undefined; + if (process.execArgv.includes('--experimental-async-context-frame')) { + return undefined; + } + if (major >= 22) { + return 'Node versions prior to v24 must be launched with --experimental-async-context-frame'; + } + return 'Node major versions prior to v22 do not support the feature at all'; + } + + function ensureHook(): AsyncLocalStorage { + if (als) return als; + const err = asyncContextFrameError(); + if (err) { + throw new Error( + `otel thread-ctx writer requires async_context_frame support, which is unavailable: ${err}.`, + ); + } + als = new AsyncLocalStorage(); + addon.otelThreadCtxStoreAls(als); + return als; + } + + function buildWrap(opts: ContextOptions): CtxWrap { + if (!opts || typeof opts !== 'object') { + throw new TypeError('options object required'); + } + ensureHook(); + return new addon.otelThreadCtxWrap( + opts.traceId, + opts.spanId, + opts.attributes, + ); + } + + runWithContext = function (fn: () => T, opts: ContextOptions): T { + const wrap = buildWrap(opts); + return ensureHook().run(wrap, fn); + }; + + enterWithContext = function (opts: ContextOptions): void { + const wrap = buildWrap(opts); + ensureHook().enterWith(wrap); + }; + + clearContext = function (): void { + // Idempotent: clearing when no hook has been installed yet (and + // therefore no context can be active) is a no-op. + if (!als) return; + als.enterWith(undefined as unknown as CtxWrap); + }; + + appendAttributes = function ( + attributes: Array, + ): void { + if (!als) { + throw new Error( + 'no active thread context; call runWithContext or enterWithContext first', + ); + } + const wrap = als.getStore(); + if (!wrap) { + throw new Error( + 'no active thread context; call runWithContext or enterWithContext first', + ); + } + wrap.append(attributes); + }; + + isContextTruncated = function (): boolean { + if (!als) return false; + const wrap = als.getStore(); + if (!wrap) return false; + return wrap.isTruncated(); + }; + + _currentRecordBytes = function (): Uint8Array | undefined { + if (!als) return undefined; + const wrap = als.getStore(); + if (!wrap) return undefined; + return wrap.debugBytes(); + }; +} else { + runWithContext = function (fn: () => T, _opts: ContextOptions): T { + return fn(); + }; + enterWithContext = function (_opts: ContextOptions): void {}; + clearContext = function (): void {}; + appendAttributes = function ( + _attributes: Array, + ): void {}; + isContextTruncated = function (): boolean { + return false; + }; +} + +/** + * Build name-addressed wrappers around {@link runWithContext}, + * {@link enterWithContext}, and {@link appendAttributes}. The supplied + * `keys` array is the same string list the caller publishes (or has + * published) as the `threadlocal.attribute_key_map` resource attribute in + * the OTEP-4719 process context: index N in this array is the uint8 key + * index N in the on-the-wire record. The mapping is captured once at + * factory time. + */ +export function makeNamedContext(keys: string[]): NamedContext { + if (!Array.isArray(keys)) { + throw new TypeError('keys must be an array of attribute names'); + } + if (keys.length > 256) { + throw new RangeError('keys array exceeds 256 entries'); + } + const indexByName = new Map(); + keys.forEach((name, i) => { + if (typeof name !== 'string') { + throw new TypeError('every key must be a string'); + } + if (indexByName.has(name)) { + throw new Error( + `duplicate key name at indexes ${indexByName.get(name)} and ${i}: ${name}`, + ); + } + indexByName.set(name, i); + }); + + function resolveAttributes( + named: + | Record + | Map + | Array<[string, unknown]> + | undefined, + ): Array | undefined { + if (named == null) return undefined; + const attributes: Array = []; + const set = (name: string, value: unknown) => { + const idx = indexByName.get(name); + if (idx === undefined) { + throw new Error(`unknown attribute name: ${name}`); + } + attributes[idx] = String(value); + }; + if (Array.isArray(named)) { + for (const [n, v] of named) set(n, v); + } else if (named instanceof Map) { + for (const [n, v] of named) set(n, v); + } else if (typeof named === 'object') { + for (const n of Object.keys(named)) + set(n, (named as Record)[n]); + } else { + throw new TypeError( + 'namedAttributes must be an object, Map, or array of pairs', + ); + } + return attributes; + } + + function toBaseOpts(opts: NamedContextOptions): ContextOptions { + if (!opts || typeof opts !== 'object') { + throw new TypeError('options object required'); + } + return { + traceId: opts.traceId, + spanId: opts.spanId, + attributes: resolveAttributes(opts.namedAttributes), + }; + } + + const processContextAttributes = Object.freeze({ + 'threadlocal.schema_version': SCHEMA_VERSION, + 'threadlocal.attribute_key_map': Object.freeze(keys.slice()), + 'threadlocal.nodejs_v1.wrapped_object_offset': WRAPPED_OBJECT_OFFSET, + 'threadlocal.nodejs_v1.tagged_size': TAGGED_SIZE, + }) as ProcessContextAttributes; + + return { + runWithContext(fn: () => T, opts: NamedContextOptions): T { + return runWithContext(fn, toBaseOpts(opts)); + }, + enterWithContext(opts: NamedContextOptions): void { + enterWithContext(toBaseOpts(opts)); + }, + clearContext(): void { + clearContext(); + }, + appendAttributes( + namedAttributes: + | Record + | Map + | Array<[string, unknown]>, + ): void { + appendAttributes(resolveAttributes(namedAttributes)!); + }, + isContextTruncated(): boolean { + return isContextTruncated(); + }, + processContextAttributes, + }; +} diff --git a/ts/test/test-otel-thread-ctx.ts b/ts/test/test-otel-thread-ctx.ts new file mode 100644 index 00000000..3cc140af --- /dev/null +++ b/ts/test/test-otel-thread-ctx.ts @@ -0,0 +1,856 @@ +/* + * Copyright 2026 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import assert from 'assert'; +import {strict as strictAssert} from 'assert'; +import {spawnSync} from 'node:child_process'; +import {join} from 'node:path'; + +import { + ContextOptions, + appendAttributes, + clearContext, + enterWithContext, + isContextTruncated, + makeNamedContext, + runWithContext, + _currentRecordBytes, +} from '../src/otel-thread-ctx'; + +const isLinux = process.platform === 'linux'; + +// Returns a plain Uint8Array (not a Buffer) so assert.deepStrictEqual against +// other Uint8Arrays — including the one the addon returns — succeeds. +function bytesFromHex(hex: string): Uint8Array { + return Uint8Array.from(Buffer.from(hex, 'hex')); +} + +const TRACE_ID_BYTES = bytesFromHex('0102030405060708090a0b0c0d0e0f10'); +const SPAN_ID_BYTES = bytesFromHex('1112131415161718'); + +interface Header { + traceId: Uint8Array; + spanId: Uint8Array; + valid: number; + reserved: number; + attrsDataSize: number; +} + +function decodeHeader(bytes: Uint8Array): Header { + strictAssert.ok(bytes.length >= 28, `record must be at least 28 bytes, got ${bytes.length}`); + const attrsDataSize = bytes[26] | (bytes[27] << 8); + strictAssert.equal( + bytes.length, + 28 + attrsDataSize, + `record length (${bytes.length}) must equal 28 + attrs_data_size (${attrsDataSize})`, + ); + return { + traceId: bytes.slice(0, 16), + spanId: bytes.slice(16, 24), + valid: bytes[24], + reserved: bytes[25], + attrsDataSize, + }; +} + +// Returns the attribute payload as a positional sparse array, mirroring the +// writer's input shape: index N is the value for uint8 key index N on the +// wire; unset slots are array holes. +function decodeAttrs(bytes: Uint8Array): Array { + const hdr = decodeHeader(bytes); + const out: Array = []; + let i = 28; + const end = i + hdr.attrsDataSize; + while (i < end) { + const idx = bytes[i++]; + const len = bytes[i++]; + out[idx] = Buffer.from(bytes.slice(i, i + len)).toString('utf8'); + i += len; + } + strictAssert.equal(i, end, 'attrs payload must be exactly attrsDataSize bytes'); + return out; +} + +function captureBytes(opts: { + traceId: Uint8Array; + spanId: Uint8Array; + attributes?: Array; +}): Uint8Array { + let bytes: Uint8Array | undefined; + runWithContext(() => { + bytes = _currentRecordBytes(); + }, opts); + return bytes as Uint8Array; +} + +(isLinux ? describe : describe.skip)('OTEP-4947 thread context (Linux-only)', () => { + describe('CtxWrap construction', () => { + it('accepts Uint8Array trace and span IDs', () => { + const bytes = captureBytes({traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + const hdr = decodeHeader(bytes); + strictAssert.deepEqual(hdr.traceId, TRACE_ID_BYTES); + strictAssert.deepEqual(hdr.spanId, SPAN_ID_BYTES); + strictAssert.equal(hdr.valid, 1); + strictAssert.equal(hdr.reserved, 0); + strictAssert.equal(hdr.attrsDataSize, 0); + }); + + it('accepts Buffer (Uint8Array subclass) trace and span IDs', () => { + const bytes = captureBytes({ + traceId: Buffer.from(TRACE_ID_BYTES), + spanId: Buffer.from(SPAN_ID_BYTES), + }); + const hdr = decodeHeader(bytes); + strictAssert.deepEqual(hdr.traceId, TRACE_ID_BYTES); + strictAssert.deepEqual(hdr.spanId, SPAN_ID_BYTES); + }); + + it('rejects wrong-length traceId', () => { + strictAssert.throws( + () => captureBytes({traceId: new Uint8Array(8), spanId: SPAN_ID_BYTES}), + /traceId must be/, + ); + }); + + it('rejects wrong-length spanId', () => { + strictAssert.throws( + () => captureBytes({traceId: TRACE_ID_BYTES, spanId: new Uint8Array(4)}), + /spanId must be/, + ); + }); + + it('rejects non-Uint8Array traceId', () => { + strictAssert.throws( + () => + captureBytes({ + traceId: 'a'.repeat(32) as unknown as Uint8Array, + spanId: SPAN_ID_BYTES, + }), + /traceId must be/, + ); + }); + }); + + describe('attribute encoding', () => { + it('leaves attrs_data empty when no attributes are provided', () => { + const bytes = captureBytes({traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + strictAssert.equal(decodeHeader(bytes).attrsDataSize, 0); + }); + + it('encodes attributes by position', () => { + const bytes = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: ['GET', '/api/v1/widgets'], + }); + strictAssert.deepEqual(decodeAttrs(bytes), ['GET', '/api/v1/widgets']); + }); + + it('skips null and undefined slots', () => { + const bytes = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: ['zero', null, undefined, 'three'], + }); + strictAssert.deepEqual(decodeAttrs(bytes), ['zero', , , 'three']); + }); + + it('skips trailing array holes', () => { + const attributes: Array = []; + attributes[5] = 'five'; + const bytes = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes, + }); + strictAssert.deepEqual(decodeAttrs(bytes), [, , , , , 'five']); + }); + + it('coerces non-string values via toString', () => { + const bytes = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: [42 as unknown as string, true as unknown as string], + }); + strictAssert.deepEqual(decodeAttrs(bytes), ['42', 'true']); + }); + + it('truncates values longer than 255 bytes to 255', () => { + const long = 'x'.repeat(300); + const bytes = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: [long], + }); + strictAssert.deepEqual(decodeAttrs(bytes), ['x'.repeat(255)]); + }); + + it('does not split a multibyte UTF-8 codepoint at the truncation boundary', () => { + const euro = '€'; + const bytes = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: [euro.repeat(86)], + }); + strictAssert.deepEqual(decodeAttrs(bytes), [euro.repeat(85)]); + strictAssert.equal(decodeHeader(bytes).attrsDataSize, 2 + 255); + + const bytes2 = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: [euro.repeat(84) + 'éé'], + }); + strictAssert.deepEqual(decodeAttrs(bytes2), [euro.repeat(84) + 'é']); + strictAssert.equal(decodeHeader(bytes2).attrsDataSize, 2 + 254); + }); + + it('right-sizes an empty record to 28 bytes', () => { + const bytes = captureBytes({traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + strictAssert.equal(bytes.length, 28); + }); + + it('right-sizes a one-short-attribute record to 28 + 2 + len bytes', () => { + const bytes = captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: ['GET'], + }); + strictAssert.equal(bytes.length, 28 + 2 + 3); + }); + + it('skip-and-continue truncates past the 612-byte cap', () => { + const a = 'a'.repeat(255); + const b = 'b'.repeat(255); + const c = 'c'.repeat(255); + const d = 'd'.repeat(30); + let bytes: Uint8Array | undefined; + let truncated = false; + runWithContext( + () => { + bytes = _currentRecordBytes(); + truncated = isContextTruncated(); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES, attributes: [a, b, c, d]}, + ); + strictAssert.deepEqual(decodeAttrs(bytes!), [a, b, , d]); + strictAssert.equal(decodeHeader(bytes!).attrsDataSize, 514 + 32); + strictAssert.equal(truncated, true); + }); + + it('rejects attributes array longer than 256', () => { + const tooLong: Array = new Array(257); + strictAssert.throws( + () => + captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: tooLong, + }), + /must not exceed 256/, + ); + }); + + it('rejects non-array attributes argument', () => { + strictAssert.throws( + () => + captureBytes({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: {not: 'an array'} as unknown as Array, + }), + /attributes must be an array/, + ); + }); + }); + + describe('runWithContext lifecycle', () => { + it('returns the callback result', () => { + const result = runWithContext(() => 'ok', { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + }); + strictAssert.equal(result, 'ok'); + }); + + it('has no active record outside the call', () => { + strictAssert.equal(_currentRecordBytes(), undefined); + }); + + it('has no active record after the call returns', () => { + runWithContext(() => undefined, { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + }); + strictAssert.equal(_currentRecordBytes(), undefined); + }); + + it('restores the parent context after a nested call returns', () => { + const outerOpts = {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}; + const innerSpanBytes = bytesFromHex('aabbccddeeff0011'); + const innerOpts = {traceId: TRACE_ID_BYTES, spanId: innerSpanBytes}; + + runWithContext(() => { + const outerBefore = decodeHeader(_currentRecordBytes()!).spanId; + runWithContext(() => { + const inner = decodeHeader(_currentRecordBytes()!).spanId; + strictAssert.deepEqual(inner, innerSpanBytes); + }, innerOpts); + const outerAfter = decodeHeader(_currentRecordBytes()!).spanId; + strictAssert.deepEqual(outerBefore, outerAfter); + strictAssert.deepEqual(outerAfter, SPAN_ID_BYTES); + }, outerOpts); + }); + + it('keeps the same record after awaits', async () => { + await runWithContext(async () => { + const before = decodeHeader(_currentRecordBytes()!).spanId; + await Promise.resolve(); + const afterMicro = decodeHeader(_currentRecordBytes()!).spanId; + await new Promise(setImmediate); + const afterMacro = decodeHeader(_currentRecordBytes()!).spanId; + strictAssert.deepEqual(before, SPAN_ID_BYTES); + strictAssert.deepEqual(afterMicro, SPAN_ID_BYTES); + strictAssert.deepEqual(afterMacro, SPAN_ID_BYTES); + }, {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + }); + + it('keeps concurrent async calls isolated', async () => { + const aSpan = bytesFromHex('1111111111111111'); + const bSpan = bytesFromHex('2222222222222222'); + + async function run(spanBytes: Uint8Array) { + return runWithContext(async () => { + const observed: Uint8Array[] = []; + for (let i = 0; i < 4; i++) { + observed.push(decodeHeader(_currentRecordBytes()!).spanId); + await Promise.resolve(); + } + return observed; + }, {traceId: TRACE_ID_BYTES, spanId: spanBytes}); + } + + const [aObs, bObs] = await Promise.all([run(aSpan), run(bSpan)]); + for (const s of aObs) strictAssert.deepEqual(s, aSpan); + for (const s of bObs) strictAssert.deepEqual(s, bSpan); + }); + }); + + describe('enterWithContext', () => { + it('attaches the record to the current async scope', () => { + runWithContext(() => { + strictAssert.deepEqual( + decodeHeader(_currentRecordBytes()!).spanId, + SPAN_ID_BYTES, + ); + + const newSpan = bytesFromHex('aabbccddeeff0011'); + enterWithContext({traceId: TRACE_ID_BYTES, spanId: newSpan}); + strictAssert.deepEqual(decodeHeader(_currentRecordBytes()!).spanId, newSpan); + + return Promise.resolve().then(() => { + strictAssert.deepEqual( + decodeHeader(_currentRecordBytes()!).spanId, + newSpan, + ); + }); + }, {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + + strictAssert.equal(_currentRecordBytes(), undefined); + }); + + it('requires an options object', () => { + strictAssert.throws( + () => enterWithContext(undefined as unknown as ContextOptions), + /options object required/, + ); + }); + }); + + describe('clearContext', () => { + it('detaches the active record within a scope', () => { + runWithContext( + () => { + strictAssert.ok(_currentRecordBytes()); + clearContext(); + strictAssert.equal(_currentRecordBytes(), undefined); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}, + ); + }); + + it('makes appendAttributes throw and isContextTruncated return false', () => { + runWithContext( + () => { + clearContext(); + strictAssert.throws( + () => appendAttributes(['v']), + /no active thread context/, + ); + strictAssert.equal(isContextTruncated(), false); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}, + ); + }); + + it('is idempotent (calling with no context or twice is a no-op)', () => { + clearContext(); + strictAssert.equal(_currentRecordBytes(), undefined); + runWithContext( + () => { + clearContext(); + clearContext(); + strictAssert.equal(_currentRecordBytes(), undefined); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}, + ); + }); + + it('lets a nested runWithContext re-establish a record', () => { + runWithContext( + () => { + clearContext(); + const innerSpan = bytesFromHex('aabbccddeeff0011'); + runWithContext( + () => { + strictAssert.deepEqual( + decodeHeader(_currentRecordBytes()!).spanId, + innerSpan, + ); + }, + {traceId: TRACE_ID_BYTES, spanId: innerSpan}, + ); + // After the inner runWithContext returns, we're back to the + // post-clear state in the outer scope. + strictAssert.equal(_currentRecordBytes(), undefined); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}, + ); + }); + + it('lets enterWithContext re-establish a record', () => { + runWithContext( + () => { + clearContext(); + const newSpan = bytesFromHex('aabbccddeeff0011'); + enterWithContext({traceId: TRACE_ID_BYTES, spanId: newSpan}); + strictAssert.deepEqual( + decodeHeader(_currentRecordBytes()!).spanId, + newSpan, + ); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}, + ); + }); + + it('named.clearContext detaches the active record', () => { + const named = makeNamedContext(['route']); + named.runWithContext( + () => { + strictAssert.ok(_currentRecordBytes()); + named.clearContext(); + strictAssert.equal(_currentRecordBytes(), undefined); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {route: '/x'}, + }, + ); + }); + }); + + describe('appendAttributes', () => { + it('adds entries to the current record', () => { + runWithContext( + () => { + strictAssert.deepEqual(decodeAttrs(_currentRecordBytes()!), ['GET']); + appendAttributes([, , '200']); + strictAssert.deepEqual(decodeAttrs(_currentRecordBytes()!), [ + 'GET', + , + '200', + ]); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES, attributes: ['GET']}, + ); + }); + + it('writes in-place when bytes fit in the slack', () => { + runWithContext( + () => { + const before = _currentRecordBytes()!; + appendAttributes([, 'ab']); + const after = _currentRecordBytes()!; + strictAssert.deepEqual(decodeAttrs(after), ['xxx', 'ab']); + strictAssert.equal(after.length, before.length + 2 + 2); + strictAssert.deepEqual(after.slice(0, 26), before.slice(0, 26)); + strictAssert.deepEqual(after.slice(28, 33), before.slice(28, 33)); + strictAssert.equal(after[24], 1); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES, attributes: ['xxx']}, + ); + }); + + it('grows the record geometrically when slack runs out', () => { + runWithContext(() => { + const v = 'y'.repeat(60); + for (let i = 0; i < 8; i++) { + const append: Array = []; + append[i] = v; + appendAttributes(append); + } + const decoded = decodeAttrs(_currentRecordBytes()!); + for (let i = 0; i < 8; i++) { + strictAssert.equal(decoded[i], v, `slot ${i}`); + } + strictAssert.equal( + decodeHeader(_currentRecordBytes()!).attrsDataSize, + 8 * 62, + ); + }, {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + }); + + it('throws when there is no current context', () => { + strictAssert.throws(() => appendAttributes(['v']), /no active thread context/); + }); + + it('is a no-op when given an empty array', () => { + runWithContext(() => { + const before = _currentRecordBytes(); + appendAttributes([]); + const after = _currentRecordBytes(); + strictAssert.deepEqual(after, before); + }, {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + }); + + it('is a no-op when all slots are null/undefined', () => { + runWithContext(() => { + const before = _currentRecordBytes(); + appendAttributes([null, undefined, , null]); + const after = _currentRecordBytes(); + strictAssert.deepEqual(after, before); + }, {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + }); + + it('silently drops entries past the 612-byte cap and sets the truncated flag', () => { + const big = 'a'.repeat(255); + runWithContext(() => { + appendAttributes([big, big]); + strictAssert.equal(isContextTruncated(), false); + appendAttributes([, , big]); + strictAssert.equal(isContextTruncated(), true); + strictAssert.equal(decodeHeader(_currentRecordBytes()!).attrsDataSize, 514); + const small = 'x'.repeat(30); + appendAttributes([, , , small]); + const decoded = decodeAttrs(_currentRecordBytes()!); + strictAssert.equal(decoded[0], big); + strictAssert.equal(decoded[1], big); + strictAssert.equal(decoded[2], undefined); + strictAssert.equal(decoded[3], small); + strictAssert.equal(isContextTruncated(), true); + }, {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}); + }); + + it('propagates through async continuations', async () => { + await runWithContext( + async () => { + appendAttributes([, 'after-await']); + await Promise.resolve(); + strictAssert.deepEqual(decodeAttrs(_currentRecordBytes()!), [ + 'before', + 'after-await', + ]); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: ['before'], + }, + ); + }); + }); + + describe('isContextTruncated', () => { + it('returns false outside a context', () => { + strictAssert.equal(isContextTruncated(), false); + }); + + it('returns false for a non-truncated record', () => { + runWithContext( + () => { + strictAssert.equal(isContextTruncated(), false); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + attributes: ['GET', '/x'], + }, + ); + }); + }); + + describe('makeNamedContext', () => { + it('rejects non-array keys', () => { + strictAssert.throws( + () => makeNamedContext({} as unknown as string[]), + /must be an array/, + ); + }); + + it('rejects more than 256 keys', () => { + const tooMany = Array.from({length: 257}, (_, i) => `k${i}`); + strictAssert.throws(() => makeNamedContext(tooMany), /exceeds 256/); + }); + + it('rejects duplicate names', () => { + strictAssert.throws( + () => makeNamedContext(['x', 'y', 'x']), + /duplicate key name/, + ); + }); + + it('rejects non-string entries', () => { + strictAssert.throws( + () => makeNamedContext(['ok', 42 as unknown as string]), + /must be a string/, + ); + }); + + it('returns an object exposing all five NamedContext methods', () => { + const named = makeNamedContext(['a']); + strictAssert.equal(typeof named.runWithContext, 'function'); + strictAssert.equal(typeof named.enterWithContext, 'function'); + strictAssert.equal(typeof named.clearContext, 'function'); + strictAssert.equal(typeof named.appendAttributes, 'function'); + strictAssert.equal(typeof named.isContextTruncated, 'function'); + }); + + it('resolves namedAttributes given as an object', () => { + const named = makeNamedContext(['http.method', 'http.route']); + let bytes: Uint8Array | undefined; + named.runWithContext( + () => { + bytes = _currentRecordBytes(); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {'http.method': 'GET', 'http.route': '/x'}, + }, + ); + strictAssert.deepEqual(decodeAttrs(bytes!), ['GET', '/x']); + }); + + it('resolves namedAttributes given as a Map', () => { + const named = makeNamedContext(['a', 'b']); + let bytes: Uint8Array | undefined; + named.runWithContext( + () => { + bytes = _currentRecordBytes(); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: new Map([ + ['a', 'A'], + ['b', 'B'], + ]), + }, + ); + strictAssert.deepEqual(decodeAttrs(bytes!), ['A', 'B']); + }); + + it('resolves namedAttributes given as an array of pairs', () => { + const named = makeNamedContext(['a', 'b']); + let bytes: Uint8Array | undefined; + named.runWithContext( + () => { + bytes = _currentRecordBytes(); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: [ + ['a', 'A'], + ['b', 'B'], + ], + }, + ); + strictAssert.deepEqual(decodeAttrs(bytes!), ['A', 'B']); + }); + + it('rejects unknown names', () => { + const named = makeNamedContext(['a']); + strictAssert.throws( + () => + named.runWithContext(() => undefined, { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {unknown: 'v'}, + }), + /unknown attribute name: unknown/, + ); + }); + + it('coerces non-string values', () => { + const named = makeNamedContext(['n']); + let bytes: Uint8Array | undefined; + named.runWithContext( + () => { + bytes = _currentRecordBytes(); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {n: 7}, + }, + ); + strictAssert.deepEqual(decodeAttrs(bytes!), ['7']); + }); + + it('enterWithContext attaches a name-addressed record', () => { + const named = makeNamedContext(['route']); + runWithContext( + () => { + named.enterWithContext({ + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {route: '/x'}, + }); + strictAssert.deepEqual(decodeAttrs(_currentRecordBytes()!), ['/x']); + }, + {traceId: TRACE_ID_BYTES, spanId: SPAN_ID_BYTES}, + ); + }); + + it('appendAttributes appends by name', () => { + const named = makeNamedContext(['http.method', 'http.route', 'http.status']); + named.runWithContext( + () => { + named.appendAttributes({'http.status': '500'}); + strictAssert.deepEqual(decodeAttrs(_currentRecordBytes()!), [ + 'GET', + '/x', + '500', + ]); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {'http.method': 'GET', 'http.route': '/x'}, + }, + ); + }); + + it('appendAttributes rejects unknown names', () => { + const named = makeNamedContext(['known']); + named.runWithContext( + () => { + strictAssert.throws( + () => named.appendAttributes({unknown: 'v'}), + /unknown attribute name: unknown/, + ); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {known: 'k'}, + }, + ); + }); + + it('isContextTruncated mirrors the top-level function', () => { + const named = makeNamedContext(['a', 'b', 'c']); + named.runWithContext( + () => { + strictAssert.equal(named.isContextTruncated(), false); + appendAttributes([ + , + , + 'c'.repeat(255), + , + , + 'd'.repeat(255), + , + , + 'e'.repeat(255), + ]); + strictAssert.equal(named.isContextTruncated(), true); + }, + { + traceId: TRACE_ID_BYTES, + spanId: SPAN_ID_BYTES, + namedAttributes: {a: 'a', b: 'b'}, + }, + ); + }); + + describe('processContextAttributes', () => { + it('matches the input keys plus the V8 layout constants', () => { + const keys = ['http.method', 'http.route', 'user.id']; + const named = makeNamedContext(keys); + const pca = named.processContextAttributes; + strictAssert.equal(pca['threadlocal.schema_version'], 'nodejs_v1'); + strictAssert.deepEqual(pca['threadlocal.attribute_key_map'], keys); + strictAssert.equal(pca['threadlocal.nodejs_v1.wrapped_object_offset'], 24); + strictAssert.equal(pca['threadlocal.nodejs_v1.tagged_size'], 8); + strictAssert.deepEqual(Object.keys(pca).sort(), [ + 'threadlocal.attribute_key_map', + 'threadlocal.nodejs_v1.tagged_size', + 'threadlocal.nodejs_v1.wrapped_object_offset', + 'threadlocal.schema_version', + ]); + }); + + it('is frozen and a defensive copy', () => { + const keys = ['http.method', 'http.route']; + const named = makeNamedContext(keys); + const pca = named.processContextAttributes; + strictAssert.ok(Object.isFrozen(pca)); + strictAssert.ok(Object.isFrozen(pca['threadlocal.attribute_key_map'])); + keys.push('mutated.after'); + strictAssert.deepEqual(pca['threadlocal.attribute_key_map'], [ + 'http.method', + 'http.route', + ]); + strictAssert.throws(() => { + (pca as unknown as Record)['threadlocal.schema_version'] = + 'tampered'; + }, /read-only|read only|TypeError/i); + }); + }); + }); + + describe('discovery contract', () => { + it('exports otel_thread_ctx_nodejs_v1 as a TLS dynsym', function () { + const addon = join(__dirname, '..', '..', 'build', 'Release', 'dd_pprof.node'); + const r = spawnSync('readelf', ['--dyn-syms', '--wide', addon], { + encoding: 'utf8', + }); + if (r.error && (r.error as NodeJS.ErrnoException).code === 'ENOENT') { + this.skip(); + } + strictAssert.equal(r.status, 0, `readelf failed: ${r.stderr}`); + const line = r.stdout + .split('\n') + .find((l) => /\sotel_thread_ctx_nodejs_v1$/.test(l)); + assert.ok(line, 'otel_thread_ctx_nodejs_v1 not present in dynamic symbol table'); + assert.match(line!, /\bTLS\b/, `expected TLS type, got: ${line!.trim()}`); + assert.match(line!, /\bGLOBAL\b/, `expected GLOBAL binding, got: ${line!.trim()}`); + assert.match(line!, /\bDEFAULT\b/, `expected DEFAULT visibility, got: ${line!.trim()}`); + }); + }); +});