我们面临一个棘手的部署场景:一组内部 Ruby 服务需要共享一个极小但要求强一致性的状态,例如一个动态的特性开关配置或一个服务的领导者信息。整个集群部署在零信任网络环境中,任何节点间的通信都必须被视为不安全。引入 etcd 或 ZooKeeper 这样的重型协调服务,会给我们的纯 Ruby 技术栈带来额外的运维负担和技术异构性。我们需要一个轻量级、内嵌式、并且本质安全的解决方案。
最终的决定是:用 Ruby 从零开始构建一个精简的、基于 Paxos 算法的分布式一致性模块。而“本质安全”这一要求,意味着双向 TLS (mTLS) 必须成为其通信的基石,而不是一个事后添加的功能。这不仅仅是一个算法实现,更是一次关于如何在非理想环境中构建可信组件的工程实践。
第一阶段:构建不可伪造的身份与信道 (mTLS)
在讨论 Paxos 的任何细节之前,我们必须先解决节点间身份认证和通信加密的问题。如果一个恶意节点可以伪装成集群成员,或者窃听正在协商的提议,那么任何一致性算法都将毫无意义。mTLS 是解决这个问题的标准方案,它要求通信双方都出示并验证对方的证书。
第一步是建立我们自己的小型证书颁发机构 (CA)。在真实项目中,这通常由专门的 PKI 设施管理,但为了构建一个自包含的系统,我们用 Ruby 的 OpenSSL
库来手动完成这个过程。
# scripts/generate_certs.rb
require 'openssl'
require 'fileutils'
# 清理并创建目录
FileUtils.rm_rf('./certs')
FileUtils.mkdir_p('./certs/ca')
FileUtils.mkdir_p('./certs/nodes')
# 1. 生成 CA 根证书
ca_key = OpenSSL::PKey::RSA.new(2048)
ca_cert = OpenSSL::X509::Certificate.new
ca_cert.version = 2
ca_cert.serial = Time.now.to_i
ca_cert.subject = OpenSSL::X509::Name.parse('/CN=MyPaxosCA')
ca_cert.issuer = ca_cert.subject
ca_cert.public_key = ca_key.public_key
ca_cert.not_before = Time.now
ca_cert.not_after = ca_cert.not_before + 2 * 365 * 24 * 60 * 60 # 2 years
ca_cert.add_extension(OpenSSL::X509::Extension.new('subjectKeyIdentifier', 'hash'))
ca_cert.add_extension(OpenSSL::X509::Extension.new('basicConstraints', 'CA:TRUE', true))
ca_cert.sign(ca_key, OpenSSL::Digest::SHA256.new)
File.write('./certs/ca/ca_key.pem', ca_key.to_pem)
File.write('./certs/ca/ca_cert.pem', ca_cert.to_pem)
puts 'CA certificate and key generated.'
# 2. 为每个节点生成证书,并由 CA 签署
NODE_COUNT = 3
(1..NODE_COUNT).each do |i|
node_key = OpenSSL::PKey::RSA.new(2048)
node_cert_req = OpenSSL::X509::Request.new
node_cert_req.version = 0
# Common Name (CN) 是节点的唯一标识
node_cert_req.subject = OpenSSL::X509::Name.parse("/CN=node#{i}")
node_cert_req.public_key = node_key.public_key
node_cert_req.sign(node_key, OpenSSL::Digest::SHA256.new)
node_cert = OpenSSL::X509::Certificate.new
node_cert.version = 2
node_cert.serial = Time.now.to_i + i
node_cert.subject = node_cert_req.subject
node_cert.issuer = ca_cert.subject
node_cert.public_key = node_key.public_key
node_cert.not_before = Time.now
node_cert.not_after = node_cert.not_before + 1 * 365 * 24 * 60 * 60 # 1 year
# 关键:设置证书用途为客户端和服务器认证
ext_factory = OpenSSL::X509::ExtensionFactory.new
ext_factory.subject_certificate = node_cert
ext_factory.issuer_certificate = ca_cert
node_cert.add_extension(ext_factory.create_extension('subjectKeyIdentifier', 'hash'))
node_cert.add_extension(ext_factory.create_extension('keyUsage', 'digitalSignature,keyEncipherment'))
node_cert.add_extension(ext_factory.create_extension('extendedKeyUsage', 'serverAuth,clientAuth'))
node_cert.sign(ca_key, OpenSSL::Digest::SHA256.new)
node_dir = "./certs/nodes/node#{i}"
FileUtils.mkdir_p(node_dir)
File.write("#{node_dir}/key.pem", node_key.to_pem)
File.write("#{node_dir}/cert.pem", node_cert.to_pem)
puts "Generated certificate for node#{i}."
end
这个脚本是整个安全体系的基石。它创建了一个根 CA,并为 3 个节点分别签发了证书。每个节点的证书 CN
(Common Name) 字段,如 node1
,将成为它在集群中的唯一身份标识。
接下来是实现 mTLS 通信层。我们需要一个 TlsServer
来接受连接,和一个 TlsClient
来发起连接。这里的核心在于正确配置 OpenSSL::SSL::SSLContext
。
# lib/secure_transport.rb
require 'openssl'
require 'socket'
require 'json'
# 通信层的核心,建立并管理 mTLS 连接
module SecureTransport
class TlsServer
attr_reader :node_id
def initialize(node_id, port, peers)
@node_id = node_id
@port = port
@peers = peers # 用于验证客户端身份
@server = nil
@logger = Logger.new(STDOUT)
@logger.formatter = proc { |severity, datetime, progname, msg| "[#{datetime.strftime('%Y-%m-%d %H:%M:%S')}] [#{@node_id}] [SERVER] #{msg}\n" }
end
def start(&block)
setup_context
tcp_server = TCPServer.new(@port)
@server = OpenSSL::SSL::SSLServer.new(tcp_server, @ctx)
@logger.info("Listening on port #{@port} with mTLS enabled.")
loop do
Thread.new(@server.accept) do |client_socket|
begin
client_cn = client_socket.peer_cert.subject.to_a.find { |name, _, _| name == 'CN' }[1]
# 这里的坑在于:必须严格验证客户端的 CN 是否在已知的对等节点列表中
unless @peers.keys.include?(client_cn)
@logger.warn("Rejected connection from unknown CN: #{client_cn}")
client_socket.close
next
end
@logger.info("Accepted connection from #{client_cn}")
yield client_socket, client_cn
rescue OpenSSL::SSL::SSLError => e
@logger.error("SSL Error accepting connection: #{e.message}")
ensure
client_socket.close if client_socket && !client_socket.closed?
end
end
end
end
private
def setup_context
@ctx = OpenSSL::SSL::SSLContext.new
# 服务端必须提供自己的证书和私钥
@ctx.cert = OpenSSL::X509::Certificate.new(File.read("./certs/nodes/#{@node_id}/cert.pem"))
@ctx.key = OpenSSL::PKey::RSA.new(File.read("./certs/nodes/#{@node_id}/key.pem"))
# 关键:配置 mTLS
# 1. 设置CA证书,用于验证客户端证书的合法性
@ctx.ca_file = './certs/ca/ca_cert.pem'
# 2. 设置验证模式为 VERIFY_PEER,并强制客户端必须提供证书
@ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
end
class TlsClient
def self.request(node_id, peer_id, peer_address, message)
ctx = OpenSSL::SSL::SSLContext.new
ctx.cert = OpenSSL::X509::Certificate.new(File.read("./certs/nodes/#{node_id}/cert.pem"))
ctx.key = OpenSSL::PKey::RSA.new(File.read("./certs/nodes/#{node_id}/key.pem"))
ctx.ca_file = './certs/ca/ca_cert.pem'
ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
host, port = peer_address.split(':')
tcp_socket = TCPSocket.new(host, port.to_i)
ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ctx)
# 客户端也必须验证服务端的CN,防止中间人攻击
ssl_socket.hostname = peer_id
begin
ssl_socket.connect
ssl_socket.puts(message.to_json)
response_json = ssl_socket.gets
JSON.parse(response_json) if response_json
rescue OpenSSL::SSL::SSLError => e
Logger.new(STDOUT).error("[#{node_id}] [CLIENT] SSL Error connecting to #{peer_id}: #{e.message}")
nil
ensure
ssl_socket.close if ssl_socket && !ssl_socket.closed?
end
end
end
end
现在我们有了一个安全的通信层。任何节点间的通信都经过了加密和双向身份验证。这是进入 Paxos 协议实现的前提。
第二阶段:实现 Paxos 核心逻辑
Paxos 算法的核心是两阶段提交(Prepare-Promise, Propose-Accepted),以确保在可能出现网络分区和节点故障的情况下,所有节点最终对一个值达成共识。在一个 Multi-Paxos 系统中,我们实际上是在为一系列日志条目(instance)分别运行 Paxos 算法。
我们的节点将同时扮演 Proposer, Acceptor, 和 Learner 三个角色。
# lib/paxos_node.rb
require 'thread'
require 'logger'
require_relative 'secure_transport'
class PaxosNode
include SecureTransport
# Paxos 消息结构
Message = Struct.new(:type, :instance, :proposal_number, :proposal_value, :from)
def initialize(node_id, port, peers)
@node_id = node_id
@port = port
@peers = peers # e.g., { 'node2' => 'localhost:8002', 'node3' => 'localhost:8003' }
@quorum_size = (@peers.size + 1) / 2 + 1
# Acceptor 状态 (持久化)
# key: instance_id, value: highest proposal number promised
@highest_promises = {}
# key: instance_id, value: { proposal_number: n, value: v }
@accepted_values = {}
@state_mutex = Mutex.new
# Proposer 状态
@proposal_number = 0
@proposal_mutex = Mutex.new
# Replicated Log (Learner 状态)
@log = {}
@log_mutex = Mutex.new
@highest_log_instance = -1
@logger = Logger.new(STDOUT)
@logger.formatter = proc { |severity, datetime, progname, msg| "[#{datetime.strftime('%Y-%m-%d %H:%M:%S')}] [#{@node_id}] #{msg}\n" }
@server = TlsServer.new(@node_id, @port, @peers.merge({@node_id => "localhost:#{@port}"}))
end
def start
@logger.info("Node starting up...")
# 在后台线程中启动 mTLS 服务器来接收来自其他节点的消息
Thread.new do
@server.start do |socket, client_cn|
handle_connection(socket)
end
end
end
# 对外暴露的接口,用于提交一个新的值
def propose(value)
instance_to_propose = @log_mutex.synchronize { @highest_log_instance + 1 }
@proposal_mutex.synchronize do
# 保证 proposal number 的唯一性和单调递增
# 格式: [timestamp, node_id_integer]
@proposal_number = [Time.now.to_f, @node_id.gsub('node','').to_i]
end
# --- Phase 1a: Proposer sends Prepare ---
prepare_message = Message.new(:prepare, instance_to_propose, @proposal_number, nil, @node_id).to_h
@logger.info("Proposing value '#{value}' for instance #{instance_to_propose} with proposal N=#{@proposal_number.inspect}")
promises = broadcast(prepare_message)
# --- Phase 1b: Proposer waits for a quorum of Promises ---
if promises.size < @quorum_size
@logger.warn("Failed to get a quorum for Prepare phase. Got #{promises.size}, need #{@quorum_size}.")
return false
end
@logger.info("Got quorum of promises (#{promises.size})")
# 检查收到的 promises。如果任何一个 promise 包含了之前已接受的值,
# 必须选择 proposal number 最高的那个值作为本次提议的值。
# 这是 Paxos 算法保证安全性的关键。
highest_accepted_promise = promises.compact.select { |p| p['accepted_value'] }.max_by { |p| p['proposal_number'] }
value_to_propose = if highest_accepted_promise
@logger.info("Found a previously accepted value from promises. Adopting it.")
highest_accepted_promise['accepted_value']
else
value
end
# --- Phase 2a: Proposer sends Propose (Accept Request) ---
propose_message = Message.new(:propose, instance_to_propose, @proposal_number, value_to_propose, @node_id).to_h
acceptances = broadcast(propose_message)
# --- Phase 2b: Proposer waits for a quorum of Accepted ---
if acceptances.size < @quorum_size
@logger.warn("Failed to get a quorum for Propose phase. Got #{acceptances.size}, need #{@quorum_size}.")
return false
end
@logger.info("Value '#{value_to_propose}' accepted by a quorum for instance #{instance_to_propose}. Consensus reached.")
# --- Learning Phase ---
# 广播给所有节点(包括自己),告知最终结果
learn_message = Message.new(:learn, instance_to_propose, nil, value_to_propose, @node_id).to_h
broadcast(learn_message)
learn(instance_to_propose, value_to_propose) # 自己也学习
true
end
def get_log
@log_mutex.synchronize { @log.dup }
end
private
def handle_connection(socket)
raw_message = socket.gets
return unless raw_message
message = JSON.parse(raw_message)
response = case message['type'].to_sym
when :prepare
handle_prepare(message)
when :propose
handle_propose(message)
when :learn
learn(message['instance'], message['proposal_value'])
{ status: 'learned' }
else
{ error: 'unknown message type' }
end
socket.puts(response.to_json)
end
# --- Acceptor Logic ---
def handle_prepare(message)
@state_mutex.synchronize do
instance = message['instance']
proposal_number = message['proposal_number']
@highest_promises[instance] ||= [-1.0, -1] # Initialize with a very small number
if proposal_number > @highest_promises[instance]
@logger.info("Promise for instance #{instance}, N=#{proposal_number.inspect}")
@highest_promises[instance] = proposal_number
# 持久化状态到磁盘(此处省略,但在生产系统中是必须的)
# 返回 Promise,如果之前有接受过值,需要一并返回
return {
status: 'promise',
proposal_number: @accepted_values[instance] ? @accepted_values[instance][:proposal_number] : nil,
accepted_value: @accepted_values[instance] ? @accepted_values[instance][:value] : nil
}
else
@logger.warn("Reject Prepare for instance #{instance}, N=#{proposal_number.inspect} is not higher than #{@highest_promises[instance].inspect}")
return { status: 'rejected' }
end
end
end
def handle_propose(message)
@state_mutex.synchronize do
instance = message['instance']
proposal_number = message['proposal_number']
value = message['proposal_value']
# 只有当提议号大于或等于我们承诺过的最高提议号时,才接受
if proposal_number >= @highest_promises[instance]
@logger.info("Accepted value '#{value}' for instance #{instance}, N=#{proposal_number.inspect}")
@accepted_values[instance] = { proposal_number: proposal_number, value: value }
# 持久化状态到磁盘
return { status: 'accepted' }
else
@logger.warn("Reject Propose for instance #{instance}, N=#{proposal_number.inspect} is not >= #{@highest_promises[instance].inspect}")
return { status: 'rejected' }
end
end
end
# --- Learner Logic ---
def learn(instance, value)
@log_mutex.synchronize do
return if @log.key?(instance) # 避免重复学习
@log[instance] = value
@highest_log_instance = [@highest_log_instance, instance].max
@logger.info("Learned value for instance #{instance}: '#{value}'")
puts "Current Log: #{@log.sort.to_h}"
end
end
# --- Network Communication ---
def broadcast(message)
responses = []
threads = @peers.map do |peer_id, peer_address|
Thread.new do
response = TlsClient.request(@node_id, peer_id, peer_address, message)
# 只收集成功的响应
responses << response if response && (response['status'] == 'promise' || response['status'] == 'accepted')
end
end
threads.each(&:join)
# 也要处理自己作为 Acceptor 的情况
self_response = case message[:type]
when :prepare then handle_prepare(message.transform_keys(&:to_s))
when :propose then handle_propose(message.transform_keys(&:to_s))
else nil
end
responses << self_response if self_response && (self_response[:status] == 'promise' || self_response[:status] == 'accepted')
responses.compact
end
end
这段代码是整个系统的核心。它将 mTLS 通信与 Paxos 的状态机逻辑结合在一起。一个常见的错误是在实现 handle_prepare
和 handle_propose
时,没有使用互斥锁(Mutex
)来保护共享状态,这在高并发下会导致状态错乱,破坏一致性。另一个关键点是在广播后,Proposer 必须将自己的响应也计算在内,因为它自己也是一个 Acceptor。
下面是 Paxos 完整协商过程的流程图:
sequenceDiagram participant Proposer participant A1 as Acceptor 1 participant A2 as Acceptor 2 participant A3 as Acceptor 3 Proposer->>+A1: Prepare(N=1) Proposer->>+A2: Prepare(N=1) Proposer->>+A3: Prepare(N=1) A1-->>-Proposer: Promise(N=1, prev_accepted=nil) A2-->>-Proposer: Promise(N=1, prev_accepted=nil) Note right of Proposer: Quorum (2/3) reached! A3-->>-Proposer: Promise(N=1, prev_accepted=nil) Proposer->>+A1: Propose(N=1, V='config_A') Proposer->>+A2: Propose(N=1, V='config_A') Proposer->>+A3: Propose(N=1, V='config_A') A1-->>-Proposer: Accepted(N=1, V='config_A') A2-->>-Proposer: Accepted(N=1, V='config_A') Note right of Proposer: Quorum (2/3) reached! Consensus achieved. A3-->>-Proposer: Accepted(N=1, V='config_A') Proposer->>A1: Learn(V='config_A') Proposer->>A2: Learn(V='config_A') Proposer->>A3: Learn(V='config_A')
第三阶段:组装并运行集群
现在,我们可以把所有部分组装起来。下面的启动脚本会根据传入的参数启动一个节点。
# run_node.rb
require_relative 'lib/paxos_node'
unless ARGV.length == 2
puts "Usage: ruby run_node.rb <node_id> <port>"
puts "Example: ruby run_node.rb node1 8001"
exit 1
end
NODE_ID = ARGV[0]
PORT = ARGV[1].to_i
# 在真实项目中,这个配置应该来自配置文件或服务发现
PEERS = {
'node1' => 'localhost:8001',
'node2' => 'localhost:8002',
'node3' => 'localhost:8003'
}
# 从集群中移除自己
peers_without_self = PEERS.reject { |id, _| id == NODE_ID }
node = PaxosNode.new(NODE_ID, PORT, peers_without_self)
node.start
# 交互式命令行,用于测试
puts "Node #{NODE_ID} is running. Type 'propose <value>' or 'log' or 'exit'."
loop do
print "> "
input = gets.chomp.split(' ', 2)
command = input[0]
case command
when 'propose'
if input[1]
# 在一个单独的线程中发起提议,避免阻塞主线程
Thread.new { node.propose(input[1]) }
else
puts "Usage: propose <value>"
end
when 'log'
puts node.get_log.sort.to_h.inspect
when 'exit'
exit 0
else
puts "Unknown command: #{command}" unless command.empty?
end
end
要运行这个集群,首先确保已生成证书 (ruby scripts/generate_certs.rb
)。然后打开三个终端窗口,分别启动三个节点:
# Terminal 1
ruby run_node.rb node1 8001
# Terminal 2
ruby run_node.rb node2 8002
# Terminal 3
ruby run_node.rb node3 8003
启动后,在 node1
的终端里输入 propose apples
。你会看到 node1
的日志输出了 Paxos 的各个阶段,最终达成共识。然后,在 node2
或 node3
的终端里输入 log
,你会看到它们也已经学习到了这个值:{0=>"apples"}
。接着,在 node2
终端里输入 propose oranges
,所有节点上的日志都会更新为 {0=>"apples", 1=>"oranges"}
。这证明了我们的安全复制日志已经工作。
局限性与未来迭代方向
这个从零构建的模块验证了核心设计,但在投入生产前,还有几个重要的工程问题需要解决。
首先,领导者选举。目前的实现中,任何节点都可以发起提议,这在多个节点同时提议时可能导致活锁(livelock),即提议号不断升高但没有提议被接受。工业级的 Paxos 实现(如 Raft)通常会选举一个稳定的领导者,只有领导者可以发起提议,这大大简化了流程并提高了效率。我们的系统可以通过在 Paxos 日志本身之上实现一个租约机制来选举领导者。
其次,状态持久化。@highest_promises
和 @accepted_values
是 Acceptor 的关键状态,必须被持久化到磁盘。如果节点崩溃重启,它必须能恢复这些状态,否则可能破坏之前已达成的共识,违反 Paxos 的安全性保证。
第三,日志压缩。随着系统运行,日志会无限增长。必须实现快照(snapshotting)机制,定期将日志应用到某个状态机,然后将旧日志截断,以释放空间。
最后,集群成员变更。当前的节点列表是静态的。在真实环境中,我们需要动态地增加或移除节点。这是一个复杂的分布式一致性问题,通常也需要通过在 Paxos 日志中提交成员变更配置来实现。
尽管存在这些局限,但这个实践过程本身极具价值。它不仅深入到了 Paxos 算法的实现细节,更重要的是,它将安全通信(mTLS)作为系统设计的一等公民,构建了一个在不可信网络中具备基本生存能力的一致性内核。对于需要轻量级、嵌入式、强一致性且技术栈统一的场景,这条路径提供了一个可行的起点。