diff --git a/src/github.com/mongodb/mongo-tools/vendor/github.com/mongodb/mongo-tools-common/archive/demultiplexer.go b/src/github.com/mongodb/mongo-tools/vendor/github.com/mongodb/mongo-tools-common/archive/demultiplexer.go index a2fce90..af8cb18 100644 --- a/src/github.com/mongodb/mongo-tools/vendor/github.com/mongodb/mongo-tools-common/archive/demultiplexer.go +++ b/src/github.com/mongodb/mongo-tools/vendor/github.com/mongodb/mongo-tools-common/archive/demultiplexer.go @@ -26,6 +26,7 @@ import ( type DemuxOut interface { Write([]byte) (int, error) Close() error + Closing() error Sum64() (uint64, bool) } @@ -139,6 +140,7 @@ func (demux *Demultiplexer) HeaderBSON(buf []byte) error { if rcr, ok := demux.outs[demux.currentNamespace].(*RegularCollectionReceiver); ok { rcr.err = io.EOF } + demux.outs[demux.currentNamespace].Closing() demux.outs[demux.currentNamespace].Close() demux.NamespaceStatus[demux.currentNamespace] = NamespaceClosed length := int64(demux.lengths[demux.currentNamespace]) @@ -244,6 +246,7 @@ type RegularCollectionReceiver struct { closeOnce sync.Once openOnce sync.Once err error + demuxOutFinished bool } func (receiver *RegularCollectionReceiver) Sum64() (uint64, bool) { @@ -344,10 +347,20 @@ func (receiver *RegularCollectionReceiver) Write(buf []byte) (int, error) { return len(buf), nil } +// Closing is part of the DemuxOut interface. It only set the demuxOutFinished flag +func (receiver *RegularCollectionReceiver) Closing() error { + receiver.demuxOutFinished = true + return nil +} + // Close is part of the DemuxOut as well as the intents.file interface. It only closes the readLenChan, as that is what will // cause the RegularCollectionReceiver.Read() to receive EOF // Close will get called twice, once in the demultiplexer, and again when the restore goroutine is done with its intent.file func (receiver *RegularCollectionReceiver) Close() error { + // intent.file should not close readLenChan before demultiplexer because it may still write to this + if !receiver.demuxOutFinished { + return nil + } receiver.closeOnce.Do(func() { close(receiver.readLenChan) // make sure that we don't return until any reader has finished @@ -378,6 +391,11 @@ func (cache *SpecialCollectionCache) Open() error { return nil } +// Closing is part of the DemuxOut interface, and does nothing +func (cache *SpecialCollectionCache) Closing() error { + return nil +} + // Close is part of the both interfaces, and it does nothing func (cache *SpecialCollectionCache) Close() error { cache.Intent.Size = int64(cache.buf.Len()) @@ -421,6 +439,11 @@ func (*MutedCollection) Write(b []byte) (int, error) { return len(b), nil } +// Closing is part of the DemuxOut interface, and does nothing +func (*MutedCollection) Closing() error { + return nil +} + // Close is part of the intents.file interface, and does nothing func (*MutedCollection) Close() error { return nil