Uploaded image for project: 'Go Driver'
  1. Go Driver
  2. GODRIVER-1648

calling cursor.Next() after changestream is invalidated enters spin loop

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 1.3.5
    • Affects Version/s: None
    • Component/s: CRUD
    • Labels: None

      (Using v1.3.4)
      When consuming a change stream in a for loop with successive calls to cursor.Next(), after the "invalidate" event is received on the stream the subsequent call to Next() never returns and CPU usage hits 100%.

      In practice the caller should probably avoid this by detecting the invalidate event and breaking the loop, but it seems like the driver should be able to avoid spinning the CPU when the stream goes idle after being invalidated.

      Here's a minimal test case to reproduce:

      package main
      
      import (
      	"context"
      	"fmt"
      	"log"
      	"testing"
      	"time"
      
      	"go.mongodb.org/mongo-driver/bson"
      	"go.mongodb.org/mongo-driver/bson/primitive"
      	"go.mongodb.org/mongo-driver/mongo"
      	"go.mongodb.org/mongo-driver/mongo/options"
      )
      
      // TestChangestream reproduces the spin loop described in this report: a
      // database-level change stream is consumed with successive calls to
      // cursor.Next() while a second goroutine inserts documents and then drops
      // the database.
      //
      // The consumer loop deliberately has no exit condition other than a cursor
      // error, so on an affected driver version the test hangs with the consumer
      // goroutine spinning inside Next(). It requires a MongoDB deployment
      // listening on localhost:26000.
      func TestChangestream(t *testing.T) {
      	log.SetFlags(log.LstdFlags | log.Lshortfile)

      	client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:26000"))
      	if err != nil {
      		log.Fatal(err)
      	}

      	// Bound the connection attempt, and release the timeout's resources when
      	// the test returns instead of discarding the cancel function.
      	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
      	defer cancel()
      	if err = client.Connect(ctx); err != nil {
      		log.Fatal(err)
      	}

      	// Random names keep repeated runs from colliding with each other.
      	db1, coll1 := primitive.NewObjectID().Hex(), primitive.NewObjectID().Hex()

      	// Insert one document before opening the stream.
      	_, err = client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"x", "1"}})
      	if err != nil {
      		log.Fatal(err)
      	}

      	// Watch the whole database with an empty pipeline.
      	cursor, err := client.Database(db1).Watch(
      		context.Background(),
      		[]interface{}{},
      	)
      	if err != nil {
      		log.Fatal(err)
      	}

      	// Consumer: call Next() forever, printing every event received. The loop
      	// intentionally does not break on any event type; that is what exposes
      	// the hang reported here.
      	doneChan1 := make(chan struct{})
      	go func() {
      		defer close(doneChan1)
      		log.Println("starting changestream loop")
      		for {
      			log.Println("calling cursor next...")
      			if cursor.Next(context.Background()) {
      				event := bson.D{}
      				// Surface decode failures instead of printing a silently empty event.
      				if err := cursor.Decode(&event); err != nil {
      					log.Fatal(err)
      				}
      				fmt.Println("got event", event)
      			}
      			log.Println("...cursor next returned.")
      			if err := cursor.Err(); err != nil {
      				log.Fatal(err)
      			}
      		}
      	}()

      	// Producer: insert five documents one second apart, then drop the
      	// database (presumably the operation that triggers the "invalidate"
      	// event mentioned in the report).
      	doneChan2 := make(chan struct{})
      	go func() {
      		defer close(doneChan2)
      		for i := 0; i < 5; i++ {
      			_, err := client.Database(db1).Collection(coll1).InsertOne(context.Background(), bson.D{{"test", i}})
      			if err != nil {
      				log.Fatal(err)
      			}
      			time.Sleep(time.Second)
      		}

      		err := client.Database(db1).Drop(context.Background())
      		if err != nil {
      			log.Fatal(err)
      		}
      	}()

      	// The consumer never closes doneChan1 on its own, so this blocks for as
      	// long as the consumer loop keeps running.
      	<-doneChan1
      	<-doneChan2
      }
      
      

      A goroutine stack dump while the test is hung shows:

      goroutine 7 [select]:
      go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).update(0xc000139ae0)
      	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:391 +0x3ec
      created by go.mongodb.org/mongo-driver/x/mongo/driver/topology.(*Server).Connect
      	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/server.go:169 +0x14d
      
      goroutine 12 [runnable]:
      go.mongodb.org/mongo-driver/mongo.(*ChangeStream).emptyBatch(0xc0002a0000, 0xc0000e23dd)
      	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:602 +0x7d
      go.mongodb.org/mongo-driver/mongo.(*ChangeStream).updatePbrtFromCommand(0xc0002a0000)
      	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:294 +0x64
      go.mongodb.org/mongo-driver/mongo.(*ChangeStream).loopNext(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0)
      	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:564 +0x18c
      go.mongodb.org/mongo-driver/mongo.(*ChangeStream).next(0xc0002a0000, 0x1956780, 0xc0000260b0, 0x0, 0x2)
      	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:529 +0x170
      go.mongodb.org/mongo-driver/mongo.(*ChangeStream).Next(...)
      	/Users/mikeo/go/src/go.mongodb.org/mongo-driver/mongo/change_stream.go:500
      _/tmp/godriver.TestChangestream.func1(0xc00002e900, 0xc0002a0000)
      	/tmp/godriver/cursor_test.go:49 +0x111
      created by _/tmp/godriver.TestChangestream
      	/tmp/godriver/cursor_test.go:44 +0x4a5
      
      

            Assignee:
            kriti.ravindran@mongodb.com Kriti Ravindran (Inactive)
            Reporter:
            mikeo@mongodb.com Michael O'Brien
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

              Created:
              Updated:
              Resolved: