  1. Java Driver
  2. JAVA-2858

ChangeStream cursor raises a MongoSocketReadException when there has been no event since the last replica set election

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 3.6.4, 3.7.1
    • Affects Version/s: 3.6.0
    • Component/s: Query Operations
    • None
    • Environment:
      openjdk version "1.8.0_151"
      ubuntu v16.04

      While testing the fix in JAVA-2821 for v3.7.0 (and v3.8.0-beta2), I noticed that a change stream cursor resumes when the primary member fails, but only if there has been an event on the watched collection since the last election.

      For example:
      Take a replica set with three members (PSS) on ports 27017, 27018 and 27019.
      The primary member is currently on `27017`.

      1. Insert on the primary 27017 -> change stream event sent
      2. Stop the primary 27017 -> the cursor now works in v3.7.0+ and resumes correctly
      3. Wait for a new primary to be elected.
      4. Insert on the new primary 27018 -> change stream event sent
      5. Stop the current primary 27018 -> cursor resumes correctly.
      6. Wait for a new primary to be elected.
      7. This time, do not trigger any events on the watched collection (events on another collection are fine). Stop the new primary 27019.
      8. The cursor throws com.mongodb.MongoSocketReadException.

      Example code:

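              import com.mongodb.Block;
              import com.mongodb.MongoClient;
              import com.mongodb.MongoClientURI;
              import com.mongodb.client.MongoCollection;
              import com.mongodb.client.MongoCursor;
              import com.mongodb.client.MongoDatabase;
              import com.mongodb.client.model.changestream.ChangeStreamDocument;
              import org.bson.Document;

              public class ChangeStream {
                  public static void main(final String[] args) {
                      // Connect to the three-member replica set.
                      String mongoURI = "mongodb://localhost:27017,localhost:27018,localhost:27019/test?replicaSet=changestream";
                      MongoClient mongoClient = new MongoClient(new MongoClientURI(mongoURI));

                      MongoDatabase database = mongoClient.getDatabase("dbname");
                      MongoCollection<Document> collection = database.getCollection("collname");

                      // Callback form of the printer; the cursor loop below does not use it.
                      Block<ChangeStreamDocument<Document>> printBlock = new Block<ChangeStreamDocument<Document>>() {
                          @Override
                          public void apply(final ChangeStreamDocument<Document> changeStreamDocument) {
                              System.out.println(changeStreamDocument);
                          }
                      };

                      // Block on the change stream and print every event as it arrives.
                      MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();
                      ChangeStreamDocument<Document> next = cursor.next();
                      System.out.println(next);
                      while (cursor.hasNext()) {
                          next = cursor.next();
                          System.out.println(next);
                      }
                  }
              }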
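
An application running an affected driver version could, in principle, guard the loop itself. The sketch below is not the driver's fix and does not use the driver API. It is a stdlib-only model of an application-level resume loop, written to show the case from steps 5 to 8: two consecutive failures with no event in between. `Event`, `TransientFailure`, `consume` and `flakySource` are all hypothetical names. In real code the token would presumably come from `ChangeStreamDocument.getResumeToken()`, be passed to `collection.watch().resumeAfter(...)`, and the caught exception would be `com.mongodb.MongoSocketException`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ResumeLoopSketch {

    /** Hypothetical event: a payload plus the token needed to resume after it. */
    static final class Event {
        final String resumeToken;
        final String payload;

        Event(final String resumeToken, final String payload) {
            this.resumeToken = resumeToken;
            this.payload = payload;
        }
    }

    /** Hypothetical stand-in for a transient error such as MongoSocketReadException. */
    static final class TransientFailure extends RuntimeException {
        TransientFailure(final String message) {
            super(message);
        }
    }

    /**
     * Drains events, reopening the cursor after each transient failure.
     * openCursor receives the last seen resume token, or null if no event
     * has been seen yet. The token is kept across failures, so a failure
     * that follows another failure still resumes from the same position.
     */
    static void consume(final Function<String, Iterator<Event>> openCursor,
                        final Consumer<Event> handler,
                        final int maxRetries) {
        String lastToken = null;
        int failures = 0;
        while (true) {
            try {
                Iterator<Event> cursor = openCursor.apply(lastToken);
                while (cursor.hasNext()) {
                    Event event = cursor.next();
                    handler.accept(event);
                    lastToken = event.resumeToken;
                    failures = 0; // progress was made, so reset the retry budget
                }
                return; // the simulated cursor is finite; a real change stream keeps blocking
            } catch (TransientFailure e) {
                if (++failures > maxRetries) {
                    throw e; // give up after too many consecutive failures
                }
            }
        }
    }

    /** Simulated source: the n-th open fails after delivering failAfter[n] events. */
    static Function<String, Iterator<Event>> flakySource(final List<Event> events,
                                                         final int[] failAfter,
                                                         final List<String> tokensSeen) {
        final int[] opens = {0};
        return token -> {
            tokensSeen.add(token);
            int start = 0;
            for (int i = 0; i < events.size(); i++) {
                if (events.get(i).resumeToken.equals(token)) {
                    start = i + 1; // resume just after the event carrying this token
                }
            }
            final int open = opens[0]++;
            final int budget = open < failAfter.length ? failAfter[open] : Integer.MAX_VALUE;
            final Iterator<Event> rest = events.subList(start, events.size()).iterator();
            return new Iterator<Event>() {
                private int delivered = 0;

                @Override
                public boolean hasNext() {
                    if (delivered >= budget) {
                        throw new TransientFailure("simulated socket failure");
                    }
                    return rest.hasNext();
                }

                @Override
                public Event next() {
                    delivered++;
                    return rest.next();
                }
            };
        };
    }

    public static void main(final String[] args) {
        List<Event> events = Arrays.asList(
                new Event("t1", "a"), new Event("t2", "b"), new Event("t3", "c"));
        List<String> tokensSeen = new ArrayList<>();
        List<String> received = new ArrayList<>();
        // First open fails after one event; second open fails before any event.
        consume(flakySource(events, new int[] {1, 0}, tokensSeen),
                e -> received.add(e.payload), 3);
        System.out.println(received);   // [a, b, c]
        System.out.println(tokensSeen); // [null, t1, t1]
    }
}
```

The second and third opens both receive `t1`: the loop resumes from the same token after back-to-back failures, which is the behaviour the reproduction steps expect from the cursor.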
      

      The stack trace:

      INFO: Closed connection [connectionId{localValue:35, serverValue:4}] to ironhide.local:27019 because there was a socket exception raised by this connection.
      Exception in thread "main" com.mongodb.MongoSocketReadException: Prematurely reached end of stream
      	at com.mongodb.connection.SocketStream.read(SocketStream.java:87)
      	at com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:538)
      	at com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:409)
      	at com.mongodb.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:281)
      	at com.mongodb.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:247)
      	at com.mongodb.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:98)
      	at com.mongodb.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:441)
      	at com.mongodb.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:70)
      	at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:192)
      	at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:264)
      	at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:126)
      	at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)
      	at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:222)
      	at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:115)
      	at com.mongodb.operation.ChangeStreamBatchCursor$1.apply(ChangeStreamBatchCursor.java:58)
      	at com.mongodb.operation.ChangeStreamBatchCursor$1.apply(ChangeStreamBatchCursor.java:55)
      	at com.mongodb.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:142)
      	at com.mongodb.operation.ChangeStreamBatchCursor.hasNext(ChangeStreamBatchCursor.java:55)
      	at com.mongodb.client.internal.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:54)
      	at com.tour.ChangeStream.main(ChangeStream.java:53)
      

            Assignee:
            jeff.yemin@mongodb.com Jeffrey Yemin
            Reporter:
            wan.bachtiar@mongodb.com Wan Bachtiar
            Votes:
            0
            Watchers:
            5
