-
Type: Bug
-
Resolution: Fixed
-
Priority: Major - P3
-
Affects Version/s: None
-
Component/s: Change Streams
-
None
When running a change stream cursor it should be resilient to replicaset changes.
Reproducing the error:
1. Create a change stream cursor
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017,localhost:27018,localhost:27019/?readPreference=secondary"); // Select the MongoDB database. MongoDatabase database = mongoClient.getDatabase("testChangeStreams"); database.drop(); sleep(); // Select the collection to query. MongoCollection<Document> collection = database.getCollection("documents"); /* * Example 1 * Create a simple change stream against an existing collection. */ System.out.println("1. Initial document from the Change Stream:"); // Create the change stream cursor. MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch().cursor(); // Insert a test document into the collection. collection.insertOne(Document.parse("{username: 'alice123', name: 'Alice'}")); ChangeStreamDocument<Document> next = cursor.next(); System.out.println(next); next = cursor.tryNext(); while (next == null) { sleep(); cursor.tryNext(); }
Then hide the secondary node the cursor is using eg:
cfg = rs.conf()
cfg.members[1].priority = 0
cfg.members[1].hidden = true
rs.reconfig(cfg)
Produces this error:
Caused by: java.lang.IllegalStateException: state should be: open at com.mongodb.assertions.Assertions.isTrue(Assertions.java:79) at com.mongodb.internal.connection.DefaultServer.getConnection(DefaultServer.java:94) at com.mongodb.internal.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:141) at com.mongodb.client.internal.ClientSessionBinding$SessionBindingConnectionSource.getConnection(ClientSessionBinding.java:148) at com.mongodb.internal.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:264) at com.mongodb.internal.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:219) at com.mongodb.internal.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:203) at com.mongodb.internal.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:96) at com.mongodb.internal.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:92) at com.mongodb.internal.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:182)
Expected/desired result: the change stream should resume on a different secondary.
This may impact tailable cursors as well.
Note: When using a primary read preference, the change stream is resilient to topology changes.
- is related to
-
CSHARP-3963 MongoConnectionPoolPausedException should be resumable for change stream
- Closed
-
KAFKA-212 Closing cursor exception
- Closed