From d6e7a82bdb14827920a951187115266101acad1a Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 1 May 2026 18:46:03 -0400 Subject: [PATCH 1/4] PYTHON-5676 Phase 1: Remove dead legacy OP_QUERY code from run_operation() Since MIN_SUPPORTED_WIRE_VERSION=8 (MongoDB 4.2+), use_command() always returns True and OP_MSG is always used. Remove the unreachable OP_QUERY branches from Server.run_operation(), along with the dead else-branches in Cursor and CommandCursor that referenced _OpReply. - Hard-code use_cmd=True / apply_timeout=True in run_operation() - Always use user_fields=_CURSOR_DOC_FIELDS and legacy_response=False - Remove isinstance(reply, _OpMsg) exhaust check (reply is always _OpMsg) - Hard-code from_command=True in Response/PinnedResponse construction - Remove dead else-branches with assert isinstance(response.data, _OpReply) in Cursor._send_message and CommandCursor._send_message - Add scope document PYTHON-5676-scope.md outlining all four phases --- PYTHON-5676-scope.md | 183 +++++++++++++++++++++++++ pymongo/asynchronous/command_cursor.py | 13 +- pymongo/asynchronous/cursor.py | 34 ++--- pymongo/asynchronous/server.py | 56 +++----- pymongo/synchronous/command_cursor.py | 13 +- pymongo/synchronous/cursor.py | 34 ++--- pymongo/synchronous/server.py | 56 +++----- 7 files changed, 251 insertions(+), 138 deletions(-) create mode 100644 PYTHON-5676-scope.md diff --git a/PYTHON-5676-scope.md b/PYTHON-5676-scope.md new file mode 100644 index 0000000000..56a7fc0da2 --- /dev/null +++ b/PYTHON-5676-scope.md @@ -0,0 +1,183 @@ +# PYTHON-5676: Consolidate Command Execution Logic — Scope Document + +## Context + +PyMongo has accumulated several distinct code paths for executing commands against the server. When cross-cutting logic (e.g., backpressure, CSOT, monitoring) needs to be added or changed, engineers must find and update multiple paths. The goal is a single central path for all database operations, making future changes local to one place. 
+ +PYTHON-1357 (Refactor Cursor and CommandCursor) is now Closed — the prerequisite work is done. + +--- + +## Current Command Execution Paths + +All paths are duplicated across `pymongo/asynchronous/` (source of truth) and `pymongo/synchronous/` (auto-generated via `tools/synchro.sh`). + +### Path 1 — Standard commands (most operations) +``` +Collection._command() / Database.command() / conn.command() (direct) + → Connection.command() [pool.py:344] — session, write concern, server API, reauth + → network.command() [network.py:61] — encode OP_MSG, APM, logging, CSOT, encryption, send/recv +``` +Used by: insert, update, delete, aggregate, distinct, find_one, createIndex, dropCollection, db.command(), etc. + +### Path 2 — Cursor operations (find / getMore) +``` +Cursor._send_message() + → MongoClient._run_operation() [mongo_client.py:1911] + → Server.run_operation() [server.py:138] — send/recv + FULL APM/logging (≈80 lines, near-identical to network.command()) +``` +**Bypasses `network.command()` entirely.** Has its own APM/monitoring and logging code. + +### Path 3 — Acknowledged bulk writes (collection-level) +``` +_Bulk._execute_batch() + → _Bulk.write_command() [bulk.py:244] — APM/logging wrapper (≈80 lines) + → Connection.write_command() [pool.py:480] — pre-encoded bytes, raw send+recv, no APM +``` +**Bypasses both `Connection.command()` and `network.command()`.** Note: for *encrypted* bulk writes, this path already uses `Connection.command()` — the non-encrypted path is the outlier. 
+ +### Path 4 — Unacknowledged bulk writes +``` +_Bulk._execute_batch_unack() + → _Bulk.unack_write() [bulk.py:329] — APM/logging wrapper (≈70 lines) + → Connection.unack_write() [pool.py:469] — raw send only, no recv +``` +**Bypasses `Connection.command()` and `network.command()`.** + +### Path 5 — Client-level bulk writes (cross-collection) +`client_bulk.py` mirrors Paths 3 and 4 with its own `write_command()` and `unack_write()` wrappers — another full copy of the APM/logging boilerplate. + +--- + +## Key Findings + +**1. APM/logging code is copy-pasted 5+ times.** +`Server.run_operation()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` all contain the same ~70-80 line block of `_COMMAND_LOGGER.isEnabledFor(DEBUG)` + `_debug_log(...)` + `listeners.publish_command_start/success/failure(...)`. The reference implementation is `network.command()`. + +**2. Dead legacy OP_QUERY code in `Server.run_operation()`.** +`MIN_SUPPORTED_WIRE_VERSION = 8` (MongoDB 4.2+, `common.py`). OP_MSG was introduced at wire version 6. The `use_cmd = False` branch (legacy OP_QUERY path) in `run_operation()` can never be reached for any currently supported server. The `_Query.use_command()` check at `message.py:1622` confirms: for wire version ≥ 8, it is unconditionally True. + +**3. `_op_msg()` already handles bulk write Type 1 sections.** +`message._op_msg()` (line 394) detects insert/update/delete commands, pops the documents field, and encodes them as a Type 1 section. The non-encrypted bulk write path bypasses this and does its own single-pass encode+batch-size-check via `_do_batched_op_msg`. Unifying would require separating batch determination from encoding (currently combined for performance). + +**4. Encrypted bulk writes already use `Connection.command()`.** +In `_Bulk._execute_batch()` (bulk.py:444), the `if self.is_encrypted:` branch calls `bwc.conn.command()`. 
The non-encrypted `else` branch goes through `Connection.write_command()`. These should be the same path.

**5. Sync is generated from async source.**
All `pymongo/synchronous/*.py` files are auto-generated from `pymongo/asynchronous/*.py` via `tools/synchro.sh`. All code changes must be made in the `asynchronous/` files only.

---

## Proposed Consolidation: Phased Approach

### Phase 1 — Remove dead OP_QUERY code from `run_operation()` *(Low risk)*

**What:** Delete the `use_cmd = False` branches from `Server.run_operation()`. Remove the conditional `if use_cmd: ... else: ...` blocks for `user_fields`/`legacy_response` and response-building.

**Files:** `pymongo/asynchronous/server.py`, `pymongo/asynchronous/cursor.py` (remove the dead `else` branches referencing `_OpReply`)

**Why now:** This is purely dead code removal — no behavior change. It simplifies Phase 3 significantly and is independently safe.

**Verification:** Full test suite; specifically `test/test_cursor.py` and `test/test_unified_format.py`.

---

### Phase 2 — Extract shared APM/logging helpers *(Low risk)*

**What:** Extract the duplicated APM/logging block into shared helper functions in `pymongo/helpers_shared.py` (or a new `pymongo/command_helpers.py`). All five duplicate sites call the helpers instead of inlining the code.

```python
# No sync/async split needed — these functions contain no I/O
def _log_and_publish_command_started(client, listeners, cmd, dbname, request_id, conn):
    ...


def _log_and_publish_command_succeeded(
    client, listeners, cmd, dbname, request_id, conn, reply, duration
):
    ...


def _log_and_publish_command_failed(
    client, listeners, cmd, dbname, request_id, conn, failure, duration
):
    ...
+``` + +**Files:** `pymongo/helpers_shared.py` (or new `pymongo/command_helpers.py`), `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py`, `pymongo/asynchronous/bulk.py`, `pymongo/asynchronous/client_bulk.py` + +**Why this matters:** This is the direct solution to the stated problem. When future cross-cutting logic needs to be added to "command execution", there is one place to add it. + +**Note on APM operationId:** `_BulkWriteContext` passes `op_id` (not `request_id`) as the `operationId` in APM events. This is spec-required behavior that must be preserved in the helpers. + +**Verification:** `test/test_command_monitoring.py`, `test/test_monitoring.py`, `test/test_unified_format.py`. + +--- + +### Phase 3 — Unify `Server.run_operation()` with `network.command()` *(Medium risk)* + +**What:** Refactor `network.command()` into a two-layer structure: + +- `_network_command_core()` — performs all work (encode, APM, send, recv) and returns `(response_doc, raw_reply, request_id, duration)` +- `command()` — thin wrapper returning just `response_doc` (existing callers unchanged) + +`Server.run_operation()` (after Phase 1+2) calls `_network_command_core()` for the actual transport, then wraps the result in `Response`/`PinnedResponse`. + +**Key complexity:** `RawBatchCursor._unpack_response()` (`cursor.py:1196`) calls `response.raw_response(cursor_id)` on the raw `_OpMsg` object — it needs the raw reply, not just the decoded dict. The `_network_command_core()` design satisfies this by exposing `raw_reply`. The `@_handle_reauth` decorator on `run_operation()` must also be preserved. + +**Files:** `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py` + +**Result:** Cursor operations (find/getMore) fully share the command execution path with all other operations. Single place to add transport-level logic. + +**Verification:** `test/test_cursor.py`, exhaust cursor tests, `test/test_encryption.py`, `test/test_csot.py`. 
+ +--- + +### Phase 4 — Unify non-encrypted bulk write path with `Connection.command()` *(Higher risk, defer)* + +**What:** Change `_Bulk._execute_batch()` to use `Connection.command()` for non-encrypted bulk writes, matching the already-unified encrypted path. Requires `_BulkWriteContext.batch_command()` to return a command dict instead of pre-encoded bytes. + +**The challenge:** `_do_batched_op_msg` performs batch-size determination and encoding in a single pass (including a C extension: `_cmessage._batched_op_msg`). Separating batch determination from encoding requires a two-pass approach, adding encoding overhead. **Needs benchmarking before committing.** + +`_EncryptedBulkWriteContext.batch_command()` encodes to bytes and then deserializes back via `_inflate_bson` — aligning the non-encrypted path with this approach may be the right model. + +**Result after Phase 4:** `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, and `_ClientBulk.unack_write()` have no callers and can be removed. + +**Gate:** Bulk write benchmarks before/after. Suggested regression threshold: ≤2% throughput degradation. 
+ +--- + +## Definition of Done + +- All database operations route through `Connection.command()` → `network.command()` for actual transport +- APM/logging code exists in one location +- `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` are removed (or reduced to thin wrappers) +- No performance regression (bulk write benchmarks) +- No behavioral regression (full spec test suite passes) + +--- + +## Risk Summary + +| Phase | Risk | Effort | Value | +|-------|------|--------|-------| +| 1 — Remove dead OP_QUERY code | Low | ~1 day | Medium (simplification) | +| 2 — Extract APM helpers | Low | ~2 days | **High** (solves the stated problem) | +| 3 — Unify run_operation with network.command | Medium | ~3 days | High (true path consolidation) | +| 4 — Unify bulk write bytes path | Higher | ~1 week + benchmarking | Medium (removes last bypass) | + +Phases 1 and 2 can be done together in one PR. Phase 3 should be a separate PR. Phase 4 should be gated on benchmarking. 
+ +--- + +## Critical Files + +| File | Role | +|------|------| +| `pymongo/asynchronous/network.py:61` | Central `command()` — reference implementation | +| `pymongo/asynchronous/server.py:138` | `run_operation()` — cursor path bypass | +| `pymongo/asynchronous/pool.py:344,469,480` | `Connection.command`, `unack_write`, `write_command` | +| `pymongo/asynchronous/bulk.py:244,329,444` | `_Bulk.write_command`, `unack_write`, `_execute_batch` | +| `pymongo/asynchronous/client_bulk.py:229,320` | `_ClientBulk.write_command`, `unack_write` | +| `pymongo/message.py:394,695,1622` | `_op_msg`, `_BulkWriteContext.batch_command`, `_Query.use_command` | +| `pymongo/common.py` | `MIN_SUPPORTED_WIRE_VERSION = 8` | diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 5a59c67a15..34194899e1 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -189,15 +189,10 @@ async def _send_message(self, operation: _GetMore) -> None: if isinstance(response, PinnedResponse): if not self._sock_mgr: self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type] - if response.from_command: - cursor = response.docs[0]["cursor"] - documents = cursor["nextBatch"] - self._postbatchresumetoken = cursor.get("postBatchResumeToken") - self._id = cursor["id"] - else: - documents = response.docs - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + cursor = response.docs[0]["cursor"] + documents = cursor["nextBatch"] + self._postbatchresumetoken = cursor.get("postBatchResumeToken") + self._id = cursor["id"] if self._id == 0: await self.close() diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index a60c082ade..f7c1671777 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -1020,29 +1020,23 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None: cmd_name = operation.name 
docs = response.docs - if response.from_command: - if cmd_name != "explain": - cursor = docs[0]["cursor"] - self._id = cursor["id"] - if cmd_name == "find": - documents = cursor["firstBatch"] - # Update the namespace used for future getMore commands. - ns = cursor.get("ns") - if ns: - self._dbname, self._collname = ns.split(".", 1) - else: - documents = cursor["nextBatch"] - self._data = deque(documents) - self._retrieved += len(documents) + if cmd_name != "explain": + cursor = docs[0]["cursor"] + self._id = cursor["id"] + if cmd_name == "find": + documents = cursor["firstBatch"] + # Update the namespace used for future getMore commands. + ns = cursor.get("ns") + if ns: + self._dbname, self._collname = ns.split(".", 1) else: - self._id = 0 - self._data = deque(docs) - self._retrieved += len(docs) + documents = cursor["nextBatch"] + self._data = deque(documents) + self._retrieved += len(documents) else: - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + self._id = 0 self._data = deque(docs) - self._retrieved += response.data.number_returned + self._retrieved += len(docs) if self._id == 0: # Don't wait for garbage collection to call __del__, return the diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index f212306174..aaf7ef82b9 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -37,7 +37,7 @@ _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _convert_exception, _GetMore, _Query from pymongo.response import PinnedResponse, Response if TYPE_CHECKING: @@ -161,13 +161,13 @@ async def run_operation( publish = listeners.enabled_for_commands start = datetime.now() - use_cmd = operation.use_command(conn) + operation.use_command(conn) more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come - cmd, dbn = await self.operation_to_command(operation, conn, use_cmd) + cmd, dbn = await 
self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 else: - message = operation.get_message(read_preference, conn, use_cmd) + message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): @@ -208,23 +208,16 @@ async def run_operation( reply = await conn.receive_message(request_id) # Unpack and check for command errors. - if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True docs = unpack_res( reply, operation.cursor_id, operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, + legacy_response=False, + user_fields=_CURSOR_DOC_FIELDS, ) - if use_cmd: - first = docs[0] - await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + first = docs[0] + await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] except Exception as exc: duration = datetime.now() - start if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -263,18 +256,8 @@ async def run_operation( ) raise duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs + # Must publish in find / getMore / explain command response format. 
+ res = docs[0] if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _COMMAND_LOGGER, @@ -308,21 +291,14 @@ async def run_operation( # Decrypt response. client = operation.client # type: ignore[assignment] if client and client._encrypter: - if use_cmd: - decrypted = await client._encrypter.decrypt(reply.raw_command_response()) - docs = _decode_all_selective(decrypted, operation.codec_options, user_fields) + decrypted = await client._encrypter.decrypt(reply.raw_command_response()) + docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS) response: Response if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) + more_to_come = reply.more_to_come if operation.conn_mgr: operation.conn_mgr.update_exhaust(more_to_come) response = PinnedResponse( @@ -331,7 +307,7 @@ async def run_operation( conn=conn, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, more_to_come=more_to_come, ) @@ -341,7 +317,7 @@ async def run_operation( address=self._description.address, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, ) diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 34f60c6540..b2023ad5de 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -189,15 +189,10 @@ def _send_message(self, operation: _GetMore) -> None: if isinstance(response, PinnedResponse): if not self._sock_mgr: self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type] - if response.from_command: - cursor = 
response.docs[0]["cursor"] - documents = cursor["nextBatch"] - self._postbatchresumetoken = cursor.get("postBatchResumeToken") - self._id = cursor["id"] - else: - documents = response.docs - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + cursor = response.docs[0]["cursor"] + documents = cursor["nextBatch"] + self._postbatchresumetoken = cursor.get("postBatchResumeToken") + self._id = cursor["id"] if self._id == 0: self.close() diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 5a721d8e06..909671dac5 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -1018,29 +1018,23 @@ def _send_message(self, operation: Union[_Query, _GetMore]) -> None: cmd_name = operation.name docs = response.docs - if response.from_command: - if cmd_name != "explain": - cursor = docs[0]["cursor"] - self._id = cursor["id"] - if cmd_name == "find": - documents = cursor["firstBatch"] - # Update the namespace used for future getMore commands. - ns = cursor.get("ns") - if ns: - self._dbname, self._collname = ns.split(".", 1) - else: - documents = cursor["nextBatch"] - self._data = deque(documents) - self._retrieved += len(documents) + if cmd_name != "explain": + cursor = docs[0]["cursor"] + self._id = cursor["id"] + if cmd_name == "find": + documents = cursor["firstBatch"] + # Update the namespace used for future getMore commands. 
+ ns = cursor.get("ns") + if ns: + self._dbname, self._collname = ns.split(".", 1) else: - self._id = 0 - self._data = deque(docs) - self._retrieved += len(docs) + documents = cursor["nextBatch"] + self._data = deque(documents) + self._retrieved += len(documents) else: - assert isinstance(response.data, _OpReply) - self._id = response.data.cursor_id + self._id = 0 self._data = deque(docs) - self._retrieved += response.data.number_returned + self._retrieved += len(docs) if self._id == 0: # Don't wait for garbage collection to call __del__, return the diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index f57420918b..6073117f5a 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -36,7 +36,7 @@ _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _convert_exception, _GetMore, _Query from pymongo.response import PinnedResponse, Response from pymongo.synchronous.helpers import _handle_reauth @@ -161,13 +161,13 @@ def run_operation( publish = listeners.enabled_for_commands start = datetime.now() - use_cmd = operation.use_command(conn) + operation.use_command(conn) more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come - cmd, dbn = self.operation_to_command(operation, conn, use_cmd) + cmd, dbn = self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 else: - message = operation.get_message(read_preference, conn, use_cmd) + message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): @@ -208,23 +208,16 @@ def run_operation( reply = conn.receive_message(request_id) # Unpack and check for command errors. 
- if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True docs = unpack_res( reply, operation.cursor_id, operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, + legacy_response=False, + user_fields=_CURSOR_DOC_FIELDS, ) - if use_cmd: - first = docs[0] - operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + first = docs[0] + operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] except Exception as exc: duration = datetime.now() - start if isinstance(exc, (NotPrimaryError, OperationFailure)): @@ -263,18 +256,8 @@ def run_operation( ) raise duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs + # Must publish in find / getMore / explain command response format. + res = docs[0] if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( _COMMAND_LOGGER, @@ -308,21 +291,14 @@ def run_operation( # Decrypt response. 
client = operation.client # type: ignore[assignment] if client and client._encrypter: - if use_cmd: - decrypted = client._encrypter.decrypt(reply.raw_command_response()) - docs = _decode_all_selective(decrypted, operation.codec_options, user_fields) + decrypted = client._encrypter.decrypt(reply.raw_command_response()) + docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS) response: Response if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) + more_to_come = reply.more_to_come if operation.conn_mgr: operation.conn_mgr.update_exhaust(more_to_come) response = PinnedResponse( @@ -331,7 +307,7 @@ def run_operation( conn=conn, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, more_to_come=more_to_come, ) @@ -341,7 +317,7 @@ def run_operation( address=self._description.address, duration=duration, request_id=request_id, - from_command=use_cmd, + from_command=True, docs=docs, ) From 7e8c01f8746b3f1d0c5a0d2c8703f93ab62f3467 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 1 May 2026 19:22:31 -0400 Subject: [PATCH 2/4] PYTHON-5676 Phase 2: Extract shared APM/logging helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminate the ~13-line _COMMAND_LOGGER/_debug_log boilerplate that was copy-pasted identically across five command execution paths. Add three helpers to pymongo/command_helpers.py and call them from all five sites. 
- New pymongo/command_helpers.py: _log_command_started, _log_command_succeeded, _log_command_failed — pure functions, no I/O, no async/sync split needed - network.py, server.py, bulk.py, client_bulk.py: replace inline _debug_log blocks with helper calls; remove _COMMAND_LOGGER/_CommandStatusMessage imports where no longer needed --- pymongo/asynchronous/bulk.py | 139 +++++++--------------------- pymongo/asynchronous/client_bulk.py | 139 +++++++--------------------- pymongo/asynchronous/network.py | 82 ++++++---------- pymongo/asynchronous/server.py | 71 ++++---------- pymongo/command_helpers.py | 110 ++++++++++++++++++++++ pymongo/synchronous/bulk.py | 139 +++++++--------------------- pymongo/synchronous/client_bulk.py | 139 +++++++--------------------- pymongo/synchronous/network.py | 82 ++++++---------- pymongo/synchronous/server.py | 71 ++++---------- 9 files changed, 342 insertions(+), 630 deletions(-) create mode 100644 pymongo/command_helpers.py diff --git a/pymongo/asynchronous/bulk.py b/pymongo/asynchronous/bulk.py index 4a54f9eb3f..f429daf8b0 100644 --- a/pymongo/asynchronous/bulk.py +++ b/pymongo/asynchronous/bulk.py @@ -20,7 +20,6 @@ import copy import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -45,6 +44,11 @@ _raise_bulk_write_error, _Run, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -57,7 +61,6 @@ OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, @@ -252,44 +255,15 @@ async def write_command( ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( 
- _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: bwc._start(cmd, request_id, docs) try: reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] await client._process_response(reply, bwc.session) # type: ignore[arg-type] @@ -299,24 +273,17 @@ async def write_command( failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - 
serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -337,22 +304,7 @@ async def unack_write( client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: cmd = bwc._start(cmd, request_id, docs) try: @@ -363,23 +315,9 @@ async def unack_write( else: # Comply with APM spec. 
reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -390,24 +328,17 @@ async def unack_write( failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 015947d7ef..c29bb64c66 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -20,7 +20,6 @@ import copy import datetime -import logging from collections.abc import MutableMapping from 
itertools import islice from typing import ( @@ -48,6 +47,11 @@ _merge_command, _throw_client_bulk_write_exception, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -63,7 +67,6 @@ WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, @@ -239,44 +242,15 @@ async def write_command( """A proxy for AsyncConnection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: bwc._start(cmd, request_id, op_docs, ns_docs) try: reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - 
serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] # Process the response from the server. @@ -287,24 +261,17 @@ async def write_command( failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -328,22 +295,7 @@ async def unack_write( client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if 
bwc.publish: cmd = bwc._start(cmd, request_id, op_docs, ns_docs) try: @@ -354,23 +306,9 @@ async def unack_write( else: # Comply with APM spec. reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -381,24 +319,17 @@ async def unack_write( failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 5a5dc7fa2c..e1d2e3e6ad 100644 --- a/pymongo/asynchronous/network.py +++ 
b/pymongo/asynchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -30,12 +29,16 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.compression_support import _NO_COMPRESSION from pymongo.errors import ( NotPrimaryError, OperationFailure, ) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( @@ -160,22 +163,7 @@ async def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, spec, dbname, request_id, request_id) if publish: assert listeners is not None assert address is not None @@ -222,24 +210,17 @@ async def command( else: failure = message._convert_exception(exc) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - 
driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + spec, + dbname, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None assert address is not None @@ -256,24 +237,17 @@ async def command( raise duration = datetime.datetime.now() - start if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) + _log_command_succeeded( + client, + conn, + spec, + dbname, + request_id, + request_id, + response_doc, + duration, + "speculativeAuthenticate" in orig, + ) if publish: assert listeners is not None assert address is not None diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index aaf7ef82b9..3e37dae249 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -28,12 +28,15 @@ from bson import _decode_all_selective from pymongo.asynchronous.helpers import _handle_reauth +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, 
) @@ -170,22 +173,7 @@ async def run_operation( message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, cmd, dbn, request_id, request_id) if publish: if "$db" not in cmd: @@ -224,24 +212,17 @@ async def run_operation( failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + cmd, + dbn, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None listeners.publish_command_failure( @@ -258,23 +239,7 @@ async def run_operation( duration = datetime.now() - start # Must publish in find / getMore / explain command response format. 
res = docs[0] - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration) if publish: assert listeners is not None listeners.publish_command_success( diff --git a/pymongo/command_helpers.py b/pymongo/command_helpers.py new file mode 100644 index 0000000000..b09bae7975 --- /dev/null +++ b/pymongo/command_helpers.py @@ -0,0 +1,110 @@ +# Copyright 2025-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Shared helpers for command monitoring and logging.""" +from __future__ import annotations + +import datetime +import logging +from typing import Any, Mapping + +from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log + + +def _log_command_started( + client: Any, + conn: Any, + cmd: Mapping[str, Any], + dbname: str, + request_id: int, + operation_id: int, +) -> None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.STARTED, + clientId=client._topology_settings._topology_id, + command=cmd, + commandName=next(iter(cmd)), + databaseName=dbname, + requestId=request_id, + operationId=operation_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=conn.address[0], + serverPort=conn.address[1], + serviceId=conn.service_id, + ) + + +def _log_command_succeeded( + client: Any, + conn: Any, + cmd: Mapping[str, Any], + dbname: str, + request_id: int, + operation_id: int, + reply: Any, + duration: datetime.timedelta, + speculative_authenticate: bool = False, +) -> None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.SUCCEEDED, + clientId=client._topology_settings._topology_id, + durationMS=duration, + reply=reply, + commandName=next(iter(cmd)), + databaseName=dbname, + requestId=request_id, + operationId=operation_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=conn.address[0], + serverPort=conn.address[1], + serviceId=conn.service_id, + speculative_authenticate=speculative_authenticate, + ) + + +def _log_command_failed( + client: Any, + conn: Any, + cmd: Mapping[str, Any], + dbname: str, + request_id: int, + operation_id: int, + failure: Any, + duration: datetime.timedelta, + is_server_side_error: bool, +) -> None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + 
message=_CommandStatusMessage.FAILED, + clientId=client._topology_settings._topology_id, + durationMS=duration, + failure=failure, + commandName=next(iter(cmd)), + databaseName=dbname, + requestId=request_id, + operationId=operation_id, + driverConnectionId=conn.id, + serverConnectionId=conn.server_connection_id, + serverHost=conn.address[0], + serverPort=conn.address[1], + serviceId=conn.service_id, + isServerSideError=is_server_side_error, + ) diff --git a/pymongo/synchronous/bulk.py b/pymongo/synchronous/bulk.py index 22d6a7a76a..73929e0642 100644 --- a/pymongo/synchronous/bulk.py +++ b/pymongo/synchronous/bulk.py @@ -20,7 +20,6 @@ import copy import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -43,6 +42,11 @@ _raise_bulk_write_error, _Run, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -55,7 +59,6 @@ OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, @@ -252,44 +255,15 @@ def write_command( ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: 
bwc._start(cmd, request_id, docs) try: reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] client._process_response(reply, bwc.session) # type: ignore[arg-type] @@ -299,24 +273,17 @@ def write_command( failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -337,22 +304,7 @@ def unack_write( client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for 
Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: cmd = bwc._start(cmd, request_id, docs) try: @@ -363,23 +315,9 @@ def unack_write( else: # Comply with APM spec. reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -390,24 +328,17 @@ def unack_write( failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - 
serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index 1134594ae9..285701e8fa 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -20,7 +20,6 @@ import copy import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -48,6 +47,11 @@ _merge_command, _throw_client_bulk_write_exception, ) +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.common import ( validate_is_document_type, validate_ok_for_replace, @@ -63,7 +67,6 @@ WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, @@ -239,44 +242,15 @@ def write_command( """A proxy for Connection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, 
- ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: bwc._start(cmd, request_id, op_docs, ns_docs) try: reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] # Process the response from the server. 
@@ -287,24 +261,17 @@ def write_command( failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: bwc._fail(request_id, failure, duration) @@ -328,22 +295,7 @@ def unack_write( client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_started(client, bwc.conn, cmd, bwc.db_name, request_id, request_id) if bwc.publish: cmd = bwc._start(cmd, request_id, op_docs, ns_docs) try: @@ -354,23 +306,9 @@ def unack_write( else: # Comply with APM spec. 
reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) + _log_command_succeeded( + client, bwc.conn, cmd, bwc.db_name, request_id, request_id, reply, duration + ) if bwc.publish: bwc._succeed(request_id, reply, duration) except Exception as exc: @@ -381,24 +319,17 @@ def unack_write( failure = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + bwc.conn, + cmd, + bwc.db_name, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if bwc.publish: assert bwc.start_time is not None bwc._fail(request_id, failure, duration) diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 7d9bca4d58..3ab3664e72 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -30,12 
+29,16 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.compression_support import _NO_COMPRESSION from pymongo.errors import ( NotPrimaryError, OperationFailure, ) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( @@ -160,22 +163,7 @@ def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, spec, dbname, request_id, request_id) if publish: assert listeners is not None assert address is not None @@ -222,24 +210,17 @@ def command( else: failure = message._convert_exception(exc) if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - 
isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + spec, + dbname, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None assert address is not None @@ -256,24 +237,17 @@ def command( raise duration = datetime.datetime.now() - start if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) + _log_command_succeeded( + client, + conn, + spec, + dbname, + request_id, + request_id, + response_doc, + duration, + "speculativeAuthenticate" in orig, + ) if publish: assert listeners is not None assert address is not None diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 6073117f5a..9d033df670 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -27,12 +27,15 @@ ) from bson import _decode_all_selective +from pymongo.command_helpers import ( + _log_command_failed, + _log_command_started, + _log_command_succeeded, +) from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, ) @@ -170,22 +173,7 @@ def run_operation( message = operation.get_message(read_preference, conn, True) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - 
_COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_started(client, conn, cmd, dbn, request_id, request_id) if publish: if "$db" not in cmd: @@ -224,24 +212,17 @@ def run_operation( failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) + _log_command_failed( + client, + conn, + cmd, + dbn, + request_id, + request_id, + failure, + duration, + isinstance(exc, OperationFailure), + ) if publish: assert listeners is not None listeners.publish_command_failure( @@ -258,23 +239,7 @@ def run_operation( duration = datetime.now() - start # Must publish in find / getMore / explain command response format. 
res = docs[0] - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) + _log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration) if publish: assert listeners is not None listeners.publish_command_success( From 75298bcd4327d7f555d25c7ca67a3a66ae8561a0 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Fri, 1 May 2026 20:11:22 -0400 Subject: [PATCH 3/4] PYTHON-5676 Phase 3: Unify run_operation() with network.command() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract _network_command_core() from network.command() to handle the common send/receive/APM/decrypt pipeline. Both the standard command path and the cursor (find/getMore) path now use the same core function. 
- New _network_command_core() in network.py: takes a pre-built message and handles APM, send_message, receive_message, unpack, process, check, decrypt — returns (docs, reply, duration) - command() is now a thin wrapper: encode → _network_command_core → return docs[0] - run_operation() is now a thin wrapper: build message → _network_command_core → wrap in Response/PinnedResponse - _CURSOR_DOC_FIELDS moved from server.py to network.py (only used in core) - server.py drops _decode_all_selective, _check_command_response, _convert_exception, NotPrimaryError, OperationFailure imports --- PYTHON-5676-scope.md | 183 -------------------- pymongo/asynchronous/network.py | 293 ++++++++++++++++++++------------ pymongo/asynchronous/server.py | 131 ++++---------- pymongo/synchronous/network.py | 293 ++++++++++++++++++++------------ pymongo/synchronous/server.py | 131 ++++---------- 5 files changed, 424 insertions(+), 607 deletions(-) delete mode 100644 PYTHON-5676-scope.md diff --git a/PYTHON-5676-scope.md b/PYTHON-5676-scope.md deleted file mode 100644 index 56a7fc0da2..0000000000 --- a/PYTHON-5676-scope.md +++ /dev/null @@ -1,183 +0,0 @@ -# PYTHON-5676: Consolidate Command Execution Logic — Scope Document - -## Context - -PyMongo has accumulated several distinct code paths for executing commands against the server. When cross-cutting logic (e.g., backpressure, CSOT, monitoring) needs to be added or changed, engineers must find and update multiple paths. The goal is a single central path for all database operations, making future changes local to one place. - -PYTHON-1357 (Refactor Cursor and CommandCursor) is now Closed — the prerequisite work is done. - ---- - -## Current Command Execution Paths - -All paths are duplicated across `pymongo/asynchronous/` (source of truth) and `pymongo/synchronous/` (auto-generated via `tools/synchro.sh`). 
- -### Path 1 — Standard commands (most operations) -``` -Collection._command() / Database.command() / conn.command() (direct) - → Connection.command() [pool.py:344] — session, write concern, server API, reauth - → network.command() [network.py:61] — encode OP_MSG, APM, logging, CSOT, encryption, send/recv -``` -Used by: insert, update, delete, aggregate, distinct, find_one, createIndex, dropCollection, db.command(), etc. - -### Path 2 — Cursor operations (find / getMore) -``` -Cursor._send_message() - → MongoClient._run_operation() [mongo_client.py:1911] - → Server.run_operation() [server.py:138] — send/recv + FULL APM/logging (≈80 lines, near-identical to network.command()) -``` -**Bypasses `network.command()` entirely.** Has its own APM/monitoring and logging code. - -### Path 3 — Acknowledged bulk writes (collection-level) -``` -_Bulk._execute_batch() - → _Bulk.write_command() [bulk.py:244] — APM/logging wrapper (≈80 lines) - → Connection.write_command() [pool.py:480] — pre-encoded bytes, raw send+recv, no APM -``` -**Bypasses both `Connection.command()` and `network.command()`.** Note: for *encrypted* bulk writes, this path already uses `Connection.command()` — the non-encrypted path is the outlier. - -### Path 4 — Unacknowledged bulk writes -``` -_Bulk._execute_batch_unack() - → _Bulk.unack_write() [bulk.py:329] — APM/logging wrapper (≈70 lines) - → Connection.unack_write() [pool.py:469] — raw send only, no recv -``` -**Bypasses `Connection.command()` and `network.command()`.** - -### Path 5 — Client-level bulk writes (cross-collection) -`client_bulk.py` mirrors Paths 3 and 4 with its own `write_command()` and `unack_write()` wrappers — another full copy of the APM/logging boilerplate. - ---- - -## Key Findings - -**1. 
APM/logging code is copy-pasted 5+ times.** -`Server.run_operation()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` all contain the same ~70-80 line block of `_COMMAND_LOGGER.isEnabledFor(DEBUG)` + `_debug_log(...)` + `listeners.publish_command_start/success/failure(...)`. The reference implementation is `network.command()`. - -**2. Dead legacy OP_QUERY code in `Server.run_operation()`.** -`MIN_SUPPORTED_WIRE_VERSION = 8` (MongoDB 4.2+, `common.py`). OP_MSG was introduced at wire version 6. The `use_cmd = False` branch (legacy OP_QUERY path) in `run_operation()` can never be reached for any currently supported server. The `_Query.use_command()` check at `message.py:1622` confirms: for wire version ≥ 8, it is unconditionally True. - -**3. `_op_msg()` already handles bulk write Type 1 sections.** -`message._op_msg()` (line 394) detects insert/update/delete commands, pops the documents field, and encodes them as a Type 1 section. The non-encrypted bulk write path bypasses this and does its own single-pass encode+batch-size-check via `_do_batched_op_msg`. Unifying would require separating batch determination from encoding (currently combined for performance). - -**4. Encrypted bulk writes already use `Connection.command()`.** -In `_Bulk._execute_batch()` (bulk.py:444), the `if self.is_encrypted:` branch calls `bwc.conn.command()`. The non-encrypted `else` branch goes through `Connection.write_command()`. These should be the same path. - -**5. Async is generated from async source.** -All `pymongo/synchronous/*.py` files are auto-generated from `pymongo/asynchronous/*.py` via `tools/synchro.sh`. All code changes must be made in the `asynchronous/` files only. - ---- - -## Proposed Consolidation: Phased Approach - -### Phase 1 — Remove dead OP_QUERY code from `run_operation()` *(Low risk)* - -**What:** Delete the `use_cmd = False` branches from `Server.run_operation()`. Remove the conditional `if use_cmd: ... 
else: ...` blocks for `user_fields`/`legacy_response` and response-building. - -**Files:** `pymongo/asynchronous/server.py`, `pymongo/asynchronous/cursor.py` (remove the dead `else` branches referencing `_OpReply`) - -**Why now:** This is purely dead code removal — no behavior change. It simplifies Phase 3 significantly and is independently safe. - -**Verification:** Full test suite; specifically `test/test_cursor.py` and `test/test_unified_format.py`. - ---- - -### Phase 2 — Extract shared APM/logging helpers *(Low risk)* - -**What:** Extract the duplicated APM/logging block into shared helper functions in `pymongo/helpers_shared.py` (or a new `pymongo/command_helpers.py`). All five duplicate sites call the helpers instead of inlining the code. - -```python -# No sync/async split needed — these functions contain no I/O -def _log_and_publish_command_started(client, listeners, cmd, dbname, request_id, conn): - ... - - -def _log_and_publish_command_succeeded( - client, listeners, cmd, dbname, request_id, conn, reply, duration -): - ... - - -def _log_and_publish_command_failed( - client, listeners, cmd, dbname, request_id, conn, failure, duration -): - ... -``` - -**Files:** `pymongo/helpers_shared.py` (or new `pymongo/command_helpers.py`), `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py`, `pymongo/asynchronous/bulk.py`, `pymongo/asynchronous/client_bulk.py` - -**Why this matters:** This is the direct solution to the stated problem. When future cross-cutting logic needs to be added to "command execution", there is one place to add it. - -**Note on APM operationId:** `_BulkWriteContext` passes `op_id` (not `request_id`) as the `operationId` in APM events. This is spec-required behavior that must be preserved in the helpers. - -**Verification:** `test/test_command_monitoring.py`, `test/test_monitoring.py`, `test/test_unified_format.py`. 
- ---- - -### Phase 3 — Unify `Server.run_operation()` with `network.command()` *(Medium risk)* - -**What:** Refactor `network.command()` into a two-layer structure: - -- `_network_command_core()` — performs all work (encode, APM, send, recv) and returns `(response_doc, raw_reply, request_id, duration)` -- `command()` — thin wrapper returning just `response_doc` (existing callers unchanged) - -`Server.run_operation()` (after Phase 1+2) calls `_network_command_core()` for the actual transport, then wraps the result in `Response`/`PinnedResponse`. - -**Key complexity:** `RawBatchCursor._unpack_response()` (`cursor.py:1196`) calls `response.raw_response(cursor_id)` on the raw `_OpMsg` object — it needs the raw reply, not just the decoded dict. The `_network_command_core()` design satisfies this by exposing `raw_reply`. The `@_handle_reauth` decorator on `run_operation()` must also be preserved. - -**Files:** `pymongo/asynchronous/network.py`, `pymongo/asynchronous/server.py` - -**Result:** Cursor operations (find/getMore) fully share the command execution path with all other operations. Single place to add transport-level logic. - -**Verification:** `test/test_cursor.py`, exhaust cursor tests, `test/test_encryption.py`, `test/test_csot.py`. - ---- - -### Phase 4 — Unify non-encrypted bulk write path with `Connection.command()` *(Higher risk, defer)* - -**What:** Change `_Bulk._execute_batch()` to use `Connection.command()` for non-encrypted bulk writes, matching the already-unified encrypted path. Requires `_BulkWriteContext.batch_command()` to return a command dict instead of pre-encoded bytes. - -**The challenge:** `_do_batched_op_msg` performs batch-size determination and encoding in a single pass (including a C extension: `_cmessage._batched_op_msg`). Separating batch determination from encoding requires a two-pass approach, adding encoding overhead. 
**Needs benchmarking before committing.** - -`_EncryptedBulkWriteContext.batch_command()` encodes to bytes and then deserializes back via `_inflate_bson` — aligning the non-encrypted path with this approach may be the right model. - -**Result after Phase 4:** `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, and `_ClientBulk.unack_write()` have no callers and can be removed. - -**Gate:** Bulk write benchmarks before/after. Suggested regression threshold: ≤2% throughput degradation. - ---- - -## Definition of Done - -- All database operations route through `Connection.command()` → `network.command()` for actual transport -- APM/logging code exists in one location -- `Connection.write_command()`, `Connection.unack_write()`, `_Bulk.write_command()`, `_Bulk.unack_write()`, `_ClientBulk.write_command()`, `_ClientBulk.unack_write()` are removed (or reduced to thin wrappers) -- No performance regression (bulk write benchmarks) -- No behavioral regression (full spec test suite passes) - ---- - -## Risk Summary - -| Phase | Risk | Effort | Value | -|-------|------|--------|-------| -| 1 — Remove dead OP_QUERY code | Low | ~1 day | Medium (simplification) | -| 2 — Extract APM helpers | Low | ~2 days | **High** (solves the stated problem) | -| 3 — Unify run_operation with network.command | Medium | ~3 days | High (true path consolidation) | -| 4 — Unify bulk write bytes path | Higher | ~1 week + benchmarking | Medium (removes last bypass) | - -Phases 1 and 2 can be done together in one PR. Phase 3 should be a separate PR. Phase 4 should be gated on benchmarking. 
- ---- - -## Critical Files - -| File | Role | -|------|------| -| `pymongo/asynchronous/network.py:61` | Central `command()` — reference implementation | -| `pymongo/asynchronous/server.py:138` | `run_operation()` — cursor path bypass | -| `pymongo/asynchronous/pool.py:344,469,480` | `Connection.command`, `unack_write`, `write_command` | -| `pymongo/asynchronous/bulk.py:244,329,444` | `_Bulk.write_command`, `unack_write`, `_execute_batch` | -| `pymongo/asynchronous/client_bulk.py:229,320` | `_ClientBulk.write_command`, `unack_write` | -| `pymongo/message.py:394,695,1622` | `_op_msg`, `_BulkWriteContext.batch_command`, `_Query.use_command` | -| `pymongo/common.py` | `MIN_SUPPORTED_WIRE_VERSION = 8` | diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index e1d2e3e6ad..a7696af442 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -19,6 +19,7 @@ from typing import ( TYPE_CHECKING, Any, + Callable, Mapping, MutableMapping, Optional, @@ -41,10 +42,6 @@ ) from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate -from pymongo.network_layer import ( - async_receive_message, - async_sendall, -) if TYPE_CHECKING: from bson import CodecOptions @@ -60,115 +57,51 @@ _IS_SYNC = False +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} -async def command( + +async def _network_command_core( conn: AsyncConnection, dbname: str, spec: MutableMapping[str, Any], - is_mongos: bool, - read_preference: Optional[_ServerMode], + request_id: int, + msg: Optional[bytes], + max_doc_size: int, codec_options: CodecOptions[_DocumentType], session: Optional[AsyncClientSession], client: Optional[AsyncMongoClient[Any]], + listeners: Optional[_EventListeners], + address: Optional[_Address], + start: datetime.datetime, check: bool = True, allowable_errors: Optional[Sequence[Union[str, int]]] = None, - address: Optional[_Address] = None, - listeners: Optional[_EventListeners] = None, - 
max_bson_size: Optional[int] = None, - read_concern: Optional[ReadConcern] = None, parse_write_concern_error: bool = False, - collation: Optional[_CollationIn] = None, - compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, - use_op_msg: bool = False, - unacknowledged: bool = False, user_fields: Optional[Mapping[str, Any]] = None, - exhaust_allowed: bool = False, - write_concern: Optional[WriteConcern] = None, -) -> _DocumentType: - """Execute a command over the socket, or raise socket.error. + unacknowledged: bool = False, + more_to_come: bool = False, + unpack_res: Optional[Callable[..., list[_DocumentOut]]] = None, + cursor_id: Optional[int] = None, + orig: Optional[MutableMapping[str, Any]] = None, + speculative_hello: bool = False, +) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: + """Send/receive a command and return (docs, raw_reply, duration). - :param conn: a AsyncConnection instance - :param dbname: name of the database on which to run the command - :param spec: a command document as an ordered dict type, eg SON. - :param is_mongos: are we connected to a mongos? - :param read_preference: a read preference - :param codec_options: a CodecOptions instance - :param session: optional AsyncClientSession instance. - :param client: optional AsyncMongoClient instance for updating $clusterTime. - :param check: raise OperationFailure if there are errors - :param allowable_errors: errors to ignore if `check` is True - :param address: the (host, port) of `conn` - :param listeners: An instance of :class:`~pymongo.monitoring.EventListeners` - :param max_bson_size: The maximum encoded bson size for this server - :param read_concern: The read concern for this command. - :param parse_write_concern_error: Whether to parse the ``writeConcernError`` - field in the command response. - :param collation: The collation for this command. - :param compression_ctx: optional compression Context. 
- :param use_op_msg: True if we should use OP_MSG. - :param unacknowledged: True if this is an unacknowledged command. - :param user_fields: Response fields that should be decoded - using the TypeDecoders from codec_options, passed to - bson._decode_all_selective. - :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. + Handles APM logging, send/receive, unpacking, response processing, + and decryption. Both the standard command path and the cursor + (find/getMore) path go through this function. """ - name = next(iter(spec)) - ns = dbname + ".$cmd" - speculative_hello = False - - # Publish the original command document, perhaps with lsid and $clusterTime. - orig = spec - if is_mongos and not use_op_msg: - assert read_preference is not None - spec = message._maybe_add_read_preference(spec, read_preference) - if read_concern and not (session and session.in_transaction): - if read_concern.level: - spec["readConcern"] = read_concern.document - if session: - session._update_read_concern(spec, conn) - if collation is not None: - spec["collation"] = collation - publish = listeners is not None and listeners.enabled_for_commands - start = datetime.datetime.now() - if publish: - speculative_hello = _is_speculative_authenticate(name, spec) - - if compression_ctx and name.lower() in _NO_COMPRESSION: - compression_ctx = None - - if client and client._encrypter and not client._encrypter._bypass_auto_encryption: - spec = orig = await client._encrypter.encrypt(dbname, spec, codec_options) - - # Support CSOT - if client: - conn.apply_timeout(client, spec) - _csot.apply_write_concern(spec, write_concern) - - if use_op_msg: - flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 - flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 - request_id, msg, size, max_doc_size = message._op_msg( - flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx - ) - # If this is an unacknowledged write then make sure the encoded doc(s) - # are small enough, 
otherwise rely on the server to return an error. - if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: - message._raise_document_too_large(name, size, max_bson_size) - else: - request_id, msg, size = message._query( - 0, ns, 0, -1, spec, None, codec_options, compression_ctx - ) + name = next(iter(spec)) + reply: Optional[_OpMsg] = None + docs: list[_DocumentOut] = [] - if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: - message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) if client is not None: _log_command_started(client, conn, spec, dbname, request_id, request_id) if publish: assert listeners is not None assert address is not None listeners.publish_command_start( - orig, + orig if orig is not None else spec, dbname, request_id, address, @@ -177,19 +110,32 @@ async def command( ) try: - await async_sendall(conn.conn.get_conn, msg) - if use_op_msg and unacknowledged: - # Unacknowledged, fake a successful command response. - reply = None - response_doc: _DocumentOut = {"ok": 1} + if more_to_come: + reply = await conn.receive_message(None) else: - reply = await async_receive_message(conn, request_id) - conn.more_to_come = reply.more_to_come - unpacked_docs = reply.unpack_response( - codec_options=codec_options, user_fields=user_fields - ) + assert msg is not None + await conn.send_message(msg, max_doc_size) + if unacknowledged: + # Unacknowledged write: fake a successful command response. 
+ docs = [{"ok": 1}] # type: ignore[list-item] + else: + reply = await conn.receive_message(request_id) - response_doc = unpacked_docs[0] + if reply is not None: + conn.more_to_come = reply.more_to_come + if unpack_res is not None: + docs = unpack_res( + reply, + cursor_id, + codec_options, + legacy_response=False, + user_fields=_CURSOR_DOC_FIELDS, + ) + else: + docs = list( + reply.unpack_response(codec_options=codec_options, user_fields=user_fields) + ) + response_doc = docs[0] if not conn.ready: cluster_time = response_doc.get("$clusterTime") if cluster_time: @@ -244,16 +190,16 @@ async def command( dbname, request_id, request_id, - response_doc, + docs[0], duration, - "speculativeAuthenticate" in orig, + speculative_hello, ) if publish: assert listeners is not None assert address is not None listeners.publish_command_success( duration, - response_doc, + docs[0], name, request_id, address, @@ -263,10 +209,137 @@ async def command( database_name=dbname, ) - if client and client._encrypter and reply: + # Decrypt response. 
+ if client and client._encrypter and reply is not None: decrypted = await client._encrypter.decrypt(reply.raw_command_response()) - response_doc = cast( - "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] + decrypt_fields = _CURSOR_DOC_FIELDS if unpack_res is not None else user_fields + docs = list(_decode_all_selective(decrypted, codec_options, decrypt_fields)) + + return docs, reply, duration + + +async def command( + conn: AsyncConnection, + dbname: str, + spec: MutableMapping[str, Any], + is_mongos: bool, + read_preference: Optional[_ServerMode], + codec_options: CodecOptions[_DocumentType], + session: Optional[AsyncClientSession], + client: Optional[AsyncMongoClient[Any]], + check: bool = True, + allowable_errors: Optional[Sequence[Union[str, int]]] = None, + address: Optional[_Address] = None, + listeners: Optional[_EventListeners] = None, + max_bson_size: Optional[int] = None, + read_concern: Optional[ReadConcern] = None, + parse_write_concern_error: bool = False, + collation: Optional[_CollationIn] = None, + compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, + use_op_msg: bool = False, + unacknowledged: bool = False, + user_fields: Optional[Mapping[str, Any]] = None, + exhaust_allowed: bool = False, + write_concern: Optional[WriteConcern] = None, +) -> _DocumentType: + """Execute a command over the socket, or raise socket.error. + + :param conn: a AsyncConnection instance + :param dbname: name of the database on which to run the command + :param spec: a command document as an ordered dict type, eg SON. + :param is_mongos: are we connected to a mongos? + :param read_preference: a read preference + :param codec_options: a CodecOptions instance + :param session: optional AsyncClientSession instance. + :param client: optional AsyncMongoClient instance for updating $clusterTime. 
+ :param check: raise OperationFailure if there are errors + :param allowable_errors: errors to ignore if `check` is True + :param address: the (host, port) of `conn` + :param listeners: An instance of :class:`~pymongo.monitoring.EventListeners` + :param max_bson_size: The maximum encoded bson size for this server + :param read_concern: The read concern for this command. + :param parse_write_concern_error: Whether to parse the ``writeConcernError`` + field in the command response. + :param collation: The collation for this command. + :param compression_ctx: optional compression Context. + :param use_op_msg: True if we should use OP_MSG. + :param unacknowledged: True if this is an unacknowledged command. + :param user_fields: Response fields that should be decoded + using the TypeDecoders from codec_options, passed to + bson._decode_all_selective. + :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. + """ + name = next(iter(spec)) + ns = dbname + ".$cmd" + speculative_hello = False + + # Publish the original command document, perhaps with lsid and $clusterTime. 
+ orig = spec + if is_mongos and not use_op_msg: + assert read_preference is not None + spec = message._maybe_add_read_preference(spec, read_preference) + if read_concern and not (session and session.in_transaction): + if read_concern.level: + spec["readConcern"] = read_concern.document + if session: + session._update_read_concern(spec, conn) + if collation is not None: + spec["collation"] = collation + + publish = listeners is not None and listeners.enabled_for_commands + start = datetime.datetime.now() + if publish: + speculative_hello = _is_speculative_authenticate(name, spec) + + if compression_ctx and name.lower() in _NO_COMPRESSION: + compression_ctx = None + + if client and client._encrypter and not client._encrypter._bypass_auto_encryption: + spec = orig = await client._encrypter.encrypt(dbname, spec, codec_options) + + # Support CSOT + if client: + conn.apply_timeout(client, spec) + _csot.apply_write_concern(spec, write_concern) + + if use_op_msg: + flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 + flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 + request_id, msg, size, max_doc_size = message._op_msg( + flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx + ) + # If this is an unacknowledged write then make sure the encoded doc(s) + # are small enough, otherwise rely on the server to return an error. 
+ if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: + message._raise_document_too_large(name, size, max_bson_size) + else: + request_id, msg, size = message._query( + 0, ns, 0, -1, spec, None, codec_options, compression_ctx ) + max_doc_size = 0 + + if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: + message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) - return response_doc # type: ignore[return-value] + docs, _reply, _duration = await _network_command_core( + conn=conn, + dbname=dbname, + spec=spec, + request_id=request_id, + msg=msg, + max_doc_size=max_doc_size, + codec_options=codec_options, + session=session, + client=client, + listeners=listeners, + address=address, + start=start, + check=check, + allowable_errors=allowable_errors, + parse_write_concern_error=parse_write_concern_error, + user_fields=user_fields, + unacknowledged=unacknowledged, + orig=orig, + speculative_hello=speculative_hello, + ) + return cast("_DocumentType", docs[0]) diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 3e37dae249..3f881de2f5 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -26,21 +26,14 @@ Union, ) -from bson import _decode_all_selective from pymongo.asynchronous.helpers import _handle_reauth -from pymongo.command_helpers import ( - _log_command_failed, - _log_command_started, - _log_command_succeeded, -) -from pymongo.errors import NotPrimaryError, OperationFailure -from pymongo.helpers_shared import _check_command_response +from pymongo.asynchronous.network import _network_command_core from pymongo.logger import ( _SDAM_LOGGER, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _Query +from pymongo.message import _GetMore, _Query from pymongo.response import PinnedResponse, Response if TYPE_CHECKING: @@ -58,8 +51,6 @@ _IS_SYNC = False -_CURSOR_DOC_FIELDS = {"cursor": 
{"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -161,7 +152,6 @@ async def run_operation( :param client: An AsyncMongoClient instance. """ assert listeners is not None - publish = listeners.enabled_for_commands start = datetime.now() operation.use_command(conn) @@ -169,101 +159,38 @@ async def run_operation( cmd, dbn = await self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 + msg = None + max_doc_size = 0 else: - message = operation.get_message(read_preference, conn, True) - request_id, data, max_doc_size = self._split_message(message) - - _log_command_started(client, conn, cmd, dbn, request_id, request_id) - - if publish: - if "$db" not in cmd: - cmd["$db"] = dbn - assert listeners is not None - listeners.publish_command_start( - cmd, - dbn, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - ) - - try: - if more_to_come: - reply = await conn.receive_message(None) - else: - await conn.send_message(data, max_doc_size) - reply = await conn.receive_message(request_id) - - # Unpack and check for command errors. 
- docs = unpack_res( - reply, - operation.cursor_id, - operation.codec_options, - legacy_response=False, - user_fields=_CURSOR_DOC_FIELDS, - ) - first = docs[0] - await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] - except Exception as exc: - duration = datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - _log_command_failed( - client, - conn, - cmd, - dbn, - request_id, - request_id, - failure, - duration, - isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - listeners.publish_command_failure( - duration, - failure, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - raise - duration = datetime.now() - start - # Must publish in find / getMore / explain command response format. - res = docs[0] - _log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration) - if publish: - assert listeners is not None - listeners.publish_command_success( - duration, - res, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - - # Decrypt response. 
- client = operation.client # type: ignore[assignment] - if client and client._encrypter: - decrypted = await client._encrypter.decrypt(reply.raw_command_response()) - docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS) + op_message = operation.get_message(read_preference, conn, True) + request_id, msg, max_doc_size = self._split_message(op_message) + + if listeners.enabled_for_commands and "$db" not in cmd: + cmd["$db"] = dbn + + docs, reply, duration = await _network_command_core( + conn=conn, + dbname=dbn, + spec=cmd, + request_id=request_id, + msg=msg, + max_doc_size=max_doc_size, + codec_options=operation.codec_options, + session=operation.session, # type: ignore[arg-type] + client=client, + listeners=listeners, + address=conn.address, + start=start, + more_to_come=more_to_come, + unpack_res=unpack_res, + cursor_id=operation.cursor_id, + ) response: Response - + client = operation.client # type: ignore[assignment] if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] conn.pin_cursor() - more_to_come = reply.more_to_come + more_to_come = reply.more_to_come # type: ignore[union-attr] if operation.conn_mgr: operation.conn_mgr.update_exhaust(more_to_come) response = PinnedResponse( diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 3ab3664e72..93a5d0c6ec 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -19,6 +19,7 @@ from typing import ( TYPE_CHECKING, Any, + Callable, Mapping, MutableMapping, Optional, @@ -41,10 +42,6 @@ ) from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate -from pymongo.network_layer import ( - receive_message, - sendall, -) if TYPE_CHECKING: from bson import CodecOptions @@ -60,115 +57,51 @@ _IS_SYNC = True +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} -def command( + +def _network_command_core( conn: Connection, dbname: str, spec: 
MutableMapping[str, Any], - is_mongos: bool, - read_preference: Optional[_ServerMode], + request_id: int, + msg: Optional[bytes], + max_doc_size: int, codec_options: CodecOptions[_DocumentType], session: Optional[ClientSession], client: Optional[MongoClient[Any]], + listeners: Optional[_EventListeners], + address: Optional[_Address], + start: datetime.datetime, check: bool = True, allowable_errors: Optional[Sequence[Union[str, int]]] = None, - address: Optional[_Address] = None, - listeners: Optional[_EventListeners] = None, - max_bson_size: Optional[int] = None, - read_concern: Optional[ReadConcern] = None, parse_write_concern_error: bool = False, - collation: Optional[_CollationIn] = None, - compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, - use_op_msg: bool = False, - unacknowledged: bool = False, user_fields: Optional[Mapping[str, Any]] = None, - exhaust_allowed: bool = False, - write_concern: Optional[WriteConcern] = None, -) -> _DocumentType: - """Execute a command over the socket, or raise socket.error. + unacknowledged: bool = False, + more_to_come: bool = False, + unpack_res: Optional[Callable[..., list[_DocumentOut]]] = None, + cursor_id: Optional[int] = None, + orig: Optional[MutableMapping[str, Any]] = None, + speculative_hello: bool = False, +) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: + """Send/receive a command and return (docs, raw_reply, duration). - :param conn: a Connection instance - :param dbname: name of the database on which to run the command - :param spec: a command document as an ordered dict type, eg SON. - :param is_mongos: are we connected to a mongos? - :param read_preference: a read preference - :param codec_options: a CodecOptions instance - :param session: optional ClientSession instance. - :param client: optional MongoClient instance for updating $clusterTime. 
- :param check: raise OperationFailure if there are errors - :param allowable_errors: errors to ignore if `check` is True - :param address: the (host, port) of `conn` - :param listeners: An instance of :class:`~pymongo.monitoring.EventListeners` - :param max_bson_size: The maximum encoded bson size for this server - :param read_concern: The read concern for this command. - :param parse_write_concern_error: Whether to parse the ``writeConcernError`` - field in the command response. - :param collation: The collation for this command. - :param compression_ctx: optional compression Context. - :param use_op_msg: True if we should use OP_MSG. - :param unacknowledged: True if this is an unacknowledged command. - :param user_fields: Response fields that should be decoded - using the TypeDecoders from codec_options, passed to - bson._decode_all_selective. - :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. + Handles APM logging, send/receive, unpacking, response processing, + and decryption. Both the standard command path and the cursor + (find/getMore) path go through this function. """ - name = next(iter(spec)) - ns = dbname + ".$cmd" - speculative_hello = False - - # Publish the original command document, perhaps with lsid and $clusterTime. 
- orig = spec - if is_mongos and not use_op_msg: - assert read_preference is not None - spec = message._maybe_add_read_preference(spec, read_preference) - if read_concern and not (session and session.in_transaction): - if read_concern.level: - spec["readConcern"] = read_concern.document - if session: - session._update_read_concern(spec, conn) - if collation is not None: - spec["collation"] = collation - publish = listeners is not None and listeners.enabled_for_commands - start = datetime.datetime.now() - if publish: - speculative_hello = _is_speculative_authenticate(name, spec) - - if compression_ctx and name.lower() in _NO_COMPRESSION: - compression_ctx = None - - if client and client._encrypter and not client._encrypter._bypass_auto_encryption: - spec = orig = client._encrypter.encrypt(dbname, spec, codec_options) - - # Support CSOT - if client: - conn.apply_timeout(client, spec) - _csot.apply_write_concern(spec, write_concern) - - if use_op_msg: - flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 - flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 - request_id, msg, size, max_doc_size = message._op_msg( - flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx - ) - # If this is an unacknowledged write then make sure the encoded doc(s) - # are small enough, otherwise rely on the server to return an error. 
- if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: - message._raise_document_too_large(name, size, max_bson_size) - else: - request_id, msg, size = message._query( - 0, ns, 0, -1, spec, None, codec_options, compression_ctx - ) + name = next(iter(spec)) + reply: Optional[_OpMsg] = None + docs: list[_DocumentOut] = [] - if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: - message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) if client is not None: _log_command_started(client, conn, spec, dbname, request_id, request_id) if publish: assert listeners is not None assert address is not None listeners.publish_command_start( - orig, + orig if orig is not None else spec, dbname, request_id, address, @@ -177,19 +110,32 @@ def command( ) try: - sendall(conn.conn.get_conn, msg) - if use_op_msg and unacknowledged: - # Unacknowledged, fake a successful command response. - reply = None - response_doc: _DocumentOut = {"ok": 1} + if more_to_come: + reply = conn.receive_message(None) else: - reply = receive_message(conn, request_id) - conn.more_to_come = reply.more_to_come - unpacked_docs = reply.unpack_response( - codec_options=codec_options, user_fields=user_fields - ) + assert msg is not None + conn.send_message(msg, max_doc_size) + if unacknowledged: + # Unacknowledged write: fake a successful command response. 
+ docs = [{"ok": 1}] # type: ignore[list-item] + else: + reply = conn.receive_message(request_id) - response_doc = unpacked_docs[0] + if reply is not None: + conn.more_to_come = reply.more_to_come + if unpack_res is not None: + docs = unpack_res( + reply, + cursor_id, + codec_options, + legacy_response=False, + user_fields=_CURSOR_DOC_FIELDS, + ) + else: + docs = list( + reply.unpack_response(codec_options=codec_options, user_fields=user_fields) + ) + response_doc = docs[0] if not conn.ready: cluster_time = response_doc.get("$clusterTime") if cluster_time: @@ -244,16 +190,16 @@ def command( dbname, request_id, request_id, - response_doc, + docs[0], duration, - "speculativeAuthenticate" in orig, + speculative_hello, ) if publish: assert listeners is not None assert address is not None listeners.publish_command_success( duration, - response_doc, + docs[0], name, request_id, address, @@ -263,10 +209,137 @@ def command( database_name=dbname, ) - if client and client._encrypter and reply: + # Decrypt response. 
+ if client and client._encrypter and reply is not None: decrypted = client._encrypter.decrypt(reply.raw_command_response()) - response_doc = cast( - "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] + decrypt_fields = _CURSOR_DOC_FIELDS if unpack_res is not None else user_fields + docs = list(_decode_all_selective(decrypted, codec_options, decrypt_fields)) + + return docs, reply, duration + + +def command( + conn: Connection, + dbname: str, + spec: MutableMapping[str, Any], + is_mongos: bool, + read_preference: Optional[_ServerMode], + codec_options: CodecOptions[_DocumentType], + session: Optional[ClientSession], + client: Optional[MongoClient[Any]], + check: bool = True, + allowable_errors: Optional[Sequence[Union[str, int]]] = None, + address: Optional[_Address] = None, + listeners: Optional[_EventListeners] = None, + max_bson_size: Optional[int] = None, + read_concern: Optional[ReadConcern] = None, + parse_write_concern_error: bool = False, + collation: Optional[_CollationIn] = None, + compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None, + use_op_msg: bool = False, + unacknowledged: bool = False, + user_fields: Optional[Mapping[str, Any]] = None, + exhaust_allowed: bool = False, + write_concern: Optional[WriteConcern] = None, +) -> _DocumentType: + """Execute a command over the socket, or raise socket.error. + + :param conn: a Connection instance + :param dbname: name of the database on which to run the command + :param spec: a command document as an ordered dict type, eg SON. + :param is_mongos: are we connected to a mongos? + :param read_preference: a read preference + :param codec_options: a CodecOptions instance + :param session: optional ClientSession instance. + :param client: optional MongoClient instance for updating $clusterTime. 
+ :param check: raise OperationFailure if there are errors + :param allowable_errors: errors to ignore if `check` is True + :param address: the (host, port) of `conn` + :param listeners: An instance of :class:`~pymongo.monitoring.EventListeners` + :param max_bson_size: The maximum encoded bson size for this server + :param read_concern: The read concern for this command. + :param parse_write_concern_error: Whether to parse the ``writeConcernError`` + field in the command response. + :param collation: The collation for this command. + :param compression_ctx: optional compression Context. + :param use_op_msg: True if we should use OP_MSG. + :param unacknowledged: True if this is an unacknowledged command. + :param user_fields: Response fields that should be decoded + using the TypeDecoders from codec_options, passed to + bson._decode_all_selective. + :param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed. + """ + name = next(iter(spec)) + ns = dbname + ".$cmd" + speculative_hello = False + + # Publish the original command document, perhaps with lsid and $clusterTime. 
+ orig = spec + if is_mongos and not use_op_msg: + assert read_preference is not None + spec = message._maybe_add_read_preference(spec, read_preference) + if read_concern and not (session and session.in_transaction): + if read_concern.level: + spec["readConcern"] = read_concern.document + if session: + session._update_read_concern(spec, conn) + if collation is not None: + spec["collation"] = collation + + publish = listeners is not None and listeners.enabled_for_commands + start = datetime.datetime.now() + if publish: + speculative_hello = _is_speculative_authenticate(name, spec) + + if compression_ctx and name.lower() in _NO_COMPRESSION: + compression_ctx = None + + if client and client._encrypter and not client._encrypter._bypass_auto_encryption: + spec = orig = client._encrypter.encrypt(dbname, spec, codec_options) + + # Support CSOT + if client: + conn.apply_timeout(client, spec) + _csot.apply_write_concern(spec, write_concern) + + if use_op_msg: + flags = _OpMsg.MORE_TO_COME if unacknowledged else 0 + flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0 + request_id, msg, size, max_doc_size = message._op_msg( + flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx + ) + # If this is an unacknowledged write then make sure the encoded doc(s) + # are small enough, otherwise rely on the server to return an error. 
+ if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size: + message._raise_document_too_large(name, size, max_bson_size) + else: + request_id, msg, size = message._query( + 0, ns, 0, -1, spec, None, codec_options, compression_ctx ) + max_doc_size = 0 + + if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: + message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) - return response_doc # type: ignore[return-value] + docs, _reply, _duration = _network_command_core( + conn=conn, + dbname=dbname, + spec=spec, + request_id=request_id, + msg=msg, + max_doc_size=max_doc_size, + codec_options=codec_options, + session=session, + client=client, + listeners=listeners, + address=address, + start=start, + check=check, + allowable_errors=allowable_errors, + parse_write_concern_error=parse_write_concern_error, + user_fields=user_fields, + unacknowledged=unacknowledged, + orig=orig, + speculative_hello=speculative_hello, + ) + return cast("_DocumentType", docs[0]) diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 9d033df670..212b5e5b40 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -26,22 +26,15 @@ Union, ) -from bson import _decode_all_selective -from pymongo.command_helpers import ( - _log_command_failed, - _log_command_started, - _log_command_succeeded, -) -from pymongo.errors import NotPrimaryError, OperationFailure -from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( _SDAM_LOGGER, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _Query +from pymongo.message import _GetMore, _Query from pymongo.response import PinnedResponse, Response from pymongo.synchronous.helpers import _handle_reauth +from pymongo.synchronous.network import _network_command_core if TYPE_CHECKING: from queue import Queue @@ -58,8 +51,6 @@ _IS_SYNC = True -_CURSOR_DOC_FIELDS = 
{"cursor": {"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -161,7 +152,6 @@ def run_operation( :param client: A MongoClient instance. """ assert listeners is not None - publish = listeners.enabled_for_commands start = datetime.now() operation.use_command(conn) @@ -169,101 +159,38 @@ def run_operation( cmd, dbn = self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 + msg = None + max_doc_size = 0 else: - message = operation.get_message(read_preference, conn, True) - request_id, data, max_doc_size = self._split_message(message) - - _log_command_started(client, conn, cmd, dbn, request_id, request_id) - - if publish: - if "$db" not in cmd: - cmd["$db"] = dbn - assert listeners is not None - listeners.publish_command_start( - cmd, - dbn, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - ) - - try: - if more_to_come: - reply = conn.receive_message(None) - else: - conn.send_message(data, max_doc_size) - reply = conn.receive_message(request_id) - - # Unpack and check for command errors. 
- docs = unpack_res( - reply, - operation.cursor_id, - operation.codec_options, - legacy_response=False, - user_fields=_CURSOR_DOC_FIELDS, - ) - first = docs[0] - operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] - except Exception as exc: - duration = datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - _log_command_failed( - client, - conn, - cmd, - dbn, - request_id, - request_id, - failure, - duration, - isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - listeners.publish_command_failure( - duration, - failure, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - raise - duration = datetime.now() - start - # Must publish in find / getMore / explain command response format. - res = docs[0] - _log_command_succeeded(client, conn, cmd, dbn, request_id, request_id, res, duration) - if publish: - assert listeners is not None - listeners.publish_command_success( - duration, - res, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - - # Decrypt response. 
- client = operation.client # type: ignore[assignment] - if client and client._encrypter: - decrypted = client._encrypter.decrypt(reply.raw_command_response()) - docs = _decode_all_selective(decrypted, operation.codec_options, _CURSOR_DOC_FIELDS) + op_message = operation.get_message(read_preference, conn, True) + request_id, msg, max_doc_size = self._split_message(op_message) + + if listeners.enabled_for_commands and "$db" not in cmd: + cmd["$db"] = dbn + + docs, reply, duration = _network_command_core( + conn=conn, + dbname=dbn, + spec=cmd, + request_id=request_id, + msg=msg, + max_doc_size=max_doc_size, + codec_options=operation.codec_options, + session=operation.session, # type: ignore[arg-type] + client=client, + listeners=listeners, + address=conn.address, + start=start, + more_to_come=more_to_come, + unpack_res=unpack_res, + cursor_id=operation.cursor_id, + ) response: Response - + client = operation.client # type: ignore[assignment] if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] conn.pin_cursor() - more_to_come = reply.more_to_come + more_to_come = reply.more_to_come # type: ignore[union-attr] if operation.conn_mgr: operation.conn_mgr.update_exhaust(more_to_come) response = PinnedResponse( From b194f2aefa1a4f9e7f14e4265a442e53e4b824e7 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Tue, 5 May 2026 21:04:07 -0400 Subject: [PATCH 4/4] Fix typing errors --- pymongo/asynchronous/network.py | 8 ++++---- pymongo/asynchronous/server.py | 3 ++- pymongo/synchronous/network.py | 8 ++++---- pymongo/synchronous/server.py | 3 ++- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index a7696af442..bd51397ebe 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -40,7 +40,7 @@ NotPrimaryError, OperationFailure, ) -from pymongo.message import _OpMsg +from pymongo.message import _OpMsg, _OpReply from 
pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: @@ -83,7 +83,7 @@ async def _network_command_core( cursor_id: Optional[int] = None, orig: Optional[MutableMapping[str, Any]] = None, speculative_hello: bool = False, -) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: +) -> tuple[list[_DocumentOut], Optional[Union[_OpReply, _OpMsg]], datetime.timedelta]: """Send/receive a command and return (docs, raw_reply, duration). Handles APM logging, send/receive, unpacking, response processing, @@ -92,7 +92,7 @@ async def _network_command_core( """ publish = listeners is not None and listeners.enabled_for_commands name = next(iter(spec)) - reply: Optional[_OpMsg] = None + reply: Optional[Union[_OpReply, _OpMsg]] = None docs: list[_DocumentOut] = [] if client is not None: @@ -213,7 +213,7 @@ async def _network_command_core( if client and client._encrypter and reply is not None: decrypted = await client._encrypter.decrypt(reply.raw_command_response()) decrypt_fields = _CURSOR_DOC_FIELDS if unpack_res is not None else user_fields - docs = list(_decode_all_selective(decrypted, codec_options, decrypt_fields)) + docs = list(_decode_all_selective(decrypted, codec_options, decrypt_fields)) # type: ignore[arg-type] return docs, reply, duration diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 3f881de2f5..855cc04a81 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -155,7 +155,7 @@ async def run_operation( start = datetime.now() operation.use_command(conn) - more_to_come = operation.conn_mgr and operation.conn_mgr.more_to_come + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) cmd, dbn = await self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 @@ -186,6 +186,7 @@ async def run_operation( cursor_id=operation.cursor_id, ) + assert reply is not None response: Response client = operation.client # type: ignore[assignment] if 
client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 93a5d0c6ec..1099af24d7 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -40,7 +40,7 @@ NotPrimaryError, OperationFailure, ) -from pymongo.message import _OpMsg +from pymongo.message import _OpMsg, _OpReply from pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: @@ -83,7 +83,7 @@ def _network_command_core( cursor_id: Optional[int] = None, orig: Optional[MutableMapping[str, Any]] = None, speculative_hello: bool = False, -) -> tuple[list[_DocumentOut], Optional[_OpMsg], datetime.timedelta]: +) -> tuple[list[_DocumentOut], Optional[Union[_OpReply, _OpMsg]], datetime.timedelta]: """Send/receive a command and return (docs, raw_reply, duration). Handles APM logging, send/receive, unpacking, response processing, @@ -92,7 +92,7 @@ def _network_command_core( """ publish = listeners is not None and listeners.enabled_for_commands name = next(iter(spec)) - reply: Optional[_OpMsg] = None + reply: Optional[Union[_OpReply, _OpMsg]] = None docs: list[_DocumentOut] = [] if client is not None: @@ -213,7 +213,7 @@ def _network_command_core( if client and client._encrypter and reply is not None: decrypted = client._encrypter.decrypt(reply.raw_command_response()) decrypt_fields = _CURSOR_DOC_FIELDS if unpack_res is not None else user_fields - docs = list(_decode_all_selective(decrypted, codec_options, decrypt_fields)) + docs = list(_decode_all_selective(decrypted, codec_options, decrypt_fields)) # type: ignore[arg-type] return docs, reply, duration diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 212b5e5b40..a6964518f4 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -155,7 +155,7 @@ def run_operation( start = datetime.now() operation.use_command(conn) - more_to_come = operation.conn_mgr and 
operation.conn_mgr.more_to_come + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) cmd, dbn = self.operation_to_command(operation, conn, True) if more_to_come: request_id = 0 @@ -186,6 +186,7 @@ def run_operation( cursor_id=operation.cursor_id, ) + assert reply is not None response: Response client = operation.client # type: ignore[assignment] if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type]