using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Text; using System.Text.RegularExpressions; using System.Threading; using MongoDB.Bson; using MongoDB.Driver; using MongoDB.Driver.Builders; using Docstoc.Infrastructure; using log4net; namespace Docstoc.Harvest.DataAccess.Mongo { public class MongoAccessRetryException : Exception { public MongoAccessRetryException(int actualTries, string msg) : base(msg) { ActualTries = actualTries; } public MongoAccessRetryException(int actualTries, string msg, Exception inner) : base(msg, inner) { ActualTries = actualTries; } public MongoAccessRetryException(int actualTries) { ActualTries = actualTries; } /// /// int count of how many actual retries /// public int ActualTries { get; protected set; } } public sealed class MongoAccess : IDisposable { //1 time caches static readonly ILog _logger = LogManager.GetLogger(typeof(MongoAccess)); static readonly int RETRIES = 1; static readonly TimeSpan RETRY_WAIT = TimeSpan.FromMilliseconds(250); static readonly Regex SAFE_CONN_ERR; public event Action OnError; public event Action OnWarning; #region static ctor static MongoAccess() { try { RETRY_WAIT = ConfigurationManager.AppSettings.ExtractValue("MongoRetryWait", RETRY_WAIT); RETRIES = ConfigurationManager.AppSettings.ExtractValue("MongoRetries", RETRIES); SAFE_CONN_ERR = new Regex(@"not\s+master", RegexOptions.IgnoreCase | RegexOptions.Compiled); } catch (Exception ex) { _logger.Error("MongoAccess static CTOR", ex); throw; } } #endregion readonly MongoServer _server; //private use only #region instance ctor /// /// Constructor that take an appConfig entry key name, storing mongo url. /// /// appConfig entry key name that store mongo url public MongoAccess(string configKey) : this(ExtractMongoUrlFromConfig(configKey, null)) { } /// /// Constructor that take an appConfig entry key name, storing mongo url /// and an override boolean for slaveOK, meant to be used for readonly slaves in a replica set cluster /// /// appConfig entry key name that store mongo url /// public MongoAccess(string configKey, bool slaveOk) : this(ExtractMongoUrlFromConfig(configKey, slaveOk)) { } static MongoUrl ExtractMongoUrlFromConfig(string configKey, bool? slaveOk) { MongoUrl url = null; try { if (!string.IsNullOrEmpty(configKey)) { string urlk = ConfigurationManager.AppSettings[configKey]; if (string.IsNullOrEmpty(urlk)) throw (new ConfigurationErrorsException("configKey of value \"" + configKey + "\" does not exists!")); else if (slaveOk == null) url = new MongoUrl(urlk); else { var msb = new MongoUrlBuilder(urlk); if (slaveOk.Value) { msb.SlaveOk = true; msb.SafeMode = SafeMode.False; } if (!string.IsNullOrEmpty(msb.ReplicaSetName)) msb.ConnectionMode = ConnectionMode.ReplicaSet; else msb.ConnectionMode = ConnectionMode.Direct; url = msb.ToMongoUrl(); } } } catch(Exception ex) { _logger.Fatal("ExtractMongoUrlFromConfig", ex); throw; } return url; } /// /// Constructor that take MongoUrl object directly. /// /// MongoUrl of server path public MongoAccess(MongoUrl serverPath) { try { _server = serverPath == null ? MongoServer.Create() : MongoServer.Create(serverPath); } catch (Exception ex) { _logger.Error("MongoAccess CTOR", ex); if (OnError != null) OnError(this, ex); throw; } } #endregion #region core server command public void ReConnect() { //if (_server.State == MongoServerState.Disconnecting || _server.State == MongoServerState.Disconnected) if (_server.State == MongoServerState.Disconnected) _server.Reconnect(); } /// /// Execute an action of MongoServer object. /// retries and retryWait are configured by default. /// If the number of retires are exceeded, an error will be raised. /// /// Action of MongoServer /// Number of actual tries public int Server(Action mongoServer) { return Server(mongoServer, RETRIES, RETRY_WAIT); } /// /// Execute an action of MongoServer object. /// If the number of retires are exceeded, an error will be raised. /// /// Action of MongoServer /// how many times to retry a failed query due to replication/connection failure /// how long to wait between each retry attempt /// Number of actual tries public int Server(Action mongoServer, int retries, TimeSpan tryWait) { if (mongoServer == null) throw (new ArgumentNullException("mongoServer")); if (retries < 0) retries = 0; if (tryWait < TimeSpan.Zero) tryWait = TimeSpan.Zero; int actualTries = 0; Exception lastError = null; do { try { lastError = null; //if (_server.State == MongoServerState.Disconnecting || _server.State == MongoServerState.Disconnected || _server.State == MongoServerState.Unknown) if (_server.State == MongoServerState.Disconnected) { if (actualTries > 0) _server.Reconnect(); else _server.Connect(); //ensure connection } mongoServer(_server); //do work... actualTries++; break; //on success } catch (System.Net.Sockets.SocketException ex) { lastError = ReConnect(ex, ref actualTries, tryWait); } catch (MongoSafeModeException ex) { if (!string.IsNullOrEmpty(ex.Message) && SAFE_CONN_ERR.IsMatch(ex.Message)) lastError = ReConnect(ex, ref actualTries, tryWait); else //log & rethrow { _logger.Error("Server", ex); Disconnect(ex); throw; } } catch (TimeoutException ex) { lastError = ReConnect(ex, ref actualTries, tryWait == TimeSpan.Zero ? RETRY_WAIT : tryWait); } catch (MongoConnectionException ex) { lastError = ReConnect(ex, ref actualTries, tryWait); } catch (System.IO.IOException ex) { if(ex.Message.ToLower().Contains("unable to read data")) lastError = ReConnect(ex, ref actualTries, tryWait == TimeSpan.Zero ? RETRY_WAIT : tryWait); else //log & rethrow { _logger.Error("Server", ex); Disconnect(ex); throw; } } catch (Exception ex) //general catch, log, & rethrow { _logger.Error("Server", ex); Disconnect(ex); throw; } } while (actualTries <= retries); //cut off if (lastError != null && actualTries > retries) { lastError = new MongoAccessRetryException(actualTries, "Exceeded " + retries + " retries.", lastError); Disconnect(lastError); throw (lastError); } return actualTries; } void Disconnect(Exception ex) { //NOTE: REMOVE if there is a problem with errors & disconnection --Huy if (_server.State == MongoServerState.Connected || _server.State == MongoServerState.Connecting) { _logger.Error("Server is forcing a volumntary disconnect. This code might cause problem."); _server.Disconnect(); //force connection refresh... } if (OnError != null) OnError(this, ex); } Exception ReConnect(Exception e, ref int actualTries, TimeSpan tryWait) { actualTries++; if (_disposed == 0) { _logger.WarnFormat("ReConnect: {0} in {1} s: {2}", actualTries, tryWait, e); if (tryWait > TimeSpan.Zero) Thread.Sleep(tryWait); } else _logger.ErrorFormat("ReConnect: {0}: {1}", actualTries, e); if (OnWarning != null) OnWarning(this, e); return e; } //reconnect when a specific error has occured Exception ReConnect(Exception ex, TimeSpan timeout, ref int actualTries) { if (_disposed == 0) { actualTries++; _logger.WarnFormat("ReConnect: {0} in {1} s: {2}", actualTries, timeout, ex.Message); Thread.Sleep(timeout); } else _logger.Error("ReConnect", ex); if (OnWarning != null) OnWarning(this, ex); return ex; } #endregion #region databse command /// /// Execute an action of MongoDatabase object using provided database name. /// sticky is false by default. retries and retryWait are configured by default. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// Action of MongoDatabase /// Number of actual tries public int Database(string database, Action mongoDB) { return Database(database, mongoDB, false, RETRIES, RETRY_WAIT); } /// /// Execute an action of MongoDatabase object using provided database name. /// retries and retryWait are configured by default. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// Action of MongoDatabase /// boolean to force all operation to stay on a single connection /// Number of actual tries public int Database(string database, Action mongoDB, bool sticky) { return Database(database, mongoDB, sticky, RETRIES, RETRY_WAIT); } /// /// Execute an action of MongoDatabase object using provided database name. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// Action of MongoDatabase /// boolean to force all operation to stay on a single connection /// int of how many times to retry a failed query due to replication/connection failure /// TimeSpan of how long to wait between each retry attempt /// Number of actual tries public int Database(string database, Action mongoDB, bool sticky, int retries, TimeSpan tryWait) { if (string.IsNullOrEmpty(database)) throw (new ArgumentException("database name can not be null or empty", "database")); else if (mongoDB == null) throw (new ArgumentNullException("mongoDB")); return Server(ms => { MongoDatabase db = null; int actualTries = 0; do { if (actualTries > 0 && db == null && tryWait > TimeSpan.Zero) { if (_logger.IsWarnEnabled) _logger.WarnFormat("Unable to fetch an instance of mongodb: {0} on try #{1}, sleeping for: {2}", database, actualTries, tryWait); Thread.Sleep(tryWait); } db = ms.GetDatabase(database); actualTries++; } while (db != null && actualTries <= retries); if (db == null) throw (new MongoConnectionException(string.Format("Unable to fetch an instance of mongodb: {0} after {1} retries.", database, retries))); if (sticky) { using (db.RequestStart()) { mongoDB(db); } } else mongoDB(db); }, retries, tryWait); } #endregion #region collection command /// /// Execute an action of MongoCollection of type BsonDocument using provided database and collection name. /// sticky is false by default. retries and retryWait are configured by default. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// string name of the mongo collection /// Action of MongoCollection /// Number of actual tries public int Collection(string database, string collection, Action> mongoCol) { return Collection(database, collection, mongoCol, true, RETRIES, RETRY_WAIT); } /// /// Execute an action of MongoCollection of type BsonDocument using provided database and collection name. /// retries and retryWait are configured by default. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// string name of the mongo collection /// Action of MongoCollection /// boolean to force all operation to stay on a single connection /// Number of actual tries public int Collection(string database, string collection, Action> mongoCol, bool sticky) { return Collection(database, collection, mongoCol, sticky, RETRIES, RETRY_WAIT); } /// /// Execute an action of MongoCollection of type BsonDocument using provided database and collection name. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// string name of the mongo collection /// Action of MongoCollection /// boolean to force all operation to stay on a single connection /// int of how many times to retry a failed query due to replication/connection failure /// TimeSpan of how long to wait between each retry attempt /// Number of actual tries public int Collection(string database, string collection, Action> mongoCol, bool sticky, int retries, TimeSpan tryWait) { return Collection(database, collection, mongoCol, sticky, retries, tryWait); } /// /// Execute a generic action of MongoCollection type T object using provided database and collection name. /// sticky is false by default. retries and retryWait are configured by default. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// string name of the mongo collection /// Generic Action of MongoCollection type T /// Number of actual tries public int Collection(string database, string collection, Action> mongoCol) { return Collection(database, collection, mongoCol, false, RETRIES, RETRY_WAIT); } /// /// Execute a generic action of MongoCollection type T object using provided database and collection name. /// retries and retryWait are configured by default. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// string name of the mongo collection /// Generic Action of MongoCollection type T /// boolean to force all operation to stay on a single connection /// Number of actual tries public int Collection(string database, string collection, Action> mongoCol, bool sticky) { return Collection(database, collection, mongoCol, sticky, RETRIES, RETRY_WAIT); } /// /// Execute a generic action of MongoCollection type T object using provided database and collection name. /// If the number of retries are exceeded, an error will be raised. /// /// string name of the mongo database /// string name of the mongo collection /// Generic Action of MongoCollection type T /// boolean to force all operation to stay on a single connection /// int of how many times to retry a failed query due to replication/connection failure /// TimeSpan of how long to wait between each retry attempt /// Number of actual tries public int Collection(string database, string collection, Action> mongoCol, bool sticky, int retries, TimeSpan tryWait) { if (string.IsNullOrEmpty(collection)) throw (new ArgumentException("collection name can not be null or empty", "collection")); else if (mongoCol == null) throw (new ArgumentNullException("mongoCol")); return Database(database, db => { using(db.RequestStart()) mongoCol(db.GetCollection(collection)); }, sticky, retries, tryWait); } #endregion #region dispose method int _disposed = 0; /// /// Call to close server connection if it is open. Using statement is recommended instead of direct call. /// public void Dispose() { if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) { if (_server != null && (_server.State == MongoServerState.Connected || _server.State == MongoServerState.Connecting)) _server.Disconnect(); } } #endregion } }