使用 Ruby 从零构建基于 mTLS 的 Paxos 一致性模块


我们面临一个棘手的部署场景:一组内部 Ruby 服务需要共享一个极小但要求强一致性的状态,例如一个动态的特性开关配置或一个服务的领导者信息。整个集群部署在零信任网络环境中,任何节点间的通信都必须被视为不安全。引入 etcd 或 ZooKeeper 这样的重型协调服务,会给我们的纯 Ruby 技术栈带来额外的运维负担和技术异构性。我们需要一个轻量级、内嵌式、并且本质安全的解决方案。

最终的决定是:用 Ruby 从零开始构建一个精简的、基于 Paxos 算法的分布式一致性模块。而“本质安全”这一要求,意味着双向 TLS (mTLS) 必须成为其通信的基石,而不是一个事后添加的功能。这不仅仅是一个算法实现,更是一次关于如何在非理想环境中构建可信组件的工程实践。

第一阶段:构建不可伪造的身份与信道 (mTLS)

在讨论 Paxos 的任何细节之前,我们必须先解决节点间身份认证和通信加密的问题。如果一个恶意节点可以伪装成集群成员,或者窃听正在协商的提议,那么任何一致性算法都将毫无意义。mTLS 是解决这个问题的标准方案,它要求通信双方都出示并验证对方的证书。

第一步是建立我们自己的小型证书颁发机构 (CA)。在真实项目中,这通常由专门的 PKI 设施管理,但为了构建一个自包含的系统,我们用 Ruby 的 OpenSSL 库来手动完成这个过程。

# scripts/generate_certs.rb
require 'openssl'
require 'fileutils'

# 清理并创建目录
FileUtils.rm_rf('./certs')
FileUtils.mkdir_p('./certs/ca')
FileUtils.mkdir_p('./certs/nodes')

# 1. 生成 CA 根证书
ca_key = OpenSSL::PKey::RSA.new(2048)
ca_cert = OpenSSL::X509::Certificate.new
ca_cert.version = 2
ca_cert.serial = Time.now.to_i
ca_cert.subject = OpenSSL::X509::Name.parse('/CN=MyPaxosCA')
ca_cert.issuer = ca_cert.subject
ca_cert.public_key = ca_key.public_key
ca_cert.not_before = Time.now
ca_cert.not_after = ca_cert.not_before + 2 * 365 * 24 * 60 * 60 # 2 years
ca_cert.add_extension(OpenSSL::X509::Extension.new('subjectKeyIdentifier', 'hash'))
ca_cert.add_extension(OpenSSL::X509::Extension.new('basicConstraints', 'CA:TRUE', true))
ca_cert.sign(ca_key, OpenSSL::Digest::SHA256.new)

File.write('./certs/ca/ca_key.pem', ca_key.to_pem)
File.write('./certs/ca/ca_cert.pem', ca_cert.to_pem)

puts 'CA certificate and key generated.'

# 2. 为每个节点生成证书,并由 CA 签署
NODE_COUNT = 3
(1..NODE_COUNT).each do |i|
  node_key = OpenSSL::PKey::RSA.new(2048)
  node_cert_req = OpenSSL::X509::Request.new
  node_cert_req.version = 0
  # Common Name (CN) 是节点的唯一标识
  node_cert_req.subject = OpenSSL::X509::Name.parse("/CN=node#{i}")
  node_cert_req.public_key = node_key.public_key
  node_cert_req.sign(node_key, OpenSSL::Digest::SHA256.new)

  node_cert = OpenSSL::X509::Certificate.new
  node_cert.version = 2
  node_cert.serial = Time.now.to_i + i
  node_cert.subject = node_cert_req.subject
  node_cert.issuer = ca_cert.subject
  node_cert.public_key = node_key.public_key
  node_cert.not_before = Time.now
  node_cert.not_after = node_cert.not_before + 1 * 365 * 24 * 60 * 60 # 1 year
  
  # 关键:设置证书用途为客户端和服务器认证
  ext_factory = OpenSSL::X509::ExtensionFactory.new
  ext_factory.subject_certificate = node_cert
  ext_factory.issuer_certificate = ca_cert
  node_cert.add_extension(ext_factory.create_extension('subjectKeyIdentifier', 'hash'))
  node_cert.add_extension(ext_factory.create_extension('keyUsage', 'digitalSignature,keyEncipherment'))
  node_cert.add_extension(ext_factory.create_extension('extendedKeyUsage', 'serverAuth,clientAuth'))
  
  node_cert.sign(ca_key, OpenSSL::Digest::SHA256.new)

  node_dir = "./certs/nodes/node#{i}"
  FileUtils.mkdir_p(node_dir)
  File.write("#{node_dir}/key.pem", node_key.to_pem)
  File.write("#{node_dir}/cert.pem", node_cert.to_pem)
  
  puts "Generated certificate for node#{i}."
end

这个脚本是整个安全体系的基石。它创建了一个根 CA,并为 3 个节点分别签发了证书。每个节点的证书 CN (Common Name) 字段,如 node1,将成为它在集群中的唯一身份标识。

接下来是实现 mTLS 通信层。我们需要一个 TlsServer 来接受连接,和一个 TlsClient 来发起连接。这里的核心在于正确配置 OpenSSL::SSL::SSLContext

# lib/secure_transport.rb
require 'openssl'
require 'socket'
require 'json'

# 通信层的核心,建立并管理 mTLS 连接
module SecureTransport
  class TlsServer
    attr_reader :node_id

    def initialize(node_id, port, peers)
      @node_id = node_id
      @port = port
      @peers = peers # 用于验证客户端身份
      @server = nil
      @logger = Logger.new(STDOUT)
      @logger.formatter = proc { |severity, datetime, progname, msg| "[#{datetime.strftime('%Y-%m-%d %H:%M:%S')}] [#{@node_id}] [SERVER] #{msg}\n" }
    end

    def start(&block)
      setup_context
      tcp_server = TCPServer.new(@port)
      @server = OpenSSL::SSL::SSLServer.new(tcp_server, @ctx)
      @logger.info("Listening on port #{@port} with mTLS enabled.")
      
      loop do
        Thread.new(@server.accept) do |client_socket|
          begin
            client_cn = client_socket.peer_cert.subject.to_a.find { |name, _, _| name == 'CN' }[1]
            
            # 这里的坑在于:必须严格验证客户端的 CN 是否在已知的对等节点列表中
            unless @peers.keys.include?(client_cn)
              @logger.warn("Rejected connection from unknown CN: #{client_cn}")
              client_socket.close
              next
            end
            
            @logger.info("Accepted connection from #{client_cn}")
            yield client_socket, client_cn
          rescue OpenSSL::SSL::SSLError => e
            @logger.error("SSL Error accepting connection: #{e.message}")
          ensure
            client_socket.close if client_socket && !client_socket.closed?
          end
        end
      end
    end

    private

    def setup_context
      @ctx = OpenSSL::SSL::SSLContext.new
      # 服务端必须提供自己的证书和私钥
      @ctx.cert = OpenSSL::X509::Certificate.new(File.read("./certs/nodes/#{@node_id}/cert.pem"))
      @ctx.key = OpenSSL::PKey::RSA.new(File.read("./certs/nodes/#{@node_id}/key.pem"))
      
      # 关键:配置 mTLS
      # 1. 设置CA证书,用于验证客户端证书的合法性
      @ctx.ca_file = './certs/ca/ca_cert.pem'
      # 2. 设置验证模式为 VERIFY_PEER,并强制客户端必须提供证书
      @ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
    end
  end

  class TlsClient
    def self.request(node_id, peer_id, peer_address, message)
      ctx = OpenSSL::SSL::SSLContext.new
      ctx.cert = OpenSSL::X509::Certificate.new(File.read("./certs/nodes/#{node_id}/cert.pem"))
      ctx.key = OpenSSL::PKey::RSA.new(File.read("./certs/nodes/#{node_id}/key.pem"))
      ctx.ca_file = './certs/ca/ca_cert.pem'
      ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER

      host, port = peer_address.split(':')
      tcp_socket = TCPSocket.new(host, port.to_i)
      ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ctx)
      
      # 客户端也必须验证服务端的CN,防止中间人攻击
      ssl_socket.hostname = peer_id 
      
      begin
        ssl_socket.connect
        ssl_socket.puts(message.to_json)
        response_json = ssl_socket.gets
        JSON.parse(response_json) if response_json
      rescue OpenSSL::SSL::SSLError => e
        Logger.new(STDOUT).error("[#{node_id}] [CLIENT] SSL Error connecting to #{peer_id}: #{e.message}")
        nil
      ensure
        ssl_socket.close if ssl_socket && !ssl_socket.closed?
      end
    end
  end
end

现在我们有了一个安全的通信层。任何节点间的通信都经过了加密和双向身份验证。这是进入 Paxos 协议实现的前提。

第二阶段:实现 Paxos 核心逻辑

Paxos 算法的核心是两阶段提交(Prepare-Promise, Propose-Accepted),以确保在可能出现网络分区和节点故障的情况下,所有节点最终对一个值达成共识。在一个 Multi-Paxos 系统中,我们实际上是在为一系列日志条目(instance)分别运行 Paxos 算法。

我们的节点将同时扮演 Proposer, Acceptor, 和 Learner 三个角色。

# lib/paxos_node.rb
require 'thread'
require 'logger'
require_relative 'secure_transport'

class PaxosNode
  include SecureTransport

  # Paxos 消息结构
  Message = Struct.new(:type, :instance, :proposal_number, :proposal_value, :from)

  def initialize(node_id, port, peers)
    @node_id = node_id
    @port = port
    @peers = peers # e.g., { 'node2' => 'localhost:8002', 'node3' => 'localhost:8003' }
    @quorum_size = (@peers.size + 1) / 2 + 1

    # Acceptor 状态 (持久化)
    # key: instance_id, value: highest proposal number promised
    @highest_promises = {} 
    # key: instance_id, value: { proposal_number: n, value: v }
    @accepted_values = {}
    @state_mutex = Mutex.new

    # Proposer 状态
    @proposal_number = 0
    @proposal_mutex = Mutex.new
    
    # Replicated Log (Learner 状态)
    @log = {}
    @log_mutex = Mutex.new
    @highest_log_instance = -1
    
    @logger = Logger.new(STDOUT)
    @logger.formatter = proc { |severity, datetime, progname, msg| "[#{datetime.strftime('%Y-%m-%d %H:%M:%S')}] [#{@node_id}] #{msg}\n" }

    @server = TlsServer.new(@node_id, @port, @peers.merge({@node_id => "localhost:#{@port}"}))
  end

  def start
    @logger.info("Node starting up...")
    # 在后台线程中启动 mTLS 服务器来接收来自其他节点的消息
    Thread.new do
      @server.start do |socket, client_cn|
        handle_connection(socket)
      end
    end
  end
  
  # 对外暴露的接口,用于提交一个新的值
  def propose(value)
    instance_to_propose = @log_mutex.synchronize { @highest_log_instance + 1 }
    
    @proposal_mutex.synchronize do
      # 保证 proposal number 的唯一性和单调递增
      # 格式: [timestamp, node_id_integer]
      @proposal_number = [Time.now.to_f, @node_id.gsub('node','').to_i]
    end
    
    # --- Phase 1a: Proposer sends Prepare ---
    prepare_message = Message.new(:prepare, instance_to_propose, @proposal_number, nil, @node_id).to_h
    @logger.info("Proposing value '#{value}' for instance #{instance_to_propose} with proposal N=#{@proposal_number.inspect}")
    
    promises = broadcast(prepare_message)
    
    # --- Phase 1b: Proposer waits for a quorum of Promises ---
    if promises.size < @quorum_size
      @logger.warn("Failed to get a quorum for Prepare phase. Got #{promises.size}, need #{@quorum_size}.")
      return false
    end
    
    @logger.info("Got quorum of promises (#{promises.size})")

    # 检查收到的 promises。如果任何一个 promise 包含了之前已接受的值,
    # 必须选择 proposal number 最高的那个值作为本次提议的值。
    # 这是 Paxos 算法保证安全性的关键。
    highest_accepted_promise = promises.compact.select { |p| p['accepted_value'] }.max_by { |p| p['proposal_number'] }

    value_to_propose = if highest_accepted_promise
      @logger.info("Found a previously accepted value from promises. Adopting it.")
      highest_accepted_promise['accepted_value']
    else
      value
    end

    # --- Phase 2a: Proposer sends Propose (Accept Request) ---
    propose_message = Message.new(:propose, instance_to_propose, @proposal_number, value_to_propose, @node_id).to_h
    acceptances = broadcast(propose_message)
    
    # --- Phase 2b: Proposer waits for a quorum of Accepted ---
    if acceptances.size < @quorum_size
      @logger.warn("Failed to get a quorum for Propose phase. Got #{acceptances.size}, need #{@quorum_size}.")
      return false
    end
    
    @logger.info("Value '#{value_to_propose}' accepted by a quorum for instance #{instance_to_propose}. Consensus reached.")
    
    # --- Learning Phase ---
    # 广播给所有节点(包括自己),告知最终结果
    learn_message = Message.new(:learn, instance_to_propose, nil, value_to_propose, @node_id).to_h
    broadcast(learn_message)
    learn(instance_to_propose, value_to_propose) # 自己也学习
    
    true
  end
  
  def get_log
    @log_mutex.synchronize { @log.dup }
  end

  private
  
  def handle_connection(socket)
    raw_message = socket.gets
    return unless raw_message
    
    message = JSON.parse(raw_message)
    response = case message['type'].to_sym
               when :prepare
                 handle_prepare(message)
               when :propose
                 handle_propose(message)
               when :learn
                 learn(message['instance'], message['proposal_value'])
                 { status: 'learned' }
               else
                 { error: 'unknown message type' }
               end
    socket.puts(response.to_json)
  end

  # --- Acceptor Logic ---
  def handle_prepare(message)
    @state_mutex.synchronize do
      instance = message['instance']
      proposal_number = message['proposal_number']
      
      @highest_promises[instance] ||= [-1.0, -1] # Initialize with a very small number

      if proposal_number > @highest_promises[instance]
        @logger.info("Promise for instance #{instance}, N=#{proposal_number.inspect}")
        @highest_promises[instance] = proposal_number
        # 持久化状态到磁盘(此处省略,但在生产系统中是必须的)
        
        # 返回 Promise,如果之前有接受过值,需要一并返回
        return { 
          status: 'promise', 
          proposal_number: @accepted_values[instance] ? @accepted_values[instance][:proposal_number] : nil,
          accepted_value: @accepted_values[instance] ? @accepted_values[instance][:value] : nil
        }
      else
        @logger.warn("Reject Prepare for instance #{instance}, N=#{proposal_number.inspect} is not higher than #{@highest_promises[instance].inspect}")
        return { status: 'rejected' }
      end
    end
  end
  
  def handle_propose(message)
    @state_mutex.synchronize do
      instance = message['instance']
      proposal_number = message['proposal_number']
      value = message['proposal_value']
      
      # 只有当提议号大于或等于我们承诺过的最高提议号时,才接受
      if proposal_number >= @highest_promises[instance]
        @logger.info("Accepted value '#{value}' for instance #{instance}, N=#{proposal_number.inspect}")
        @accepted_values[instance] = { proposal_number: proposal_number, value: value }
        # 持久化状态到磁盘
        return { status: 'accepted' }
      else
        @logger.warn("Reject Propose for instance #{instance}, N=#{proposal_number.inspect} is not >= #{@highest_promises[instance].inspect}")
        return { status: 'rejected' }
      end
    end
  end

  # --- Learner Logic ---
  def learn(instance, value)
    @log_mutex.synchronize do
      return if @log.key?(instance) # 避免重复学习
      @log[instance] = value
      @highest_log_instance = [@highest_log_instance, instance].max
      @logger.info("Learned value for instance #{instance}: '#{value}'")
      puts "Current Log: #{@log.sort.to_h}"
    end
  end
  
  # --- Network Communication ---
  def broadcast(message)
    responses = []
    threads = @peers.map do |peer_id, peer_address|
      Thread.new do
        response = TlsClient.request(@node_id, peer_id, peer_address, message)
        # 只收集成功的响应
        responses << response if response && (response['status'] == 'promise' || response['status'] == 'accepted')
      end
    end
    threads.each(&:join)
    
    # 也要处理自己作为 Acceptor 的情况
    self_response = case message[:type]
                    when :prepare then handle_prepare(message.transform_keys(&:to_s))
                    when :propose then handle_propose(message.transform_keys(&:to_s))
                    else nil
                    end
    responses << self_response if self_response && (self_response[:status] == 'promise' || self_response[:status] == 'accepted')

    responses.compact
  end
end

这段代码是整个系统的核心。它将 mTLS 通信与 Paxos 的状态机逻辑结合在一起。一个常见的错误是在实现 handle_preparehandle_propose 时,没有使用互斥锁(Mutex)来保护共享状态,这在高并发下会导致状态错乱,破坏一致性。另一个关键点是在广播后,Proposer 必须将自己的响应也计算在内,因为它自己也是一个 Acceptor。

下面是 Paxos 完整协商过程的流程图:

sequenceDiagram
    participant Proposer
    participant A1 as Acceptor 1
    participant A2 as Acceptor 2
    participant A3 as Acceptor 3

    Proposer->>+A1: Prepare(N=1)
    Proposer->>+A2: Prepare(N=1)
    Proposer->>+A3: Prepare(N=1)

    A1-->>-Proposer: Promise(N=1, prev_accepted=nil)
    A2-->>-Proposer: Promise(N=1, prev_accepted=nil)
    Note right of Proposer: Quorum (2/3) reached!

    A3-->>-Proposer: Promise(N=1, prev_accepted=nil)

    Proposer->>+A1: Propose(N=1, V='config_A')
    Proposer->>+A2: Propose(N=1, V='config_A')
    Proposer->>+A3: Propose(N=1, V='config_A')

    A1-->>-Proposer: Accepted(N=1, V='config_A')
    A2-->>-Proposer: Accepted(N=1, V='config_A')
    Note right of Proposer: Quorum (2/3) reached! Consensus achieved.

    A3-->>-Proposer: Accepted(N=1, V='config_A')
    
    Proposer->>A1: Learn(V='config_A')
    Proposer->>A2: Learn(V='config_A')
    Proposer->>A3: Learn(V='config_A')

第三阶段:组装并运行集群

现在,我们可以把所有部分组装起来。下面的启动脚本会根据传入的参数启动一个节点。

# run_node.rb
require_relative 'lib/paxos_node'

unless ARGV.length == 2
  puts "Usage: ruby run_node.rb <node_id> <port>"
  puts "Example: ruby run_node.rb node1 8001"
  exit 1
end

NODE_ID = ARGV[0]
PORT = ARGV[1].to_i

# 在真实项目中,这个配置应该来自配置文件或服务发现
PEERS = {
  'node1' => 'localhost:8001',
  'node2' => 'localhost:8002',
  'node3' => 'localhost:8003'
}

# 从集群中移除自己
peers_without_self = PEERS.reject { |id, _| id == NODE_ID }

node = PaxosNode.new(NODE_ID, PORT, peers_without_self)
node.start

# 交互式命令行,用于测试
puts "Node #{NODE_ID} is running. Type 'propose <value>' or 'log' or 'exit'."
loop do
  print "> "
  input = gets.chomp.split(' ', 2)
  command = input[0]
  
  case command
  when 'propose'
    if input[1]
      # 在一个单独的线程中发起提议,避免阻塞主线程
      Thread.new { node.propose(input[1]) }
    else
      puts "Usage: propose <value>"
    end
  when 'log'
    puts node.get_log.sort.to_h.inspect
  when 'exit'
    exit 0
  else
    puts "Unknown command: #{command}" unless command.empty?
  end
end

要运行这个集群,首先确保已生成证书 (ruby scripts/generate_certs.rb)。然后打开三个终端窗口,分别启动三个节点:

# Terminal 1
ruby run_node.rb node1 8001

# Terminal 2
ruby run_node.rb node2 8002

# Terminal 3
ruby run_node.rb node3 8003

启动后,在 node1 的终端里输入 propose apples。你会看到 node1 的日志输出了 Paxos 的各个阶段,最终达成共识。然后,在 node2node3 的终端里输入 log,你会看到它们也已经学习到了这个值:{0=>"apples"}。接着,在 node2 终端里输入 propose oranges,所有节点上的日志都会更新为 {0=>"apples", 1=>"oranges"}。这证明了我们的安全复制日志已经工作。

局限性与未来迭代方向

这个从零构建的模块验证了核心设计,但在投入生产前,还有几个重要的工程问题需要解决。

首先,领导者选举。目前的实现中,任何节点都可以发起提议,这在多个节点同时提议时可能导致活锁(livelock),即提议号不断升高但没有提议被接受。工业级的 Paxos 实现(如 Raft)通常会选举一个稳定的领导者,只有领导者可以发起提议,这大大简化了流程并提高了效率。我们的系统可以通过在 Paxos 日志本身之上实现一个租约机制来选举领导者。

其次,状态持久化@highest_promises@accepted_values 是 Acceptor 的关键状态,必须被持久化到磁盘。如果节点崩溃重启,它必须能恢复这些状态,否则可能破坏之前已达成的共识,违反 Paxos 的安全性保证。

第三,日志压缩。随着系统运行,日志会无限增长。必须实现快照(snapshotting)机制,定期将日志应用到某个状态机,然后将旧日志截断,以释放空间。

最后,集群成员变更。当前的节点列表是静态的。在真实环境中,我们需要动态地增加或移除节点。这是一个复杂的分布式一致性问题,通常也需要通过在 Paxos 日志中提交成员变更配置来实现。

尽管存在这些局限,但这个实践过程本身极具价值。它不仅深入到了 Paxos 算法的实现细节,更重要的是,它将安全通信(mTLS)作为系统设计的一等公民,构建了一个在不可信网络中具备基本生存能力的一致性内核。对于需要轻量级、嵌入式、强一致性且技术栈统一的场景,这条路径提供了一个可行的起点。


  目录