优步流处理应⽤用
袁泳 @ UBER
优步简介
便捷交通,弹指之间
流数据 - 城市的脉搏
市场动态
当前全球有多少⻋车可⽤用
过去10分钟⾥里旧⾦金⼭山⾥里每个区域有多少⻋车载客?
过去10分钟⾥里旧⾦金⼭山⾥里每个六边形有多少⻋车载客?
历史变化
状态跟踪
出发点
优步平台本质:分布式状态机
乘客状态
乘客状态 司机状态
优步平台本质:分布式状态机
各类应⽤用⽣生成事件
秒级事件延迟
事件绝少丢失
收集事件消息 - 低开销、易扩展
挑战在哪⾥里?
多维度
每个事件消息包含数⼗十字段
细粒度数据
细粒度数据
细粒度数据
每个城市多于 10,000 六边形
7 种⻋车型
细粒度数据
⼀一天1440分钟
细粒度数据
13 种司机状态
细粒度数据
300 座城市
细粒度数据
⼀一天数据量:
300 x 10,000 x 7 x 1440 x 13 = 3930亿可能组合
细粒度数据
查询模式不定
任意维度的组合
多种聚合查询
Heatmap
Top N
Histogram
count(), avg(), sum(), percent(), geo
多变的地理位置聚合查询
⾼高流量
• 数⼗十万条消息每秒,数百亿⼀一天
• 每条消息包含数⼗十字段
短时交货
关键: 把问题⼀一般化
数据类型
• 多维时序数据
维度 值
state driver_arrived
vehicle type uber X
timestamp 13244323342
lattitude
longitude
数据查询
• 基于单表时空数据的OLAP
SELECT <agg functions>, <dimensions>
FROM <data_source>
WHERE <boolean filter>
GROUP BY <dimensions>
HAVING <boolean filter>
ORDER BY <sorting criterial>
LIMIT <n>
DO <post aggregation>
选择存储系统
最低要求
• ⽀支持时序和地理空间的OLAP
• ⽀支持⼤大流量数据
• ⽀支持秒级查询
• ⽀支持原始数据查询
键值数据库
键值数据库
维度 值
A a
B b
⼀一键⼀一值:预算所有组合
• 布尔操作符: AND, OR, NOT
⼀一键⼀一值:预算所有组合
维度 值
A a
B b
• 布尔操作符: AND, OR, NOT
⼀一键⼀一值:预算所有组合
维度 值
A a
B b
• A and (not B)
• B and (not A)
• A or B
• not (A or B)
维度
A
B
• {A}
• {B}
• {A, B}
• {}
⼀一键⼀一值:预算所有组合
维度
A
B
• {A}
• {B}
• {A, B}
• {}
⼀一键⼀一值:预算所有组合
—> 计算幂集
键值数据库的局限
预算所有键值组合的时空复杂度:O(2
n
)
关系数据库
关系数据库
关系数据库的局限
• 不易管理多项索引
关系数据库的局限
• 扫描速度不够
但我的KV系统速度奇快
光快不⾏行
⼀一个城市⾥里每个六边形⾥里⻋车的数⺫⽬目 => 18,000 次查询
平均延迟: 1ms
百分位延迟: 2s
失败率: %
光快不⾏行
⼀一次查询延迟超过百分位的概率: (1 - ) x = 83%
⼀一次查询成功的概率: (1 - )18000 = 84%
系统必备功能
• 快速扫描
• 布尔查询
• 原始数据
• 各类聚合
Elasticsearch
基于⾼高效倒排索引的布尔查询
内建分布式查询
快速扫描,灵活聚合
存储
搞定没?
数据转换
. (Lat, Long) -> (zipcode, hexagon)
动态定价
趋势预测
供求分布
技术上说: Clustering & Pr(D, S, E)
新场景 —> 新需求
预处理
Joining Multiple Streams
Sessionization
多级处理
状态管理
Apache Samza
Why Apache Samza?
DAG on Kafka
与Kafka的⼀一流整合
内置检查点
内置状态管理
处理 存储
存储层当掉怎么办?
预处理耗时太久怎么办?
处理 存储
终于搞定了?
后期处理
查询结果转换和平滑处理
查询结果转换和平滑处理
计算规模
⼀一个城市⾄至少10,000六边形
每个六边形331个邻居需要处理
计算规模
⼀一次查询:331 x 10,000 = 310万六边形
计算规模
99%-ile 处理时间: 70ms
计算规模
简单架构
“You can have a second computer once
you’ve shown you know how to use the
first one.”
- Paul Graham
后期处理
• 每个处理单元都是纯函数
• 通过组合算⼦子组合处理单元
后期处理
• ⾼高度并⾏行化的执⾏行
• 流⽔水线
务实考量
数据发现
Elasticsearch 查询语句过度复杂
SELECT
timeseries(7d)
FROM
driverAcceptanceRate
WHERE
geo_dist(10,
[37,
22])
AND
time
IN
(2015-‐02-‐04,2015-‐03-‐06)
AND
=
1
Elasticsearch 查询可以优化
• 流⽔水线
• 查询验证
• 查询限速
T
im
e in
seconds
Elasticsearch 也许会被替换
Storage QueryProcessing
还有⼀一件事
数据流⾥里总有不同模式
总有快速探索发现模式的需要
多少司机在5分钟内连续取消请求5次以上
哪些乘客半⼩小时内在相距超过100公⾥里的地⽅方叫⻋车?
Complex Event Processing
FROM
driver_canceled#(10
min)
SELECT
clientUUID,
count(clientUUID)
as
cancelCount
GROUP
BY
clientUUID
HAVING
cancelCount
>
10
INSERT
INTO
hipchat(room);
简单实现
谢谢!