/* Copyright 2010-2013 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using MongoDB.Bson.IO;
using MongoDB.Bson.Serialization;
using MongoDB.Driver.Internal;
using MongoDB.Driver.Operations;
namespace MongoDB.Driver
{
///
/// Represents an instance of a MongoDB server host.
///
public sealed class MongoServerInstance
{
// private static fields
private static int __nextSequentialId;
// public events
///
/// Occurs when the value of the State property changes.
///
public event EventHandler StateChanged;
//internal events
internal event EventHandler AveragePingTimeChanged;
// private fields
private readonly object _serverInstanceLock = new object();
private readonly MongoServerSettings _settings;
private readonly MongoConnectionPool _connectionPool;
private readonly PingTimeAggregator _pingTimeAggregator;
private MongoServerAddress _address;
private Exception _connectException;
private bool _inStateVerification;
private ServerInformation _serverInfo;
private IPEndPoint _ipEndPoint;
private bool _permanentlyDisconnected;
private int _sequentialId;
private MongoServerState _state;
private Timer _stateVerificationTimer;
private MongoConnectionPool.AcquireConnectionOptions _stateVerificationAcquireConnectionOptions;
// constructors
///
/// Initializes a new instance of the class.
///
/// The settings.
/// The address.
internal MongoServerInstance(MongoServerSettings settings, MongoServerAddress address)
{
_settings = settings;
_address = address;
_sequentialId = Interlocked.Increment(ref __nextSequentialId);
_state = MongoServerState.Disconnected;
_serverInfo = new ServerInformation
{
MaxDocumentSize = MongoDefaults.MaxDocumentSize,
MaxMessageLength = MongoDefaults.MaxMessageLength,
InstanceType = MongoServerInstanceType.Unknown
};
_connectionPool = new MongoConnectionPool(this);
_pingTimeAggregator = new PingTimeAggregator(5);
_permanentlyDisconnected = false;
// Console.WriteLine("MongoServerInstance[{0}]: {1}", sequentialId, address);
_stateVerificationAcquireConnectionOptions = new MongoConnectionPool.AcquireConnectionOptions
{
OkToAvoidWaitingByCreatingNewConnection = false,
OkToExceedMaxConnectionPoolSize = true,
OkToExceedWaitQueueSize = true,
WaitQueueTimeout = TimeSpan.FromSeconds(2)
};
}
// internal properties
///
/// Gets the average ping time.
///
internal TimeSpan AveragePingTime
{
get { return _pingTimeAggregator.Average; }
}
///
/// Gets the replica set information.
///
internal ReplicaSetInformation ReplicaSetInformation
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.ReplicaSetInformation;
}
}
}
///
/// Gets the instance type.
///
public MongoServerInstanceType InstanceType
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.InstanceType;
}
}
}
// public properties
///
/// Gets the address of this server instance.
///
public MongoServerAddress Address
{
get
{
lock (_serverInstanceLock)
{
return _address;
}
}
internal set
{
lock (_serverInstanceLock)
{
_address = value;
}
}
}
///
/// Gets the version of this server instance.
///
public MongoServerBuildInfo BuildInfo
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.BuildInfo;
}
}
}
///
/// Gets the exception thrown the last time Connect was called (null if Connect did not throw an exception).
///
public Exception ConnectException
{
get
{
lock (_serverInstanceLock)
{
return _connectException;
}
}
}
///
/// Gets the connection pool for this server instance.
///
public MongoConnectionPool ConnectionPool
{
get { return _connectionPool; }
}
///
/// Gets whether this server instance is an arbiter instance.
///
public bool IsArbiter
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.IsArbiter;
}
}
}
///
/// Gets the result of the most recent ismaster command sent to this server instance.
///
public IsMasterResult IsMasterResult
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.IsMasterResult;
}
}
}
///
/// Gets whether this server instance is a passive instance.
///
public bool IsPassive
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.IsPassive;
}
}
}
///
/// Gets whether this server instance is a primary.
///
public bool IsPrimary
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.IsPrimary;
}
}
}
///
/// Gets whether this server instance is a secondary.
///
public bool IsSecondary
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.IsSecondary;
}
}
}
///
/// Gets the max document size for this server instance.
///
public int MaxDocumentSize
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.MaxDocumentSize;
}
}
}
///
/// Gets the max message length for this server instance.
///
public int MaxMessageLength
{
get
{
lock (_serverInstanceLock)
{
return _serverInfo.MaxMessageLength;
}
}
}
///
/// Gets the unique sequential Id for this server instance.
///
public int SequentialId
{
get { return _sequentialId; }
}
///
/// Gets the server for this server instance.
///
public MongoServerSettings Settings
{
get { return _settings; }
}
///
/// Gets the state of this server instance.
///
public MongoServerState State
{
get
{
lock (_serverInstanceLock)
{
return _state;
}
}
}
// public methods
///
/// Gets the IP end point of this server instance.
///
/// The IP end point of this server instance.
public IPEndPoint GetIPEndPoint()
{
// use a lock free algorithm because DNS lookups are rare and concurrent lookups are tolerable
// the intermediate variable is important to avoid race conditions
var ipEndPoint = Interlocked.CompareExchange(ref _ipEndPoint, null, null);
if (ipEndPoint == null)
{
var addressFamily = _settings.IPv6 ? AddressFamily.InterNetworkV6 : AddressFamily.InterNetwork;
ipEndPoint = _address.ToIPEndPoint(addressFamily);
Interlocked.CompareExchange(ref _ipEndPoint, _ipEndPoint, null);
}
return ipEndPoint;
}
///
/// Checks whether the server is alive (throws an exception if not).
///
public void Ping()
{
var connection = _connectionPool.AcquireConnection(_stateVerificationAcquireConnectionOptions);
try
{
Ping(connection);
}
finally
{
_connectionPool.ReleaseConnection(connection);
}
}
///
/// Verifies the state of the server instance.
///
public void VerifyState()
{
var connection = _connectionPool.AcquireConnection(_stateVerificationAcquireConnectionOptions);
try
{
try
{
Ping(connection);
}
catch
{
// ignore exceptions (if any occured state will already be set to Disconnected)
// Console.WriteLine("MongoServerInstance[{0}]: VerifyState failed: {1}.", sequentialId, ex.Message);
}
try
{
LookupServerInformation(connection);
}
catch
{
// ignore exceptions (if any occured state will already be set to Disconnected)
// Console.WriteLine("MongoServerInstance[{0}]: VerifyState failed: {1}.", sequentialId, ex.Message);
}
}
finally
{
_connectionPool.ReleaseConnection(connection);
}
}
// internal methods
///
/// Acquires the connection.
///
/// A MongoConnection.
internal MongoConnection AcquireConnection()
{
lock (_serverInstanceLock)
{
if (_state != MongoServerState.Connected)
{
var message = string.Format("Server instance {0} is no longer connected.", _address);
throw new InvalidOperationException(message);
}
}
return _connectionPool.AcquireConnection();
}
///
/// Connects this instance.
///
internal void Connect()
{
// Console.WriteLine("MongoServerInstance[{0}]: Connect() called.", sequentialId);
lock (_serverInstanceLock)
{
if (_permanentlyDisconnected || _state == MongoServerState.Connecting || _state == MongoServerState.Connected)
{
return;
}
_connectException = null;
// set the state manually here because SetState raises an event that shouldn't be raised
// while holding a lock.
_state = MongoServerState.Connecting;
}
// We know for certain that the state just changed
OnStateChanged();
try
{
var connection = _connectionPool.AcquireConnection();
try
{
try
{
Ping(connection);
}
catch(Exception ex)
{
//ignore the ping exception
}
try
{
LookupServerInformation(connection);
}
catch (Exception ex)
{
throw new Exception("Unable to connect to instance", ex);
}
SetState(MongoServerState.Connected);
}
finally
{
_connectionPool.ReleaseConnection(connection);
}
}
catch (Exception ex)
{
lock (_serverInstanceLock)
{
_connectException = ex;
}
_connectionPool.Clear();
Interlocked.Exchange(ref _connectException, ex);
SetState(MongoServerState.Disconnected);
throw;
}
finally
{
lock (_serverInstanceLock)
{
if (_stateVerificationTimer == null)
{
_stateVerificationTimer = new Timer(o => StateVerificationTimerCallback(), null, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10));
}
}
}
}
///
/// Disconnects this instance.
///
internal void Disconnect()
{
// Console.WriteLine("MongoServerInstance[{0}]: Disconnect called.", sequentialId);
lock (_serverInstanceLock)
{
if (_stateVerificationTimer != null)
{
_stateVerificationTimer.Dispose();
_stateVerificationTimer = null;
}
if (_state == MongoServerState.Disconnecting || _state == MongoServerState.Disconnected)
{
return;
}
// set the state here because SetState raises an event that should not be raised while holding a lock
_state = MongoServerState.Disconnecting;
}
// we know for certain state has just changed.
OnStateChanged();
try
{
_connectionPool.Clear();
}
finally
{
SetState(MongoServerState.Disconnected);
}
}
///
/// Disconnects this instance permanently.
///
internal void DisconnectPermanently()
{
lock (_serverInstanceLock)
{
_permanentlyDisconnected = true;
}
Disconnect();
}
///
/// Releases the connection.
///
/// The connection.
internal void ReleaseConnection(MongoConnection connection)
{
_connectionPool.ReleaseConnection(connection);
}
///
/// Sets the state.
///
/// The state.
internal void SetState(MongoServerState state)
{
lock (_serverInstanceLock)
{
if (_state == state)
{
return;
}
_state = state;
}
OnStateChanged();
}
// private methods
private void LookupServerInformation(MongoConnection connection)
{
IsMasterResult isMasterResult = null;
bool ok = false;
try
{
var isMasterCommand = new CommandDocument("ismaster", 1);
isMasterResult = RunCommandAs(connection, "admin", isMasterCommand);
MongoServerBuildInfo buildInfo;
try
{
var buildInfoCommand = new CommandDocument("buildinfo", 1);
var buildInfoResult = RunCommandAs(connection, "admin", buildInfoCommand);
buildInfo = MongoServerBuildInfo.FromCommandResult(buildInfoResult);
}
catch (MongoCommandException ex)
{
// short term fix: if buildInfo fails due to auth we don't know the server version; see CSHARP-324
if (ex.CommandResult.ErrorMessage != "need to login")
{
throw;
}
buildInfo = null;
}
ReplicaSetInformation replicaSetInformation = null;
MongoServerInstanceType instanceType = MongoServerInstanceType.StandAlone;
if (isMasterResult.IsReplicaSet)
{
var peers = isMasterResult.Hosts.Concat(isMasterResult.Passives).Concat(isMasterResult.Arbiters).ToList();
replicaSetInformation = new ReplicaSetInformation(isMasterResult.ReplicaSetName, isMasterResult.Primary, peers, isMasterResult.Tags);
instanceType = MongoServerInstanceType.ReplicaSetMember;
}
else if (isMasterResult.Message != null && isMasterResult.Message == "isdbgrid")
{
instanceType = MongoServerInstanceType.ShardRouter;
}
var newServerInfo = new ServerInformation
{
BuildInfo = buildInfo,
InstanceType = instanceType,
IsArbiter = isMasterResult.IsArbiterOnly,
IsMasterResult = isMasterResult,
IsPassive = isMasterResult.IsPassive,
IsPrimary = isMasterResult.IsPrimary,
IsSecondary = isMasterResult.IsSecondary,
MaxDocumentSize = isMasterResult.MaxBsonObjectSize,
MaxMessageLength = isMasterResult.MaxMessageLength,
ReplicaSetInformation = replicaSetInformation
};
MongoServerState currentState;
lock (_serverInstanceLock)
{
currentState = _state;
}
SetState(currentState, newServerInfo);
ok = true;
}
finally
{
if (!ok)
{
ServerInformation currentServerInfo;
lock (_serverInstanceLock)
{
currentServerInfo = _serverInfo;
}
// keep the current instance type, build info, and replica set info
// as these aren't relevent to state and are likely still correct.
var newServerInfo = new ServerInformation
{
BuildInfo = currentServerInfo.BuildInfo,
InstanceType = currentServerInfo.InstanceType,
IsArbiter = false,
IsMasterResult = isMasterResult,
IsPassive = false,
IsPrimary = false,
IsSecondary = false,
MaxDocumentSize = currentServerInfo.MaxDocumentSize,
MaxMessageLength = currentServerInfo.MaxMessageLength,
ReplicaSetInformation = currentServerInfo.ReplicaSetInformation
};
SetState(MongoServerState.Disconnected, newServerInfo);
}
}
}
private void OnAveragePingTimeChanged()
{
if (AveragePingTimeChanged != null)
{
try { AveragePingTimeChanged(this, EventArgs.Empty); }
catch { } // ignore exceptions
}
}
private void OnStateChanged()
{
if (StateChanged != null)
{
try { StateChanged(this, null); }
catch { } // ignore exceptions
}
}
private void Ping(MongoConnection connection)
{
try
{
var pingCommand = new CommandDocument("ping", 1);
Stopwatch stopwatch = Stopwatch.StartNew();
RunCommandAs(connection, "admin", pingCommand);
stopwatch.Stop();
var currentAverage = _pingTimeAggregator.Average;
_pingTimeAggregator.Include(stopwatch.Elapsed);
var newAverage = _pingTimeAggregator.Average;
if (currentAverage != newAverage)
{
OnAveragePingTimeChanged();
}
}
catch
{
_pingTimeAggregator.Clear();
SetState(MongoServerState.Disconnected);
throw;
}
}
private TCommandResult RunCommandAs(MongoConnection connection, string databaseName, IMongoCommand command)
where TCommandResult : CommandResult
{
var readerSettings = new BsonBinaryReaderSettings();
var writerSettings = new BsonBinaryWriterSettings();
var resultSerializer = BsonSerializer.LookupSerializer(typeof(TCommandResult));
var commandOperation = new CommandOperation(
databaseName,
readerSettings,
writerSettings,
command,
QueryFlags.SlaveOk,
null, // options
null, // readPreference
null, // serializationOptions
resultSerializer);
return commandOperation.Execute(connection);
}
private void StateVerificationTimerCallback()
{
if (_inStateVerification)
{
return;
}
_inStateVerification = true;
try
{
var connection = _connectionPool.AcquireConnection(_stateVerificationAcquireConnectionOptions);
try
{
try
{
Ping(connection);
}
catch
{
//Ignore internal ping Exception we absolutely want to have the LookupServerInformation to be called
}
LookupServerInformation(connection);
ThreadPool.QueueUserWorkItem(o => _connectionPool.MaintainPoolSize());
SetState(MongoServerState.Connected);
}
finally
{
_connectionPool.ReleaseConnection(connection);
}
}
catch { } // this is called in a timer thread and we don't want any exceptions escaping
finally
{
_inStateVerification = false;
}
}
/// This method must be called outside of a lock.
private void SetState(MongoServerState newState, ServerInformation newServerInfo)
{
bool raiseChangedEvent = false;
lock (_serverInstanceLock)
{
if (_state != newState)
{
_state = newState;
raiseChangedEvent = true;
}
if (newState == MongoServerState.Disconnected)
{
_connectionPool.Clear();
}
if (_serverInfo != newServerInfo && _serverInfo.IsDifferentFrom(newServerInfo))
{
_serverInfo = newServerInfo;
raiseChangedEvent = true;
}
}
if (raiseChangedEvent)
{
OnStateChanged();
}
}
// NOTE: while all these properties are mutable, it is purely for ease of use. This class is used as an immutable class.
private class ServerInformation
{
public MongoServerBuildInfo BuildInfo { get; set; }
public MongoServerInstanceType InstanceType { get; set; }
public bool IsArbiter { get; set; }
public IsMasterResult IsMasterResult { get; set; }
public bool IsPassive { get; set; }
public bool IsPrimary { get; set; }
public bool IsSecondary { get; set; }
public int MaxDocumentSize { get; set; }
public int MaxMessageLength { get; set; }
public ReplicaSetInformation ReplicaSetInformation { get; set; }
public bool IsDifferentFrom(ServerInformation other)
{
if (InstanceType != other.InstanceType)
{
return true;
}
if (IsPrimary != other.IsPrimary)
{
return true;
}
if (IsSecondary != other.IsSecondary)
{
return true;
}
if (IsPassive != other.IsPassive)
{
return true;
}
if (IsArbiter != other.IsArbiter)
{
return true;
}
if (MaxDocumentSize != other.MaxDocumentSize)
{
return true;
}
if (MaxMessageLength != other.MaxMessageLength)
{
return true;
}
if ((ReplicaSetInformation == null && other.ReplicaSetInformation != null) || (ReplicaSetInformation != other.ReplicaSetInformation))
{
return true;
}
return false;
}
}
}
}