我们面临一个棘手的矛盾。业务分析团队依赖于一个存储着数十 PB 数据的 Apache Iceberg 数据湖。每隔几分钟,由 Ray 集群驱动的 ETL 作业就会向核心事实表中原子性地提交数 GB 的新数据。另一方面,运营团队需要一个能在 macOS 和 iPad 上运行的实时仪表盘,这个仪表盘必须在数据写入后的几秒内,就能可视化最新的关键性能指标(KPI)。
最初的方案简单粗暴:一个 SwiftUI 应用,每 10 秒轮询一次后端 API,该 API 再去查询 Iceberg 表。这个方案在生产环境中迅速崩溃。对 Iceberg 的查询,即便有分区裁剪和谓词下推,对于一个需要即时响应的 UI 来说,其 p99 延迟仍然过高,动辄数十秒。更严重的是,成百上千个客户端同时轮询,给我们的查询引擎(Trino)带来了巨大的、无意义的负载,成本急剧攀升。我们本质上是在用一个为高效批处理设计的系统,去应对一个低延迟的流式查询场景。这行不通。
我们需要的是一种推(Push)模型,而不是拉(Pull)模型。当 Iceberg 表发生变更时,后端应该主动通知所有活跃的客户端。这就把问题转化为了:如何可靠、低延迟地捕获 Iceberg 表的变更事件,并通过一个可扩展的机制广播出去?
架构构想与技术选型
放弃轮询后,一个事件驱动的架构浮出水面。其核心流程应该如下:
- 一个服务持续监控 Iceberg 表的元数据,检测新快照(Snapshot)的产生。
- 一旦检测到新快照,该服务立即生成一个“数据已更新”的事件。
- 该事件被发布到一个高吞吐、低延迟的消息总线。
- SwiftUI 客户端作为订阅者,监听这个消息总线,并在收到事件后触发一次精准的数据刷新请求。
在这个架构下,技术选型至关重要。
变更捕获: 我们已经在使用 Ray 进行数据处理。与其引入一个全新的组件(比如 Debezium 或监听 S3 事件的 Lambda),不如利用 Ray 的原生能力。我们可以设计一个轻量级、长时间运行的 Ray Actor,专门负责监控 Iceberg 的元数据指针。这比启动一个完整的 Spark/Flink 作业来做同样的事情要高效得多。
消息总线: 在 Kafka、Pulsar 和云原生消息服务之间,我们选择了 Azure Service Bus。原因在于它的完全托管特性、企业级的可靠性(至少一次送达保证)以及对 AMQP 1.0 协议的良好支持。AMQP 协议有成熟的跨语言客户端库,这对我们连接 Python 后端和 Swift 客户端至关重要。
客户端: SwiftUI 是既定选择。挑战在于如何在 Swift 生态中优雅地处理与 Azure Service Bus 的长连接和异步消息接收。
最终的架构图如下:
graph TD subgraph "数据处理平面 (Ray Cluster)" A[ETL 作业] -- 写入 --> B[Apache Iceberg Table on S3]; C[Iceberg Sentinel Actor] -- 轮询元数据 --> B; end subgraph "事件通知平面 (Azure Cloud)" D[Azure Service Bus Topic]; end subgraph "客户端平面 (Apple Ecosystem)" E[SwiftUI Dashboard App]; F[API Gateway]; G[轻量级查询服务]; end C -- 检测到新 Snapshot --> H{发布变更事件}; H -- publish --> D; D -- push (AMQP) --> E; E -- 收到事件后 --> I{发起数据刷新请求}; I -- HTTP GET --> F; F -- 路由 --> G; G -- 使用 Snapshot ID 精准查询 --> B; G -- 返回聚合结果 --> F; F -- 返回结果 --> E;
步骤一:实现 Iceberg 哨兵 Ray Actor
这个 Actor 是整个系统的脉搏。它的职责单一且明确:监视一个 Iceberg 表的元数据文件指针 (version-hint.text
或直接列出 v*.metadata.json
文件),并在指针更新时发出通知。
这里的坑在于,我们不能简单地依赖文件系统的 mtime
,因为它在对象存储上可能不可靠或有延迟。最可靠的方式是直接读取指针文件中的版本号,并与 Actor 内存中维护的最新版本号进行比较。
# iceberg_sentinel_actor.py
import os
import logging
import time
import json
from typing import Optional
import ray
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from pyiceberg.catalog import load_catalog
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@ray.remote
class IcebergSentinelActor:
"""
一个长期运行的 Ray Actor,负责监控指定的 Iceberg 表的元数据变更,
并在检测到新快照时,向 Azure Service Bus 发送通知。
"""
def __init__(self, catalog_name: str, table_identifier: str, service_bus_conn_str: str, topic_name: str):
self.catalog_name = catalog_name
self.table_identifier = table_identifier
self.service_bus_conn_str = service_bus_conn_str
self.topic_name = topic_name
# 在真实项目中,目录配置应来自共享配置
self.catalog = load_catalog(
catalog_name,
**{
"uri": os.environ.get("ICEBERG_S3_URI"),
"s3.endpoint": os.environ.get("ICEBERG_S3_ENDPOINT"),
"s3.access-key-id": os.environ.get("ICEBERG_S3_ACCESS_KEY_ID"),
"s3.secret-access-key": os.environ.get("ICEBERG_S3_SECRET_ACCESS_KEY"),
}
)
self.last_known_snapshot_id: Optional[int] = None
self._is_running = False
logging.info(f"Sentinel for table '{table_identifier}' initialized.")
def _get_current_snapshot_id(self) -> Optional[int]:
"""
加载表并返回当前快照ID。包含错误处理。
"""
try:
table = self.catalog.load_table(self.table_identifier)
if table.current_snapshot():
return table.current_snapshot().snapshot_id
return None
except Exception as e:
# 在生产环境中,应该有更细致的异常处理和告警
logging.error(f"Failed to load table or get current snapshot for {self.table_identifier}: {e}")
return None
async def _publish_update_event(self, new_snapshot_id: int, timestamp: int):
"""
构建消息并异步发布到 Azure Service Bus 主题。
"""
message_body = {
"table_identifier": self.table_identifier,
"new_snapshot_id": new_snapshot_id,
"event_timestamp_utc": timestamp,
"version": "1.0"
}
async with ServiceBusClient.from_connection_string(self.service_bus_conn_str) as client:
async with client.get_topic_sender(self.topic_name) as sender:
try:
message = ServiceBusMessage(json.dumps(message_body), content_type="application/json")
await sender.send_messages(message)
logging.info(f"Successfully published event for snapshot {new_snapshot_id} to topic '{self.topic_name}'.")
except Exception as e:
logging.error(f"Failed to publish message to Service Bus: {e}")
# 在这里可以加入重试逻辑或死信队列策略
def run(self, poll_interval_seconds: float = 2.0):
"""
启动监控循环。
"""
logging.info(f"Starting monitoring loop for {self.table_identifier} with {poll_interval_seconds}s interval.")
self._is_running = True
# 初始化 last_known_snapshot_id
self.last_known_snapshot_id = self._get_current_snapshot_id()
if self.last_known_snapshot_id:
logging.info(f"Initial snapshot ID for {self.table_identifier} is {self.last_known_snapshot_id}.")
else:
logging.warning(f"Could not determine initial snapshot ID for {self.table_identifier}.")
import asyncio
loop = asyncio.get_event_loop()
while self._is_running:
time.sleep(poll_interval_seconds)
current_snapshot_id = self._get_current_snapshot_id()
if current_snapshot_id and current_snapshot_id != self.last_known_snapshot_id:
logging.info(f"New snapshot detected for {self.table_identifier}. Old: {self.last_known_snapshot_id}, New: {current_snapshot_id}.")
self.last_known_snapshot_id = current_snapshot_id
# 异步发布事件,不阻塞监控循环
event_timestamp = int(time.time())
loop.create_task(self._publish_update_event(current_snapshot_id, event_timestamp))
def stop(self):
"""
停止监控循环。
"""
self._is_running = False
logging.info(f"Stopping sentinel for table {self.table_identifier}.")
# --- 部署和运行 Sentinel Actor ---
if __name__ == '__main__':
# 假设 Ray 已经初始化
ray.init(address='auto')
# 从环境变量或配置文件加载敏感信息
SERVICE_BUS_CONNECTION_STR = os.environ.get("AZURE_SERVICE_BUS_CONN_STR")
SERVICE_BUS_TOPIC_NAME = "iceberg.table.updates"
CATALOG_NAME = "prod_catalog"
TABLE_IDENTIFIER = "db.fact_events"
# 创建并启动 Actor
sentinel = IcebergSentinelActor.options(name="IcebergSentinel", get_if_exists=True).remote(
catalog_name=CATALOG_NAME,
table_identifier=TABLE_IDENTIFIER,
service_bus_conn_str=SERVICE_BUS_CONNECTION_STR,
topic_name=SERVICE_BUS_TOPIC_NAME,
)
# run 方法会阻塞,所以我们在 Actor 内部启动一个循环
# 这里调用 run 是异步的,不会阻塞主线程
sentinel.run.remote(poll_interval_seconds=1.5)
# 在实际应用中,Actor 会在 Ray 集群中一直运行,直到集群关闭或 Actor 被显式销毁
# 这里用一个 sleep 模拟长时间运行
try:
while True:
time.sleep(60)
logging.info("Sentinel actor is running...")
except KeyboardInterrupt:
ray.kill(sentinel)
这个 Actor 的设计考虑了几个生产实践:
- 无状态与恢复: Actor 本身是有状态的(
last_known_snapshot_id
),但这个状态可以在重启后通过查询 Iceberg 表的当前快照快速重建。 - 资源效率: 使用一个长时间运行的 Actor 比周期性地启动一个 Ray Task 或 Spark Job 开销小得多。它只占有一个 CPU核心和少量内存。
- 异步通信: 向 Azure Service Bus 的发布是异步的,避免了网络延迟阻塞下一次的元数据检查。
- 配置驱动: 所有敏感信息和环境配置都通过环境变量注入,符合云原生应用的实践。
步骤二:在 SwiftUI 客户端中消费事件
这是连接后端与前端的关键一步。我们需要一个可靠的 AMQP 客户端库。在 Swift 生态中,swift-nio-amqp
是一个基于 SwiftNIO 的高性能底层选择。我们将基于它封装一个易于在 SwiftUI 中使用的服务。
我们将创建一个 ObservableObject
,它负责维护与 Azure Service Bus 的连接,监听消息,并在收到消息时更新 @Published
属性。
// ServiceBusListener.swift
import Foundation
import Combine
import AMQPClient // 这是一个假设的库,API 模仿了常见 AMQP 客户端
// 定义从 Service Bus 接收到的消息结构
struct IcebergUpdateNotification: Decodable, Identifiable {
let id = UUID() // 为 SwiftUI 列表提供唯一标识
let tableIdentifier: String
let newSnapshotId: Int
let eventTimestampUtc: Int
enum CodingKeys: String, CodingKey {
case tableIdentifier = "table_identifier"
case newSnapshotId = "new_snapshot_id"
case eventTimestampUtc = "event_timestamp_utc"
}
}
// 负责与 Azure Service Bus 通信的核心服务
// 遵从 ObservableObject 协议,以便 SwiftUI 视图可以订阅其变化
@MainActor // 确保所有 @Published 属性的更新都在主线程上
class ServiceBusListener: ObservableObject {
@Published var latestNotification: IcebergUpdateNotification?
@Published var connectionStatus: String = "Disconnected"
@Published var error: Error?
private var amqpClient: AMQPClient?
private var cancellables = Set<AnyCancellable>()
private let connectionString: String
private let topicName: String
private let subscriptionName: String
init(connectionString: String, topicName: String, subscriptionName: String) {
self.connectionString = connectionString
self.topicName = topicName
self.subscriptionName = subscriptionName
}
func connectAndListen() {
// 防止重复连接
guard connectionStatus == "Disconnected" else { return }
Task {
do {
self.connectionStatus = "Connecting..."
// 1. 解析连接字符串并建立连接 (此为伪代码)
let config = try AMQPConnectionConfig.from(connectionString: self.connectionString)
let client = AMQPClient(config: config)
self.amqpClient = client
try await client.connect()
self.connectionStatus = "Connected"
// 2. 声明一个消费者来监听订阅
// 在 Service Bus 中,Topic 的订阅者表现得像一个队列
let consumer = try await client.createConsumer(
queueName: "\(topicName)/subscriptions/\(subscriptionName)"
)
// 3. 开始消费消息
self.connectionStatus = "Listening for messages..."
for await message in consumer.messages {
handle(message: message)
}
} catch let amqpError {
self.connectionStatus = "Failed"
self.error = amqpError
// 在真实项目中,这里应该有带退避策略的重连逻辑
disconnect()
}
}
}
private func handle(message: AMQPMessage) {
// 确保消息处理和 UI 更新在主线程
guard let body = message.body else {
// 确认消息,即使它是空的,以防止它被重新投递
Task { try? await message.ack() }
return
}
do {
let notification = try JSONDecoder().decode(IcebergUpdateNotification.self, from: body)
self.latestNotification = notification
self.error = nil
logging.info("Received and decoded new notification for snapshot: \(notification.newSnapshotId)")
} catch let decodeError {
self.error = decodeError
logging.error("Failed to decode message: \(decodeError)")
}
// 不论成功与否,都要确认消息
Task { try? await message.ack() }
}
func disconnect() {
self.connectionStatus = "Disconnected"
Task {
try? await self.amqpClient?.close()
self.amqpClient = nil
}
}
}
现在,我们可以在 SwiftUI 视图中使用这个 ServiceBusListener
。
// ContentView.swift
import SwiftUI
struct ContentView: View {
// 使用 @StateObject 来管理 Listener 的生命周期
@StateObject private var listener: ServiceBusListener
// 假设这是从 API 获取的数据模型
@State private var kpiData: String = "Waiting for data..."
@State private var isLoading: Bool = false
init() {
// 从安全的地方加载配置,例如 Info.plist 或一个专门的配置文件
let connStr = "Endpoint=sb://your-namespace.servicebus.windows.net/;..."
let topic = "iceberg.table.updates"
let subscription = "swiftui_dashboard_sub_1" // 每个客户端实例应该有唯一的订阅
_listener = StateObject(wrappedValue: ServiceBusListener(
connectionString: connStr,
topicName: topic,
subscriptionName: subscription
))
}
var body: some View {
VStack(spacing: 20) {
Text("Iceberg Real-time Dashboard")
.font(.largeTitle)
VStack {
Text("Connection Status")
.font(.headline)
Text(listener.connectionStatus)
.foregroundColor(statusColor)
.bold()
}
if let notification = listener.latestNotification {
VStack(alignment: .leading) {
Text("Latest Update Received:")
.font(.headline)
Text("Table: \(notification.tableIdentifier)")
Text("New Snapshot ID: \(notification.newSnapshotId)")
Text("Timestamp: \(Date(timeIntervalSince1970: TimeInterval(notification.eventTimestampUtc)))")
}
.padding()
.background(Color.gray.opacity(0.2))
.cornerRadius(10)
}
// 数据展示区域
ZStack {
Text(kpiData)
.font(.system(.largeTitle, design: .monospaced))
.redacted(reason: isLoading ? .placeholder : [])
if isLoading {
ProgressView()
}
}
if let error = listener.error {
Text("Error: \(error.localizedDescription)")
.foregroundColor(.red)
}
}
.padding()
.onAppear {
listener.connectAndListen()
}
.onDisappear {
listener.disconnect()
}
// 当收到新通知时,触发数据获取
.onChange(of: listener.latestNotification?.newSnapshotId) { newSnapshotId in
guard let newSnapshotId = newSnapshotId else { return }
fetchLatestKPIData(snapshotId: newSnapshotId)
}
}
private var statusColor: Color {
switch listener.connectionStatus {
case "Connected", "Listening for messages...":
return .green
case "Connecting...":
return .orange
default:
return .red
}
}
// 模拟从后端 API 获取数据的过程
private func fetchLatestKPIData(snapshotId: Int) {
isLoading = true
// 在真实应用中,这里会是一个网络请求
// URL 可能是 "https://api.yourcompany.com/kpi?snapshotId=\(snapshotId)"
Task {
// 模拟 0.5 秒的网络延迟
try? await Task.sleep(nanoseconds: 500_000_000)
// 模拟 API 返回的结果
let freshData = "Revenue: $\(Int.random(in: 1000...5000))K"
self.kpiData = freshData
self.isLoading = false
}
}
}
闭环:精准的数据查询
最后一步是那个轻量级的查询服务。当 SwiftUI 客户端收到包含 new_snapshot_id
的通知后,它会调用这个服务。这个服务的重要性在于它的查询是精准且高效的。它利用 Iceberg 的时间旅行能力,直接查询特定快照的数据。
# aiohttp_api_server.py (一个轻量级查询服务的伪代码)
from aiohttp import web
from pyiceberg.catalog import load_catalog
# ... 初始化 catalog ...
async def get_kpi(request):
snapshot_id = request.query.get('snapshotId')
if not snapshot_id:
return web.Response(text="Missing snapshotId parameter", status=400)
try:
table = catalog.load_table("db.fact_events")
# 使用时间旅行查询指定快照
df = table.scan(snapshot_id=int(snapshot_id)).to_pandas()
# 在这里执行你的聚合计算
# 这是一个极其简化的例子
total_revenue = df['price'].sum()
return web.json_response({"kpi": {"total_revenue": total_revenue}})
except Exception as e:
# 错误处理
return web.Response(text=f"Error processing request: {e}", status=500)
app = web.Application()
app.add_routes([web.get('/kpi', get_kpi)])
# web.run_app(app)
这个 API 端点确保了每次查询都是针对一个已知的数据版本,避免了在查询过程中数据发生变化的复杂情况,并且查询速度很快,因为查询引擎可以直接定位到该快照对应的清单文件(Manifest Files)。
局限性与未来迭代路径
此架构虽然解决了最初的轮询问题,但并非银弹。
首先,系统的“近实时”延迟下限,取决于 Ray Actor 的轮询间隔。在我们的实现中是 1.5 秒。要达到亚秒级,可以改造 Sentinel Actor,使其不再轮询,而是被动接收来自 S3 的事件通知(例如通过 S3 Event Notifications -> SQS -> Lambda 触发器),但这会增加架构的复杂性。
其次,让大量移动客户端直接通过 AMQP 连接到 Azure Service Bus,在网络不稳定或客户端规模达到数万时,会给客户端的电量和连接管理带来挑战。一个更稳健的生产级模式可能是在云端部署一个“后端对前端”(BFF)层,由 BFF 维护到 Service Bus 的稳定连接,然后通过 WebSocket 将消息推送到 SwiftUI 客户端。这能更好地控制连接数和消息格式。
最后,我们依赖 Azure Service Bus 的“至少一次”投递保证。这意味着客户端需要具备幂等性,即多次处理同一个 snapshot_id
的通知不会产生副作用。在我们的例子中,onChange(of:)
修饰符天然地处理了这个问题,因为连续相同的 newSnapshotId
值不会重复触发动作。但在更复杂的场景下,需要客户端维护一个已处理事件的记录。