using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using Docstoc.Infrastructure;
using log4net;
namespace Docstoc.Harvest.DataAccess.Mongo
{
/// <summary>
/// Exception that records how many attempts had been made at the point it was created.
/// </summary>
public class MongoAccessRetryException : Exception
{
    /// <summary>
    /// int count of how many actual tries, exactly as handed to the constructor.
    /// </summary>
    public int ActualTries { get; protected set; }

    /// <summary>
    /// Creates the exception with the framework's default message.
    /// </summary>
    /// <param name="actualTries">number of attempts made</param>
    public MongoAccessRetryException(int actualTries)
    {
        this.ActualTries = actualTries;
    }

    /// <summary>
    /// Creates the exception with a custom message.
    /// </summary>
    /// <param name="actualTries">number of attempts made</param>
    /// <param name="msg">exception message</param>
    public MongoAccessRetryException(int actualTries, string msg)
        : base(msg)
    {
        this.ActualTries = actualTries;
    }

    /// <summary>
    /// Creates the exception with a custom message and the exception that caused it.
    /// </summary>
    /// <param name="actualTries">number of attempts made</param>
    /// <param name="msg">exception message</param>
    /// <param name="inner">underlying exception, exposed through InnerException</param>
    public MongoAccessRetryException(int actualTries, string msg, Exception inner)
        : base(msg, inner)
    {
        this.ActualTries = actualTries;
    }
}
public sealed class MongoAccess : IDisposable
{
//1 time caches
// Logger shared by every MongoAccess instance.
static readonly ILog _logger = LogManager.GetLogger(typeof(MongoAccess));
// Retry count used by the overloads that take no explicit retry arguments. Starts at 1 and is
// re-assigned in the static constructor from the "MongoRetries" app setting (this initial value
// is passed to ExtractValue as its second argument, presumably the fallback - confirm).
static readonly int RETRIES = 1;
// Wait between retries used by the overloads that take no explicit wait. Starts at 250 ms and is
// re-assigned in the static constructor from the "MongoRetryWait" app setting.
static readonly TimeSpan RETRY_WAIT = TimeSpan.FromMilliseconds(250);
// Pattern matched against MongoSafeModeException messages in Server to decide whether the
// failure is retried; assigned in the static constructor.
static readonly Regex SAFE_CONN_ERR;
// OnError is invoked as OnError(this, exception) by Disconnect and by the MongoUrl constructor's
// catch block; OnWarning is invoked as OnWarning(this, exception) by the private ReConnect helpers.
// NOTE(review): both events are called with two arguments, so the delegate's generic type
// arguments appear to be missing from these declarations - confirm against the original source.
public event Action OnError;
public event Action OnWarning;
#region static ctor
// Type initializer: reads the retry settings from appSettings and builds the regex used to
// recognise retryable safe-mode errors. Any failure is logged and rethrown.
static MongoAccess()
{
    try
    {
        var settings = ConfigurationManager.AppSettings;
        RETRY_WAIT = settings.ExtractValue("MongoRetryWait", RETRY_WAIT);
        RETRIES = settings.ExtractValue("MongoRetries", RETRIES);

        const RegexOptions options = RegexOptions.IgnoreCase | RegexOptions.Compiled;
        SAFE_CONN_ERR = new Regex(@"not\s+master", options);
    }
    catch (Exception failure)
    {
        _logger.Error("MongoAccess static CTOR", failure);
        throw;
    }
}
#endregion
// Server handle assigned once in the MongoUrl constructor; Server, Disconnect, ReConnect and
// Dispose all operate on it.
readonly MongoServer _server; //private use only
#region instance ctor
/// <summary>
/// Constructor that takes an appSettings entry key name whose value is the mongo url.
/// The url is resolved through ExtractMongoUrlFromConfig with no slaveOk override (null)
/// and handed to the MongoUrl constructor overload.
/// </summary>
/// <param name="configKey">appSettings entry key name that stores the mongo url</param>
public MongoAccess(string configKey) : this(ExtractMongoUrlFromConfig(configKey, null)) { }
/// <summary>
/// Constructor that takes an appSettings entry key name whose value is the mongo url,
/// and an override boolean for slaveOk, meant to be used for readonly slaves in a replica set cluster.
/// </summary>
/// <param name="configKey">appSettings entry key name that stores the mongo url</param>
/// <param name="slaveOk">forwarded to ExtractMongoUrlFromConfig; when true the url is built with SlaveOk enabled and SafeMode.False</param>
public MongoAccess(string configKey, bool slaveOk) : this(ExtractMongoUrlFromConfig(configKey, slaveOk)) { }
/// <summary>
/// Builds a MongoUrl from the appSettings entry named by configKey.
/// Returns null when configKey is null or empty. When slaveOk is null the stored url is used
/// as is; otherwise it is rebuilt through MongoUrlBuilder, enabling SlaveOk and SafeMode.False
/// when slaveOk is true, and setting the connection mode to ReplicaSet when the url names a
/// replica set or Direct when it does not.
/// Every exception is logged as fatal and rethrown.
/// </summary>
/// <param name="configKey">appSettings entry key name that stores the mongo url</param>
/// <param name="slaveOk">null to use the url unchanged, otherwise the slaveOk override</param>
/// <returns>the resolved MongoUrl, or null when no configKey was supplied</returns>
static MongoUrl ExtractMongoUrlFromConfig(string configKey, bool? slaveOk)
{
    try
    {
        if (string.IsNullOrEmpty(configKey))
            return null;

        string rawUrl = ConfigurationManager.AppSettings[configKey];
        if (string.IsNullOrEmpty(rawUrl))
            throw new ConfigurationErrorsException("configKey of value \"" + configKey + "\" does not exists!");

        if (!slaveOk.HasValue)
            return new MongoUrl(rawUrl);

        var builder = new MongoUrlBuilder(rawUrl);
        if (slaveOk.Value)
        {
            builder.SlaveOk = true;
            builder.SafeMode = SafeMode.False;
        }
        builder.ConnectionMode = string.IsNullOrEmpty(builder.ReplicaSetName)
            ? ConnectionMode.Direct
            : ConnectionMode.ReplicaSet;
        return builder.ToMongoUrl();
    }
    catch (Exception failure)
    {
        _logger.Fatal("ExtractMongoUrlFromConfig", failure);
        throw;
    }
}
/// <summary>
/// Constructor that takes a MongoUrl object directly.
/// Any exception raised while creating the server handle is logged and rethrown.
/// </summary>
/// <param name="serverPath">MongoUrl of server path; when null, MongoServer.Create() is called without arguments</param>
public MongoAccess(MongoUrl serverPath)
{
    try
    {
        _server = serverPath == null ? MongoServer.Create() : MongoServer.Create(serverPath);
    }
    catch (Exception ex)
    {
        // OnError is not raised here: the instance has not been handed to any caller yet, so
        // no handler can be subscribed while the constructor is still running.
        _logger.Error("MongoAccess CTOR", ex);
        throw;
    }
}
#endregion
#region core server command
/// <summary>
/// Asks the server handle to reconnect, but only when its state is Disconnected;
/// every other state is left untouched.
/// </summary>
public void ReConnect()
{
    if (_server.State != MongoServerState.Disconnected)
        return;

    _server.Reconnect();
}
/// <summary>
/// Execute an action of MongoServer object.
/// retries and retryWait are configured by default (RETRIES and RETRY_WAIT).
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="mongoServer">Action of MongoServer</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the full overload invokes this delegate with the server handle as an argument,
// so the Action type argument appears to be missing from the signature - confirm against the original.
public int Server(Action mongoServer)
{
return Server(mongoServer, RETRIES, RETRY_WAIT);
}
/// <summary>
/// Execute an action of MongoServer object, retrying when the failure looks connection related.
/// A SocketException, MongoConnectionException, TimeoutException, a MongoSafeModeException whose
/// message matches SAFE_CONN_ERR, or an IOException whose message contains "unable to read data"
/// is handed to ReConnect and the action is attempted again. Any other exception is logged,
/// passed to Disconnect and rethrown unchanged.
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="mongoServer">Action of MongoServer</param>
/// <param name="retries">how many times to retry a failed query due to replication/connection failure; negative values are treated as 0</param>
/// <param name="tryWait">how long to wait between each retry attempt; negative values are treated as zero</param>
/// <returns>Number of actual tries, counting the successful one</returns>
/// <exception cref="ArgumentNullException">mongoServer is null</exception>
/// <exception cref="MongoAccessRetryException">every attempt failed with a retryable error; the last error is the inner exception</exception>
// NOTE(review): the delegate is invoked with _server as its argument, so the Action type
// argument appears to be missing from the signature - confirm against the original.
public int Server(Action mongoServer, int retries, TimeSpan tryWait)
{
    if (mongoServer == null)
        throw (new ArgumentNullException("mongoServer"));
    if (retries < 0)
        retries = 0;
    if (tryWait < TimeSpan.Zero)
        tryWait = TimeSpan.Zero;

    // Timeouts and broken reads always pause before the next attempt: a zero wait falls back
    // to the configured default.
    TimeSpan nonZeroWait = tryWait == TimeSpan.Zero ? RETRY_WAIT : tryWait;

    int actualTries = 0;
    Exception lastError = null;
    do
    {
        try
        {
            lastError = null;
            if (_server.State == MongoServerState.Disconnected)
            {
                // First attempt opens the connection; later attempts re-establish it.
                if (actualTries > 0)
                    _server.Reconnect();
                else
                    _server.Connect(); //ensure connection
            }
            mongoServer(_server); //do work...
            actualTries++;
            break; //on success
        }
        catch (System.Net.Sockets.SocketException ex)
        {
            lastError = ReConnect(ex, ref actualTries, tryWait);
        }
        catch (MongoSafeModeException ex)
        {
            if (!string.IsNullOrEmpty(ex.Message) && SAFE_CONN_ERR.IsMatch(ex.Message))
                lastError = ReConnect(ex, ref actualTries, tryWait);
            else //log & rethrow
            {
                _logger.Error("Server", ex);
                Disconnect(ex);
                throw;
            }
        }
        catch (TimeoutException ex)
        {
            lastError = ReConnect(ex, ref actualTries, nonZeroWait);
        }
        catch (MongoConnectionException ex)
        {
            lastError = ReConnect(ex, ref actualTries, tryWait);
        }
        catch (System.IO.IOException ex)
        {
            // Ordinal, case-insensitive search: the text being matched is a fixed English
            // message fragment, so the result must not depend on the current thread culture.
            if (ex.Message.IndexOf("unable to read data", StringComparison.OrdinalIgnoreCase) >= 0)
                lastError = ReConnect(ex, ref actualTries, nonZeroWait);
            else //log & rethrow
            {
                _logger.Error("Server", ex);
                Disconnect(ex);
                throw;
            }
        }
        catch (Exception ex) //general catch, log, & rethrow
        {
            _logger.Error("Server", ex);
            Disconnect(ex);
            throw;
        }
    } while (actualTries <= retries); //cut off

    // The loop ends either on success (lastError cleared) or after retries + 1 failed attempts.
    if (lastError != null && actualTries > retries)
    {
        lastError = new MongoAccessRetryException(actualTries, "Exceeded " + retries + " retries.", lastError);
        Disconnect(lastError);
        throw (lastError);
    }
    return actualTries;
}
// Drops the server connection when it is Connected or Connecting, then raises OnError with
// the supplied exception. The error is reported even when no disconnect was needed.
void Disconnect(Exception ex)
{
    //NOTE: REMOVE if there is a problem with errors & disconnection --Huy
    bool connectionIsLive = _server.State == MongoServerState.Connected
        || _server.State == MongoServerState.Connecting;
    if (connectionIsLive)
    {
        _logger.Error("Server is forcing a volumntary disconnect. This code might cause problem.");
        _server.Disconnect(); //force connection refresh...
    }

    if (OnError == null)
        return;
    OnError(this, ex);
}
// Bookkeeping for one failed attempt: bumps the try counter, logs the failure, sleeps for
// tryWait when the instance is not disposed and the wait is positive, raises OnWarning and
// hands the exception back to the caller. A disposed instance logs at error level and does not sleep.
Exception ReConnect(Exception e, ref int actualTries, TimeSpan tryWait)
{
    actualTries++;

    if (_disposed != 0)
    {
        _logger.ErrorFormat("ReConnect: {0}: {1}", actualTries, e);
    }
    else
    {
        _logger.WarnFormat("ReConnect: {0} in {1} s: {2}", actualTries, tryWait, e);
        if (tryWait > TimeSpan.Zero)
        {
            Thread.Sleep(tryWait);
        }
    }

    if (OnWarning != null)
    {
        OnWarning(this, e);
    }
    return e;
}
// Variant of the retry bookkeeping: the try counter is bumped only while the instance is not
// disposed, the sleep is unconditional in that case, and only the exception message is logged.
// Raises OnWarning and returns the exception either way.
// No call in this class uses this (Exception, TimeSpan, ref int) argument order; Server calls
// the other overload.
Exception ReConnect(Exception ex, TimeSpan timeout, ref int actualTries)
{
    if (_disposed != 0)
    {
        _logger.Error("ReConnect", ex);
    }
    else
    {
        actualTries++;
        _logger.WarnFormat("ReConnect: {0} in {1} s: {2}", actualTries, timeout, ex.Message);
        Thread.Sleep(timeout);
    }

    if (OnWarning != null)
    {
        OnWarning(this, ex);
    }
    return ex;
}
#endregion
#region databse command
/// <summary>
/// Execute an action of MongoDatabase object using provided database name.
/// sticky is false by default. retries and retryWait are configured by default (RETRIES and RETRY_WAIT).
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="mongoDB">Action of MongoDatabase</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the full overload invokes mongoDB with the database as an argument, so the
// Action type argument appears to be missing from the signature - confirm against the original.
public int Database(string database, Action mongoDB)
{
return Database(database, mongoDB, false, RETRIES, RETRY_WAIT);
}
/// <summary>
/// Execute an action of MongoDatabase object using provided database name.
/// retries and retryWait are configured by default (RETRIES and RETRY_WAIT).
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="mongoDB">Action of MongoDatabase</param>
/// <param name="sticky">boolean to force all operation to stay on a single connection</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the full overload invokes mongoDB with the database as an argument, so the
// Action type argument appears to be missing from the signature - confirm against the original.
public int Database(string database, Action mongoDB, bool sticky)
{
return Database(database, mongoDB, sticky, RETRIES, RETRY_WAIT);
}
/// <summary>
/// Execute an action of MongoDatabase object using provided database name.
/// The work runs inside Server, so connection-type failures are retried there. Fetching the
/// database handle is itself retried, sleeping tryWait between attempts, while the handle comes back null.
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="mongoDB">Action of MongoDatabase</param>
/// <param name="sticky">boolean to force all operation to stay on a single connection (the action runs inside db.RequestStart())</param>
/// <param name="retries">int of how many times to retry a failed query due to replication/connection failure</param>
/// <param name="tryWait">TimeSpan of how long to wait between each retry attempt</param>
/// <returns>Number of actual tries</returns>
/// <exception cref="ArgumentException">database is null or empty</exception>
/// <exception cref="ArgumentNullException">mongoDB is null</exception>
// NOTE(review): mongoDB is invoked with the database as an argument, so the Action type
// argument appears to be missing from the signature - confirm against the original.
public int Database(string database, Action mongoDB, bool sticky, int retries, TimeSpan tryWait)
{
    if (string.IsNullOrEmpty(database))
        throw (new ArgumentException("database name can not be null or empty", "database"));
    else if (mongoDB == null)
        throw (new ArgumentNullException("mongoDB"));
    return Server(ms =>
    {
        MongoDatabase db = null;
        int actualTries = 0;
        do
        {
            // Only reached with actualTries > 0 when the previous fetch returned null.
            if (actualTries > 0 && db == null && tryWait > TimeSpan.Zero)
            {
                if (_logger.IsWarnEnabled)
                    _logger.WarnFormat("Unable to fetch an instance of mongodb: {0} on try #{1}, sleeping for: {2}", database, actualTries, tryWait);
                Thread.Sleep(tryWait);
            }
            db = ms.GetDatabase(database);
            actualTries++;
            // Keep trying only while no handle was obtained. The previous condition (db != null)
            // stopped on the first null and re-fetched a valid handle retries more times.
        } while (db == null && actualTries <= retries);
        if (db == null)
            throw (new MongoConnectionException(string.Format("Unable to fetch an instance of mongodb: {0} after {1} retries.", database, retries)));
        if (sticky)
        {
            using (db.RequestStart())
            {
                mongoDB(db);
            }
        }
        else
            mongoDB(db);
    }, retries, tryWait);
}
#endregion
#region collection command
/// <summary>
/// Execute an action of MongoCollection of type BsonDocument using provided database and collection name.
/// This overload passes sticky as true. retries and retryWait are configured by default (RETRIES and RETRY_WAIT).
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="collection">string name of the mongo collection</param>
/// <param name="mongoCol">Action of MongoCollection</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the mongoCol parameter type reads "Action>" - its generic type arguments appear
// to have been lost; confirm against the original source.
public int Collection(string database, string collection, Action> mongoCol)
{
return Collection(database, collection, mongoCol, true, RETRIES, RETRY_WAIT);
}
/// <summary>
/// Execute an action of MongoCollection of type BsonDocument using provided database and collection name.
/// retries and retryWait are configured by default (RETRIES and RETRY_WAIT).
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="collection">string name of the mongo collection</param>
/// <param name="mongoCol">Action of MongoCollection</param>
/// <param name="sticky">boolean to force all operation to stay on a single connection</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the mongoCol parameter type reads "Action>" - its generic type arguments appear
// to have been lost; confirm against the original source.
public int Collection(string database, string collection, Action> mongoCol, bool sticky)
{
return Collection(database, collection, mongoCol, sticky, RETRIES, RETRY_WAIT);
}
/// <summary>
/// Execute an action of MongoCollection of type BsonDocument using provided database and collection name.
/// Forwards to the generic Collection overload with BsonDocument as the document type.
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="collection">string name of the mongo collection</param>
/// <param name="mongoCol">Action of MongoCollection</param>
/// <param name="sticky">boolean to force all operation to stay on a single connection</param>
/// <param name="retries">int of how many times to retry a failed query due to replication/connection failure</param>
/// <param name="tryWait">TimeSpan of how long to wait between each retry attempt</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the mongoCol parameter type reads "Action>" - its generic type arguments appear
// to have been lost; confirm against the original source.
public int Collection(string database, string collection, Action> mongoCol, bool sticky, int retries, TimeSpan tryWait)
{
    // The explicit type argument is required: without it the call binds back to this very
    // overload (same six arguments) and recurses without end.
    return Collection<BsonDocument>(database, collection, mongoCol, sticky, retries, tryWait);
}
/// <summary>
/// Execute a generic action of MongoCollection type T object using provided database and collection name.
/// sticky is false by default. retries and retryWait are configured by default (RETRIES and RETRY_WAIT).
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="collection">string name of the mongo collection</param>
/// <param name="mongoCol">Generic Action of MongoCollection type T</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the method's type parameter list and the generic arguments of "Action>" appear
// to have been lost from this signature; confirm against the original source.
public int Collection(string database, string collection, Action> mongoCol)
{
return Collection(database, collection, mongoCol, false, RETRIES, RETRY_WAIT);
}
/// <summary>
/// Execute a generic action of MongoCollection type T object using provided database and collection name.
/// retries and retryWait are configured by default (RETRIES and RETRY_WAIT).
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="collection">string name of the mongo collection</param>
/// <param name="mongoCol">Generic Action of MongoCollection type T</param>
/// <param name="sticky">boolean to force all operation to stay on a single connection</param>
/// <returns>Number of actual tries</returns>
// NOTE(review): the method's type parameter list and the generic arguments of "Action>" appear
// to have been lost from this signature; confirm against the original source.
public int Collection(string database, string collection, Action> mongoCol, bool sticky)
{
return Collection(database, collection, mongoCol, sticky, RETRIES, RETRY_WAIT);
}
/// <summary>
/// Execute a generic action of MongoCollection type T object using provided database and collection name.
/// The collection is fetched from the database handle supplied by Database and passed to mongoCol.
/// If the number of retries is exceeded, an error will be raised.
/// </summary>
/// <param name="database">string name of the mongo database</param>
/// <param name="collection">string name of the mongo collection</param>
/// <param name="mongoCol">Generic Action of MongoCollection type T</param>
/// <param name="sticky">boolean to force all operation to stay on a single connection; forwarded to Database</param>
/// <param name="retries">int of how many times to retry a failed query due to replication/connection failure</param>
/// <param name="tryWait">TimeSpan of how long to wait between each retry attempt</param>
/// <returns>Number of actual tries</returns>
/// <exception cref="ArgumentException">collection is null or empty</exception>
/// <exception cref="ArgumentNullException">mongoCol is null</exception>
// NOTE(review): the method's type parameter list and the generic arguments of "Action>" appear
// to have been lost from this signature; confirm against the original source.
public int Collection(string database, string collection, Action> mongoCol, bool sticky, int retries, TimeSpan tryWait)
{
if (string.IsNullOrEmpty(collection))
throw (new ArgumentException("collection name can not be null or empty", "collection"));
else if (mongoCol == null)
throw (new ArgumentNullException("mongoCol"));
return Database(database,
db =>
{
// RequestStart is entered here whatever the value of sticky; sticky only decides whether
// Database wraps this callback in a further RequestStart.
// NOTE(review): confirm whether sticky == false was meant to skip this scope.
using(db.RequestStart())
mongoCol(db.GetCollection(collection));
},
sticky, retries, tryWait);
}
#endregion
#region dispose method
// 0 while the instance is live, 1 once Dispose has run; flipped atomically in Dispose.
int _disposed = 0;

/// <summary>
/// Call to close server connection if it is open. Using statement is recommended instead of direct call.
/// Only the first call does any work; later calls return immediately.
/// </summary>
public void Dispose()
{
    bool alreadyDisposed = Interlocked.CompareExchange(ref _disposed, 1, 0) != 0;
    if (alreadyDisposed)
        return;

    if (_server == null)
        return;

    if (_server.State == MongoServerState.Connected || _server.State == MongoServerState.Connecting)
    {
        _server.Disconnect();
    }
}
#endregion
}
}