class Mongo::Cluster
Represents a group of servers on the server side, either as a single server, a replica set, or a single or multiple mongos.
@since 2.0.0
Copyright (C) 2018-2019 MongoDB, Inc.
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Constants
- CLUSTER_TIME
The cluster time key in responses from mongos servers.
@since 2.5.0
- IDLE_WRITE_PERIOD_SECONDS
How often an idle primary writes a no-op to the oplog.
@since 2.4.0
- MAX_READ_RETRIES
The default number of mongos read retries.
@since 2.1.1
- MAX_WRITE_RETRIES
The default number of mongos write retries.
@since 2.4.2
- READ_RETRY_INTERVAL
The default mongos read retry interval, in seconds.
@since 2.1.1
Attributes
@return [ Mongo::Server::AppMetadata ] The application metadata, used for connection
handshakes.
@since 2.4.0
@return [ BSON::Document ] The latest cluster time seen.
@since 2.5.0
@return [ Monitoring ] monitoring The monitoring.
@return [ Hash ] The options hash.
@api private
@return [ Array<String> ] The addresses of seed servers. Contains
addresses that were given to Cluster when it was instantiated, not current addresses that the cluster is using as a result of SDAM.
@since 2.7.0
@api private
@private
@since 2.5.1
@return [ Object ] The cluster topology.
Public Class Methods
Create a new cluster for the provided client and assign it to that client, for use when the client must not share its original cluster instance.
@api private
@example Create a cluster for the client.
Cluster.create(client)
@param [ Client ] client The client to create on.
@return [ Cluster ] The cluster.
@since 2.0.0
# File lib/mongo/cluster.rb, line 203
def self.create(client)
  cluster = Cluster.new(
    client.cluster.addresses.map(&:to_s),
    Monitoring.new,
    client.cluster_options,
  )
  client.instance_variable_set(:@cluster, cluster)
end
Finalize the cluster for garbage collection.
@example Finalize the cluster.
Cluster.finalize(pools, periodic_executor, session_pool)
@param [ Hash<Address, Server::ConnectionPool> ] pools Ignored.
@param [ PeriodicExecutor ] periodic_executor The periodic executor.
@param [ SessionPool ] session_pool The session pool.
@return [ Proc ] The Finalizer.
@since 2.2.0
# File lib/mongo/cluster.rb, line 359
def self.finalize(pools, periodic_executor, session_pool)
  proc do
    session_pool.end_sessions
    periodic_executor.stop!
  end
end
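The finalizer is built by a class method rather than inside the instance. In Ruby, a finalizer proc that references the object being finalized keeps that object alive, so building the proc from the collaborators alone avoids that. The following is a minimal sketch of the pattern, not the driver's code: StubExecutor, StubSessionPool and StubCluster are hypothetical stand-ins for the real classes.

```ruby
# Hypothetical stand-in for the driver's PeriodicExecutor.
class StubExecutor
  attr_reader :stopped

  def stop!
    @stopped = true
  end
end

# Hypothetical stand-in for the driver's SessionPool.
class StubSessionPool
  attr_reader :ended

  def end_sessions
    @ended = true
  end
end

# Hypothetical stand-in for Mongo::Cluster.
class StubCluster
  # Built at class level so the proc closes over the collaborators only,
  # never over the instance that is being finalized.
  def self.finalize(_pools, periodic_executor, session_pool)
    proc do
      session_pool.end_sessions
      periodic_executor.stop!
    end
  end

  def initialize(periodic_executor, session_pool)
    ObjectSpace.define_finalizer(
      self, self.class.finalize({}, periodic_executor, session_pool)
    )
  end
end
```

Calling the returned proc directly, as in StubCluster.finalize({}, executor, pool).call, runs the same cleanup that the garbage collector would trigger.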
Instantiate the new cluster.
@api private
@example Instantiate the cluster.
Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)
@note Cluster should never be directly instantiated outside of a Client.
@note When connecting to a mongodb+srv:// URI, the client expands such a
URI into a list of servers and passes that list to the Cluster constructor. When connecting to a standalone mongod, the Cluster constructor receives the corresponding address as an array of one string.
@param [ Array<String> ] seeds The addresses of the configured servers.
@param [ Monitoring ] monitoring The monitoring.
@param [ Hash ] options Options. Client constructor forwards its
options to Cluster constructor, although Cluster recognizes only a subset of the options recognized by Client.
@option options [ true, false ] :scan Whether to scan all seeds
in constructor. The default in driver version 2.x is to do so; driver version 3.x will not scan seeds in constructor. Opt in to the new behavior by setting this option to false. *Note:* setting this option to nil enables scanning seeds in constructor in driver version 2.x. Driver version 3.x will recognize this option but will ignore it and will never scan seeds in the constructor.
@option options [ true, false ] :monitoring_io For internal driver
use only. Set to false to prevent SDAM-related I/O from being done by this cluster or servers under it. Note: setting this option to false will make the cluster non-functional. It is intended for use in tests which manually invoke SDAM state transitions.
@since 2.0.0
# File lib/mongo/cluster.rb, line 90
def initialize(seeds, monitoring, options = Options::Redacted.new)
  if options[:monitoring_io] != false && !options[:server_selection_semaphore]
    raise ArgumentError, 'Need server selection semaphore'
  end
  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  @options = options.freeze
  @app_metadata = Server::AppMetadata.new(@options)
  @update_lock = Mutex.new
  @sdam_flow_lock = Mutex.new
  @cluster_time = nil
  @cluster_time_lock = Mutex.new
  @topology = Topology.initial(self, monitoring, options)
  Session::SessionPool.create(self)
  # The opening topology is always unknown with no servers.
  # https://github.com/mongodb/specifications/pull/388
  opening_topology = Topology::Unknown.new(options, monitoring, self)
  publish_sdam_event(
    Monitoring::TOPOLOGY_OPENING,
    Monitoring::Event::TopologyOpening.new(opening_topology)
  )
  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
  @seeds = seeds
  servers = seeds.map do |seed|
    # Server opening events must be sent after topology change events.
    # Therefore separate server addition, done here before topology change
    # event is published, from starting to monitor the server which is
    # done later.
    add(seed, monitor: false)
  end
  if seeds.size >= 1
    # Recreate the topology to get the current server list into it
    @topology = topology.class.new(topology.options, topology.monitoring, self)
    publish_sdam_event(
      Monitoring::TOPOLOGY_CHANGED,
      Monitoring::Event::TopologyChanged.new(opening_topology, @topology)
    )
  end
  servers.each do |server|
    server.start_monitoring
  end
  if options[:monitoring_io] == false
    # Omit periodic executor construction, because without servers
    # no commands can be sent to the cluster and there shouldn't ever
    # be anything that needs to be cleaned up.
    #
    # Also omit legacy single round of SDAM on the main thread,
    # as it would race with tests that mock SDAM responses.
    return
  end
  @cursor_reaper = CursorReaper.new
  @socket_reaper = SocketReaper.new(self)
  @periodic_executor = PeriodicExecutor.new(@cursor_reaper, @socket_reaper)
  @periodic_executor.run!
  ObjectSpace.define_finalizer(self, self.class.finalize({}, @periodic_executor, @session_pool))
  @connecting = false
  @connected = true
  if options[:scan] != false
    server_selection_timeout = options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT
    # The server selection timeout can be very short especially in
    # tests, when the client waits for a synchronous scan before
    # starting server selection. Limiting the scan to server selection time
    # then aborts the scan before it can process even local servers.
    # Therefore, allow at least 3 seconds for the scan here.
    if server_selection_timeout < 3
      server_selection_timeout = 3
    end
    start_time = Time.now
    deadline = start_time + server_selection_timeout
    # Wait for the first scan of each server to complete, for
    # backwards compatibility.
    # If any servers are discovered during this SDAM round we are going to
    # wait for these servers to also be queried, and so on, up to the
    # server selection timeout or the 3 second minimum.
    loop do
      servers = servers_list.dup
      if servers.all? { |server| server.description.last_update_time >= start_time }
        break
      end
      if (time_remaining = deadline - Time.now) <= 0
        break
      end
      options[:server_selection_semaphore].wait(time_remaining)
    end
  end
end
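The last part of the constructor waits for the initial scan using a deadline with a 3 second floor. The sketch below isolates that logic under some assumptions: initial_scan_budget and wait_for_initial_scan are hypothetical helper names, the block passed to wait_for_initial_scan stands in for reading each server's last_update_time, and sleep replaces the driver's semaphore wait. Unlike the constructor, the helper returns a boolean so the outcome is visible.

```ruby
# The 3 second floor applied to the server selection timeout.
MIN_INITIAL_SCAN_SECONDS = 3

# Hypothetical helper: the time budget for the initial scan.
def initial_scan_budget(server_selection_timeout)
  [server_selection_timeout, MIN_INITIAL_SCAN_SECONDS].max
end

# Hypothetical helper: polls until every update time returned by the block
# is at or after start_time, or until the budget is used up.
# Returns true if all servers reported in time, false on timeout.
def wait_for_initial_scan(start_time, budget, poll_interval = 0.01)
  deadline = start_time + budget
  loop do
    update_times = yield
    return true if update_times.all? { |time| time >= start_time }

    remaining = deadline - Time.now
    return false if remaining <= 0

    # Stand-in for options[:server_selection_semaphore].wait(time_remaining).
    sleep([poll_interval, remaining].min)
  end
end
```

Because the server list is re-read on every pass, servers discovered during the scan are also waited on, which matches the comment in the source.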
Public Instance Methods
Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.
@example Is the cluster equal to the object?
cluster == other
@param [ Object ] other The object to compare to.
@return [ true, false ] If the objects are equal.
@since 2.0.0
# File lib/mongo/cluster.rb, line 471
def ==(other)
  return false unless other.is_a?(Cluster)
  addresses == other.addresses &&
    options.merge(server_selection_semaphore: nil) ==
      other.options.merge(server_selection_semaphore: nil)
end
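The options comparison overwrites :server_selection_semaphore on both sides, so two clusters that differ only in their semaphore object still compare equal. A small sketch of that comparison on plain hashes, where same_cluster_options? is a hypothetical helper name:

```ruby
# Hypothetical helper: compares two option hashes while ignoring the
# per-cluster :server_selection_semaphore entry.
def same_cluster_options?(left, right)
  left.merge(server_selection_semaphore: nil) ==
    right.merge(server_selection_semaphore: nil)
end

first  = { max_read_retries: 2, server_selection_semaphore: Object.new }
second = { max_read_retries: 2, server_selection_semaphore: Object.new }
third  = { max_read_retries: 5, server_selection_semaphore: Object.new }

puts same_cluster_options?(first, second)
puts same_cluster_options?(first, third)
```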
Add a server to the cluster with the provided address. Useful in auto-discovery of new servers, when an existing server's ismaster response lists servers that were not in the original configuration.
@example Add the server for the address to the cluster.
cluster.add('127.0.0.1:27018')
@param [ String ] host The address of the server to add.
@option options [ Boolean ] :monitor For internal driver use only:
whether to monitor the newly added server.
@return [ Server, nil ] The newly added server, or nil if a server with this address is already present.
@since 2.0.0
# File lib/mongo/cluster.rb, line 575
def add(host, add_options=nil)
  address = Address.new(host, options)
  if !addresses.include?(address)
    server = Server.new(address, self, @monitoring, event_listeners, options.merge(
      monitor: false))
    @update_lock.synchronize { @servers.push(server) }
    if add_options.nil? || add_options[:monitor] != false
      server.start_monitoring
    end
    server
  end
end
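The add method skips addresses that are already known and appends new servers under a lock. Here is a minimal sketch of that shape using plain strings in place of Address and Server objects. TinyServerList is a hypothetical class, not part of the driver.

```ruby
# Hypothetical class illustrating add-if-absent with a mutex-guarded list.
class TinyServerList
  def initialize
    @servers = []
    @update_lock = Mutex.new
  end

  # Returns the added address, or nil when it is already present.
  def add(address)
    return nil if addresses.include?(address)

    @update_lock.synchronize { @servers.push(address) }
    address
  end

  # Returns a snapshot, so callers never iterate the live array.
  def addresses
    @update_lock.synchronize { @servers.dup }
  end
end

list = TinyServerList.new
list.add('127.0.0.1:27017')
list.add('127.0.0.1:27017') # ignored, already present
puts list.addresses.size
```

As in the source, the membership check and the push are separate steps, so this sketch does not make add atomic across threads. It only protects the array itself.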
The addresses in the cluster.
@example Get the addresses in the cluster.
cluster.addresses
@return [ Array<Mongo::Address> ] The addresses.
@since 2.0.6
# File lib/mongo/cluster.rb, line 306
def addresses
  servers_list.map(&:address).dup
end
Whether the cluster object is connected to its cluster.
@return [ true|false ] Whether the cluster is connected.
@api private
@since 2.7.0
# File lib/mongo/cluster.rb, line 281
def connected?
  !!@connected
end
Disconnect all servers.
@note Applications should call Mongo::Client#close to disconnect from the cluster rather than calling this method. This method is for internal driver use only.
@example Disconnect the cluster's servers.
cluster.disconnect!
@param [ Boolean ] wait Whether to wait for background threads to
finish running.
@return [ true ] Always true.
@since 2.1.0
# File lib/mongo/cluster.rb, line 381
def disconnect!(wait=false)
  unless @connecting || @connected
    return true
  end
  @periodic_executor.stop!
  @servers.each do |server|
    if server.connected?
      server.disconnect!(wait)
      publish_sdam_event(
        Monitoring::SERVER_CLOSED,
        Monitoring::Event::ServerClosed.new(server.address, topology)
      )
    end
  end
  publish_sdam_event(
    Monitoring::TOPOLOGY_CLOSED,
    Monitoring::Event::TopologyClosed.new(topology)
  )
  @connecting = @connected = false
  true
end
Determine if the cluster would select a readable server for the provided read preference.
@example Is a readable server present?
cluster.has_readable_server?(server_selector)
@param [ ServerSelector ] server_selector The server
selector.
@return [ true, false ] If a readable server is present.
@since 2.4.0
# File lib/mongo/cluster.rb, line 490
def has_readable_server?(server_selector = nil)
  topology.has_readable_server?(self, server_selector)
end
Determine if the cluster would select a writable server.
@example Is a writable server present?
cluster.has_writable_server?
@return [ true, false ] If a writable server is present.
@since 2.4.0
# File lib/mongo/cluster.rb, line 502
def has_writable_server?
  topology.has_writable_server?(self)
end
Get the nicer formatted string for use in inspection.
@example Inspect the cluster.
cluster.inspect
@return [ String ] The cluster inspection.
@since 2.0.0
# File lib/mongo/cluster.rb, line 328
def inspect
  "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.summary}>"
end
Get the maximum number of times the cluster can retry a read operation on a mongos.
@example Get the max read retries.
cluster.max_read_retries
@return [ Integer ] The maximum retries.
@since 2.1.1
# File lib/mongo/cluster.rb, line 258
def max_read_retries
  options[:max_read_retries] || MAX_READ_RETRIES
end
Get the next primary server we can send an operation to.
@example Get the next primary server.
cluster.next_primary
@param [ true, false ] ping Whether to ping the server before selection. Deprecated,
not necessary with the implementation of the Server Selection specification.
@return [ Mongo::Server ] A primary server.
@since 2.0.0
# File lib/mongo/cluster.rb, line 518
def next_primary(ping = true)
  @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY)
  @primary_selector.select_server(self)
end
Get the connection pool for the server.
@example Get the connection pool.
cluster.pool(server)
@param [ Server ] server The server.
@return [ Server::ConnectionPool ] The connection pool.
@since 2.2.0
@deprecated
# File lib/mongo/cluster.rb, line 534
def pool(server)
  server.pool
end
Get the interval, in seconds, in which a mongos read operation is retried.
@example Get the read retry interval.
cluster.read_retry_interval
@return [ Float ] The interval.
@since 2.1.1
# File lib/mongo/cluster.rb, line 271
def read_retry_interval
  options[:read_retry_interval] || READ_RETRY_INTERVAL
end
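Both max_read_retries and read_retry_interval read an option and fall back to a constant with ||. The sketch below shows how that fallback behaves on a plain hash. The default values and the helper names are illustrative assumptions, since the constants' actual values are not given in this page.

```ruby
# Illustrative defaults only; the driver's actual constant values may differ.
SKETCH_MAX_READ_RETRIES = 1
SKETCH_READ_RETRY_INTERVAL = 5

# Hypothetical helper mirroring the `options[...] || CONSTANT` pattern.
def sketch_max_read_retries(options)
  options[:max_read_retries] || SKETCH_MAX_READ_RETRIES
end

# Hypothetical helper mirroring the `options[...] || CONSTANT` pattern.
def sketch_read_retry_interval(options)
  options[:read_retry_interval] || SKETCH_READ_RETRY_INTERVAL
end

puts sketch_max_read_retries({})
puts sketch_max_read_retries(max_read_retries: 0)
puts sketch_read_retry_interval(read_retry_interval: 0.5)
```

Since 0 is truthy in Ruby, an explicit 0 is kept rather than replaced by the default. Only nil or false triggers the fallback.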
Reconnect all servers.
@example Reconnect the cluster's servers.
cluster.reconnect!
@return [ true ] Always true.
@since 2.1.0
@deprecated Use Mongo::Client#reconnect to reconnect to the cluster instead of
calling this method. This method does not send SDAM events.
# File lib/mongo/cluster.rb, line 413
def reconnect!
  @connecting = true
  scan!
  servers.each do |server|
    server.reconnect!
  end
  @periodic_executor.restart!
  @connecting = false
  @connected = true
end
Remove the server from the cluster for the provided address, if it exists.
@example Remove the server from the cluster.
cluster.remove('127.0.0.1:27017')
@param [ String ] host The host/port or socket address.
@return [ true|false ] Whether any servers were removed.
@since 2.0.0, return value added in 2.7.0
# File lib/mongo/cluster.rb, line 599
def remove(host)
  address = Address.new(host)
  removed_servers = @servers.select { |s| s.address == address }
  @update_lock.synchronize { @servers = @servers - removed_servers }
  removed_servers.each do |server|
    if server.connected?
      server.disconnect!
      publish_sdam_event(
        Monitoring::SERVER_CLOSED,
        Monitoring::Event::ServerClosed.new(address, topology)
      )
    end
  end
  removed_servers.any?
end
Force a scan of all known servers in the cluster.
If the sync parameter is true, which is the default, the scan is performed synchronously in the thread which called this method. Each server in the cluster is checked sequentially. If there are many servers in the cluster or they are slow to respond, this can be a long-running operation.
If the sync parameter is false, this method instructs all server monitor threads to perform an immediate scan and returns without waiting for scan results.
@note In both synchronous and asynchronous scans, each monitor
thread maintains a minimum interval between scans, meaning calling this method may not initiate a scan on a particular server the very next instant.
@example Force a full cluster scan.
cluster.scan!
@return [ true ] Always true.
@since 2.0.0
# File lib/mongo/cluster.rb, line 447
def scan!(sync=true)
  if sync
    servers_list.each do |server|
      server.scan!
    end
  else
    servers_list.each do |server|
      server.monitor.scan_semaphore.signal
    end
  end
  true
end
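The difference between the two modes is who does the work: a synchronous scan checks each server in the calling thread, while an asynchronous scan only wakes each server's monitor. The following sketch shows that dispatch with hypothetical stubs. StubMonitor#wake stands in for scan_semaphore.signal, and scan_all is not a driver method.

```ruby
# Hypothetical stand-in for a server monitor with a wake-up signal.
class StubMonitor
  attr_reader :wakeups

  def initialize
    @wakeups = 0
  end

  # Stand-in for monitor.scan_semaphore.signal.
  def wake
    @wakeups += 1
  end
end

# Hypothetical stand-in for Mongo::Server.
class StubServer
  attr_reader :monitor, :scans

  def initialize
    @monitor = StubMonitor.new
    @scans = 0
  end

  # Stand-in for a blocking check performed in the calling thread.
  def scan!
    @scans += 1
  end
end

# Hypothetical helper mirroring the sync/async branch in scan!.
def scan_all(servers, sync = true)
  if sync
    servers.each(&:scan!)
  else
    servers.each { |server| server.monitor.wake }
  end
  true
end
```

With the real driver, the asynchronous form corresponds to calling cluster.scan!(false).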
@api private
# File lib/mongo/cluster.rb, line 343
def server_selection_semaphore
  options[:server_selection_semaphore]
end
Get a list of server candidates from the cluster that can have operations executed on them.
@example Get the server candidates for an operation.
cluster.servers
@return [ Array<Server> ] The candidate servers.
@since 2.0.0
# File lib/mongo/cluster.rb, line 294
def servers
  topology.servers(servers_list.compact).compact
end
@api private
# File lib/mongo/cluster.rb, line 626
def servers_list
  @update_lock.synchronize { @servers.dup }
end
@note This method is experimental and subject to change.
@api experimental
@since 2.7.0
# File lib/mongo/cluster.rb, line 336
def summary
  "#<Cluster " +
  "topology=#{topology.summary} "+
  "servers=[#{servers_list.map(&:summary).join(',')}]>"
end
Update the max cluster time seen in a response.
@example Update the cluster time.
cluster.update_cluster_time(result)
@param [ Operation::Result ] result The operation result containing the cluster time.
@return [ Object ] The cluster time.
@since 2.5.0
# File lib/mongo/cluster.rb, line 548
def update_cluster_time(result)
  if cluster_time_doc = result.cluster_time
    @cluster_time_lock.synchronize do
      if @cluster_time.nil?
        @cluster_time = cluster_time_doc
      elsif cluster_time_doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME]
        @cluster_time = cluster_time_doc
      end
    end
  end
end
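The update keeps the stored cluster time monotonic: a new document replaces the stored one only if nothing is stored yet or its time is strictly greater. A minimal sketch of that rule follows, using integers in place of BSON timestamps. ClusterClock is a hypothetical class, and the 'clusterTime' key is an assumption about the value of CLUSTER_TIME.

```ruby
# Hypothetical class tracking the highest cluster time document seen.
class ClusterClock
  # Assumed key name; the value of CLUSTER_TIME is not shown in this page.
  KEY = 'clusterTime'

  attr_reader :cluster_time

  def initialize
    @cluster_time = nil
    @lock = Mutex.new
  end

  # Stores doc only if it is newer than the current one. Ignores nil.
  def advance(doc)
    return unless doc

    @lock.synchronize do
      if @cluster_time.nil? || doc[KEY] > @cluster_time[KEY]
        @cluster_time = doc
      end
    end
  end
end

clock = ClusterClock.new
clock.advance('clusterTime' => 5)
clock.advance('clusterTime' => 3) # older, ignored
puts clock.cluster_time['clusterTime']
```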
@api private
# File lib/mongo/cluster.rb, line 616
def update_topology(new_topology)
  old_topology = topology
  @topology = new_topology
  publish_sdam_event(
    Monitoring::TOPOLOGY_CHANGED,
    Monitoring::Event::TopologyChanged.new(old_topology, topology)
  )
end
Private Instance Methods
If options[:session] is set, validates that session and returns it. Otherwise, if the deployment supports sessions, creates a new session and returns it. The session is implicit unless options[:implicit] is given. If the deployment does not support sessions, returns nil.
@note This method will return nil if deployment has no data-bearing
servers at the time of the call.
# File lib/mongo/cluster.rb, line 642
def get_session(client, options = {})
  return options[:session].validate!(self) if options[:session]
  if sessions_supported?
    Session.new(@session_pool.checkout, client, { implicit: true }.merge(options))
  end
end
Returns whether the deployment (as this term is defined in the sessions spec) supports sessions.
@note If the cluster has no data bearing servers, for example because
the deployment is in the middle of a failover, this method returns false.
# File lib/mongo/cluster.rb, line 662
def sessions_supported?
  if topology.data_bearing_servers?
    return !!topology.logical_session_timeout
  end
  begin
    ServerSelector.get(mode: :primary_preferred).select_server(self)
    !!topology.logical_session_timeout
  rescue Error::NoServerAvailable
    false
  end
end
# File lib/mongo/cluster.rb, line 649
def with_session(client, options = {})
  session = get_session(client, options)
  yield(session)
ensure
  session.end_session if (session && session.implicit?)
end
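The ensure clause in with_session ends only implicit sessions, and it runs even when the block raises. Sessions supplied by the caller are left open, and a nil session (deployment without session support) is passed through untouched. A small sketch of that cleanup rule, with StubSession and with_stub_session as hypothetical names:

```ruby
# Hypothetical stand-in for Mongo::Session.
class StubSession
  attr_reader :ended

  def initialize(implicit)
    @implicit = implicit
    @ended = false
  end

  def implicit?
    @implicit
  end

  def end_session
    @ended = true
  end
end

# Hypothetical helper mirroring the ensure-based cleanup in with_session.
def with_stub_session(session)
  yield(session)
ensure
  session.end_session if session && session.implicit?
end

implicit = StubSession.new(true)
with_stub_session(implicit) { |s| s.implicit? }
puts implicit.ended
```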