UserActivity Data Processing Pipeline
1. Overall Processing Flow
Data flows through the system as follows:
UserActivity (raw data)
↓ (sessionization)
ProcessedUserActivity (session data)
↓ (behavior statistics)
UserActivityStatistic_America/Los_Angeles_v2 (statistic data)
↓ (aggregation)
UserActivitySummary_America/Los_Angeles_v2 (summary data)
2. Processing Logic per Stage
2.1 Stage 1: UserActivity → ProcessedUserActivity
Processing logic:
- Read unprocessed user activity records from the UserActivity collection
- Look up the user's most recently processed record by consumerId
- Assign a sessionId to each record based on a 30-minute inactivity gap
- Add statistics-related fields (e.g. fromAd, uniqueVisitors)
- Save the processed records to the ProcessedUserActivity collection
Key queries:
// Find unprocessed user activity records
db.UserActivity.find({
  "$and": [
    {"receivedAt": {"$exists": true}},
    {"$or": [
      {"botStatus": {"$exists": false}},
      {"botStatus": "NOT_BOT"}
    ]}
  ]
})
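The filter above keeps records that carry a receivedAt timestamp and are either unclassified or confirmed non-bot. The same predicate, as a minimal Python sketch (field names taken from the query; the helper name is hypothetical):

```python
def is_unprocessed_activity(doc):
    """Mirror of the Mongo filter: receivedAt must exist, and
    botStatus must be absent or equal to "NOT_BOT"."""
    if "receivedAt" not in doc:
        return False
    return "botStatus" not in doc or doc["botStatus"] == "NOT_BOT"
```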
// Find the user's most recently processed record
// (findOne takes no sort option in the shell, so use find().sort().limit(1))
db.ProcessedUserActivity.find(
  {"env": "prod", "consumerId": "xxx"}
).sort({"createdAt": -1}).limit(1)
Pseudocode for the processing logic:
# Session ID assignment
if not recent_record:
    # First activity for this user: start a new session
    session_id = time.time_ns()
else:
    # Check the gap since the user's last activity
    time_diff = (created_at - recent_time).total_seconds()
    if time_diff > 30 * 60:  # More than 30 minutes: start a new session
        session_id = time.time_ns()
    else:
        # Reuse the previous session ID
        session_id = recent_record.get("sessionId")
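The sessionization rule can be condensed into one runnable function (a sketch; the function name and argument shapes are assumptions, not the production code):

```python
import time
from datetime import datetime, timedelta

SESSION_GAP_SECONDS = 30 * 60  # 30-minute inactivity gap

def assign_session_id(recent_record, created_at, recent_time):
    """Return the sessionId for a new activity, reusing the previous
    session when the gap since the user's last activity is short enough.
    recent_record is the user's latest ProcessedUserActivity doc (or None)."""
    if not recent_record:
        # First activity for this user: start a new session
        return time.time_ns()
    if (created_at - recent_time).total_seconds() > SESSION_GAP_SECONDS:
        # Too long since the last activity: start a new session
        return time.time_ns()
    # Within the gap: continue the existing session
    return recent_record.get("sessionId")
```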
2.2 Stage 2: ProcessedUserActivity → UserActivityStatistic
Processing logic:
- Process ProcessedUserActivity data day by day
- Extract the fields needed for statistics and build statistic records
- Delete any existing statistic data for the same date
- Save the new statistic records to the UserActivityStatistic collection
Key queries:
// Find processed records in the given date range
db.ProcessedUserActivity.find({
  "env": "prod",
  "createdAt": {
    "$gte": start_datetime_utc,
    "$lt": end_datetime_utc
  }
})
// Delete existing statistic data for the same date
db.UserActivityStatistic.deleteMany({
  "env": "prod",
  "zoneId": "America/Los_Angeles",
  "date": "2025-12-01"
})
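Stage 2 runs one local (zoneId) day at a time, so each date string has to be turned into the UTC [start, end) pair used in the createdAt filter. A sketch of that conversion using Python's zoneinfo (the helper name is hypothetical):

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

def local_day_utc_range(date_str, zone_id="America/Los_Angeles"):
    """Return the UTC [start, end) datetimes covering one local calendar day."""
    tz = ZoneInfo(zone_id)
    # Midnight at the start of the local day
    start_local = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=tz)
    # Midnight at the start of the next local day (exclusive bound)
    end_local = start_local + timedelta(days=1)
    return start_local.astimezone(timezone.utc), end_local.astimezone(timezone.utc)
```

The resulting pair plugs directly into the `$gte`/`$lt` bounds of the query above.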
Pseudocode for the processing logic:
# Build a statistic record
statistic_record = {
    "env": "prod",
    "zoneId": "America/Los_Angeles",
    "postId": stat.get("postId"),
    "date": date_str,
    "consumerId": stat.get("consumerId"),
    "sessionId": stat.get("sessionId"),
    "fromAd": stat.get("fromAd", False),
    "uniqueVisitors": stat.get("uniqueVisitors", False),
    "engaged": stat.get("engaged", False),
    "viewedProduct": stat.get("viewedProduct", False),
    "clickedAddToCart": stat.get("clickedAddToCart", False),
    "placedOrder": stat.get("placedOrder", False),
    "device": stat.get("device", "Unknown"),
    "platform": stat.get("platform", "Unknown"),
    "ui": stat.get("ui", {})
}
2.3 Stage 3: UserActivityStatistic → UserActivitySummary
Processing logic:
- Group statistic records by ad attribution (fromAd) and post ID
- Count distinct users per behavior within each group
- Compute device, platform, and UI distributions
- Delete any existing summary data for the same date
- Save the new summary records to the UserActivitySummary collection
Key queries:
// Delete existing summary data for the same date
db.UserActivitySummary.deleteMany({
  "env": "prod",
  "zoneId": "America/Los_Angeles",
  "date": "2025-12-01"
})
Pseudocode for the processing logic:
# Group by ad attribution (fromAd) and postId
grouped_data = defaultdict(lambda: defaultdict(list))
for stat in statistics:
    from_ad = stat.get("fromAd", False)
    post_id = stat.get("postId", "unknown")
    grouped_data[from_ad][post_id].append(stat)
# Distinct-user counting per group
for from_ad, post_groups in grouped_data.items():
    for post_id, post_stats in post_groups.items():
        # Deduplicated user sets, one per behavior
        visitors = set()
        engaged_users = set()
        product_viewers = set()
        cart_clickers = set()
        order_placers = set()
        for stat in post_stats:
            consumer_id = stat.get("consumerId")
            if not consumer_id:
                continue
            if stat.get("uniqueVisitors"):
                visitors.add(consumer_id)
            if stat.get("engaged"):
                engaged_users.add(consumer_id)
            if stat.get("viewedProduct"):
                product_viewers.add(consumer_id)
            if stat.get("clickedAddToCart"):
                cart_clickers.add(consumer_id)
            if stat.get("placedOrder"):
                order_placers.add(consumer_id)
        # Build the summary record
        # (device_stats / platform_stats / ui_stats are distribution tallies
        #  accumulated in the same loop; the accumulation is omitted here)
        summary_record = {
            "env": "prod",
            "zoneId": "America/Los_Angeles",
            "date": date_str,
            "postId": post_id,
            "fromAd": from_ad,
            "uniqueVisitors": len(visitors),
            "engaged": len(engaged_users),
            "viewedProduct": len(product_viewers),
            "clickedAddToCart": len(cart_clickers),
            "placedOrder": len(order_placers),  # the placedOrder value ultimately returned
            "device": dict(device_stats),
            "platform": dict(platform_stats),
            "ui": dict(ui_stats)
        }
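The grouping and distinct-count logic can be condensed into one runnable sketch (the function name is hypothetical, and the device/platform/UI tallies are omitted for brevity):

```python
from collections import defaultdict

FLAG_FIELDS = ["uniqueVisitors", "engaged", "viewedProduct",
               "clickedAddToCart", "placedOrder"]

def summarize(statistics, date_str, env="prod", zone_id="America/Los_Angeles"):
    """Group statistic records by (fromAd, postId) and count distinct
    consumers per behavior flag, as in the pseudocode above."""
    # One set of consumer IDs per behavior flag, keyed by (fromAd, postId)
    groups = defaultdict(lambda: {f: set() for f in FLAG_FIELDS})
    for stat in statistics:
        consumer_id = stat.get("consumerId")
        if not consumer_id:
            continue
        key = (stat.get("fromAd", False), stat.get("postId", "unknown"))
        for flag in FLAG_FIELDS:
            if stat.get(flag):
                groups[key][flag].add(consumer_id)
    # Collapse each set to its distinct-user count
    return [
        {"env": env, "zoneId": zone_id, "date": date_str,
         "fromAd": from_ad, "postId": post_id,
         **{flag: len(users) for flag, users in flags.items()}}
        for (from_ad, post_id), flags in groups.items()
    ]
```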
3. Example Data Flow
3.1 UserActivity raw record
{
"_id": "ua001",
"consumerId": "user001",
"activityName": "USER_ACTIVITY_ORDER_PLACED",
"postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
"createdAt": "2025-12-01T10:05:00.000Z",
"receivedAt": "2025-12-01T10:05:05.000Z",
"botStatus": "NOT_BOT"
}
3.2 ProcessedUserActivity sessionized record
{
"_id": "pua001",
"consumerId": "user001",
"sessionId": "sess001",
"activityName": "USER_ACTIVITY_ORDER_PLACED",
"postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
"createdAt": "2025-12-01T10:05:00.000Z",
"receivedAt": "2025-12-01T10:05:05.000Z",
"botStatus": "NOT_BOT",
"fromAd": false,
"placedOrder": true
}
3.3 UserActivityStatistic statistic record
{
"_id": "stat001",
"consumerId": "user001",
"sessionId": "sess001",
"postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
"date": "2025-12-01",
"fromAd": false,
"placedOrder": true
}
3.4 UserActivitySummary summary record
{
"date": "2025-12-01",
"postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
"fromAd": false,
"placedOrder": 1 // the value returned by the /v2/summary endpoint
}
4. Summary
The pipeline is a classic ETL (extract, transform, load) process: raw user behavior data is transformed layer by layer into summary data ready for analysis. Each layer has its own processing logic and data structures, ensuring the final summary accurately reflects user behavior patterns.