bulk_write fails with AttributeError when a batch fails with InvalidBSON:
[2024/08/12 10:41:58.788] FAILURE: AttributeError: 'InvalidBSON' object has no attribute 'details' () [2024/08/12 10:41:58.788] self = <test.asynchronous.test_client_bulk_write.TestClientBulkWriteCRUD testMethod=test_collects_write_errors_across_batches_ordered> [2024/08/12 10:41:58.788] @async_client_context.require_version_min(8, 0, 0, -24) [2024/08/12 10:41:58.788] async def test_collects_write_errors_across_batches_ordered(self): [2024/08/12 10:41:58.788] listener = OvertCommandListener() [2024/08/12 10:41:58.788] client = await async_rs_or_single_client(event_listeners=[listener]) [2024/08/12 10:41:58.788] self.addAsyncCleanup(client.aclose) [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] collection = client.db["coll"] [2024/08/12 10:41:58.788] self.addAsyncCleanup(collection.drop) [2024/08/12 10:41:58.788] await collection.drop() [2024/08/12 10:41:58.788] await collection.insert_one(document={"_id": 1}) [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] models = [] [2024/08/12 10:41:58.788] for _ in range(self.max_write_batch_size + 1): [2024/08/12 10:41:58.788] models.append( [2024/08/12 10:41:58.788] InsertOne( [2024/08/12 10:41:58.788] namespace="db.coll", [2024/08/12 10:41:58.788] document={"_id": 1}, [2024/08/12 10:41:58.788] ) [2024/08/12 10:41:58.788] ) [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] with self.assertRaises(ClientBulkWriteException) as context: [2024/08/12 10:41:58.788] > await client.bulk_write(models=models, ordered=True) [2024/08/12 10:41:58.788] test/asynchronous/test_client_bulk_write.py:215: [2024/08/12 10:41:58.788] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ [2024/08/12 10:41:58.788] pymongo/_csot.py:110: in csot_wrapper [2024/08/12 10:41:58.788] return await func(self, *args, **kwargs) [2024/08/12 10:41:58.788] pymongo/asynchronous/mongo_client.py:2349: in bulk_write [2024/08/12 10:41:58.788] return await blk.execute(session, _Op.BULK_WRITE) [2024/08/12 10:41:58.788] 
pymongo/asynchronous/client_bulk.py:783: in execute [2024/08/12 10:41:58.788] result = await self.execute_command(session, operation) [2024/08/12 10:41:58.788] pymongo/asynchronous/client_bulk.py:651: in execute_command [2024/08/12 10:41:58.788] await self.client._retryable_write( [2024/08/12 10:41:58.788] pymongo/asynchronous/mongo_client.py:1873: in _retryable_write [2024/08/12 10:41:58.788] return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id) [2024/08/12 10:41:58.788] pymongo/asynchronous/mongo_client.py:1759: in _retry_with_session [2024/08/12 10:41:58.788] return await self._retry_internal( [2024/08/12 10:41:58.788] pymongo/_csot.py:110: in csot_wrapper [2024/08/12 10:41:58.788] return await func(self, *args, **kwargs) [2024/08/12 10:41:58.788] pymongo/asynchronous/mongo_client.py:1794: in _retry_internal [2024/08/12 10:41:58.788] return await _ClientConnectionRetryable( [2024/08/12 10:41:58.788] pymongo/asynchronous/mongo_client.py:2530: in run [2024/08/12 10:41:58.788] return await self._read() if self._is_read else await self._write() [2024/08/12 10:41:58.788] pymongo/asynchronous/mongo_client.py:2650: in _write [2024/08/12 10:41:58.788] return await self._func(self._session, conn, self._retryable) # type: ignore [2024/08/12 10:41:58.788] pymongo/asynchronous/client_bulk.py:642: in retryable_bulk [2024/08/12 10:41:58.788] await self._execute_command( [2024/08/12 10:41:58.788] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ [2024/08/12 10:41:58.788] async def _execute_command( [2024/08/12 10:41:58.788] self, [2024/08/12 10:41:58.788] write_concern: WriteConcern, [2024/08/12 10:41:58.788] session: Optional[AsyncClientSession], [2024/08/12 10:41:58.788] conn: AsyncConnection, [2024/08/12 10:41:58.788] op_id: int, [2024/08/12 10:41:58.788] retryable: bool, [2024/08/12 10:41:58.788] full_result: MutableMapping[str, Any], [2024/08/12 10:41:58.788] final_write_concern: Optional[WriteConcern] = 
None, [2024/08/12 10:41:58.788] ) -> None: [2024/08/12 10:41:58.788] """Internal helper for executing batches of bulkWrite commands.""" [2024/08/12 10:41:58.788] db_name = "admin" [2024/08/12 10:41:58.788] cmd_name = "bulkWrite" [2024/08/12 10:41:58.788] listeners = self.client._event_listeners [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] # AsyncConnection.command validates the session, but we use [2024/08/12 10:41:58.788] # AsyncConnection.write_command [2024/08/12 10:41:58.788] conn.validate_session(self.client, session) [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] bwc = self.bulk_ctx_class( [2024/08/12 10:41:58.788] db_name, [2024/08/12 10:41:58.788] cmd_name, [2024/08/12 10:41:58.788] conn, [2024/08/12 10:41:58.788] op_id, [2024/08/12 10:41:58.788] listeners, # type: ignore[arg-type] [2024/08/12 10:41:58.788] session, [2024/08/12 10:41:58.788] self.client.codec_options, [2024/08/12 10:41:58.788] ) [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] while self.idx_offset < self.total_ops: [2024/08/12 10:41:58.788] # If this is the last possible batch, use the [2024/08/12 10:41:58.788] # final write concern. [2024/08/12 10:41:58.788] if self.total_ops - self.idx_offset <= bwc.max_write_batch_size: [2024/08/12 10:41:58.788] write_concern = final_write_concern or write_concern [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] # Construct the server command, specifying the relevant options. 
[2024/08/12 10:41:58.788] cmd = {"bulkWrite": 1} [2024/08/12 10:41:58.788] cmd["errorsOnly"] = not self.verbose_results [2024/08/12 10:41:58.788] cmd["ordered"] = self.ordered # type: ignore[assignment] [2024/08/12 10:41:58.788] not_in_transaction = session and not session.in_transaction [2024/08/12 10:41:58.788] if not_in_transaction or not session: [2024/08/12 10:41:58.788] _csot.apply_write_concern(cmd, write_concern) [2024/08/12 10:41:58.788] if self.bypass_doc_val is not None: [2024/08/12 10:41:58.788] cmd["bypassDocumentValidation"] = self.bypass_doc_val [2024/08/12 10:41:58.788] if self.comment: [2024/08/12 10:41:58.788] cmd["comment"] = self.comment # type: ignore[assignment] [2024/08/12 10:41:58.788] if self.let: [2024/08/12 10:41:58.788] cmd["let"] = self.let [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] if session: [2024/08/12 10:41:58.788] # Start a new retryable write unless one was already [2024/08/12 10:41:58.788] # started for this command. [2024/08/12 10:41:58.788] if retryable and not self.started_retryable_write: [2024/08/12 10:41:58.788] session._start_retryable_write() [2024/08/12 10:41:58.788] self.started_retryable_write = True [2024/08/12 10:41:58.788] session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn) [2024/08/12 10:41:58.788] conn.send_cluster_time(cmd, session, self.client) [2024/08/12 10:41:58.788] conn.add_server_api(cmd) [2024/08/12 10:41:58.788] # CSOT: apply timeout before encoding the command. [2024/08/12 10:41:58.788] conn.apply_timeout(self.client, cmd) [2024/08/12 10:41:58.788] ops = islice(self.ops, self.idx_offset, None) [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] # Run as many ops as possible in one server command. 
[2024/08/12 10:41:58.788] if write_concern.acknowledged: [2024/08/12 10:41:58.788] raw_result, to_send_ops, _ = await self._execute_batch(bwc, cmd, ops) # type: ignore[arg-type] [2024/08/12 10:41:58.788] result = copy.deepcopy(raw_result) [2024/08/12 10:41:58.788] [2024/08/12 10:41:58.788] # Top-level server/network error. [2024/08/12 10:41:58.788] if result.get("error"): [2024/08/12 10:41:58.788] error = result["error"] [2024/08/12 10:41:58.788] retryable_top_level_error = ( [2024/08/12 10:41:58.788] > isinstance(error.details, dict) [2024/08/12 10:41:58.788] and error.details.get("code", 0) in _RETRYABLE_ERROR_CODES [2024/08/12 10:41:58.788] ) [2024/08/12 10:41:58.788] E AttributeError: 'InvalidBSON' object has no attribute 'details' [2024/08/12 10:41:58.788] pymongo/asynchronous/client_bulk.py:553: AttributeError
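We should guard the access to "error.details" in _execute_command (pymongo/asynchronous/client_bulk.py:553): not every top-level error object has a "details" attribute, as the InvalidBSON failure above shows.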
- is caused by: PYTHON-4550 Add MongoClient.bulk_write API (Closed)