UserActivity Data Processing: End-to-End Walkthrough

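1. Overall Processing Flow

Data moves through the system in the following stages:

UserActivity (raw data)
    ↓ (session processing)
ProcessedUserActivity (session data)
    ↓ (behavior statistics)
UserActivityStatistic_America/Los_Angeles_v2 (statistics data)
    ↓ (aggregation)
UserActivitySummary_America/Los_Angeles_v2 (summary data)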

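2. Processing Logic by Stage

2.1 Stage 1: UserActivity → ProcessedUserActivity

Processing steps:

  1. Read unprocessed user activity records from the UserActivity collection
  2. Look up the most recently processed record for the same user by consumerId
  3. Assign a sessionId to each record based on a 30-minute inactivity gap
  4. Add statistics-related fields (such as fromAd and uniqueVisitors)
  5. Save the processed records to the ProcessedUserActivity collection

Key queries: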

// Find unprocessed user activity records
// (receivedAt is set and the record is not flagged as a bot)
db.UserActivity.find({
  "$and": [
    {"receivedAt": {"$exists": true}},
    {"$or": [
      {"botStatus": {"$exists": false}},
      {"botStatus": "NOT_BOT"}
    ]}
  ]
})

// Find the user's most recently processed record
db.ProcessedUserActivity.find(
  {"env": "prod", "consumerId": "xxx"}
).sort({"createdAt": -1}).limit(1)

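Processing logic (pseudocode):

# Session ID assignment
# created_at: timestamp of the current activity
# recent_record: the user's most recently processed record, or None
# recent_time: timestamp of recent_record
import time

if not recent_record:
    # First activity for this user: start a new session
    session_id = time.time_ns()
else:
    # Check the gap since the previous activity
    time_diff = (created_at - recent_time).total_seconds()
    if time_diff > 30 * 60:  # more than 30 minutes: start a new session
        session_id = time.time_ns()
    else:
        # Reuse the previous session ID
        session_id = recent_record.get("sessionId")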
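
The session rule above can be exercised as a small self-contained function. This is a minimal sketch, not the pipeline's actual code: the function name assign_session_id and the new_id parameter are hypothetical, and it assumes createdAt values are datetime objects.

```python
import time
from datetime import datetime, timedelta

SESSION_GAP = timedelta(minutes=30)  # inactivity threshold taken from the text


def assign_session_id(created_at, recent_record, new_id=time.time_ns):
    """Return the sessionId for an activity created at `created_at`.

    `recent_record` is the user's most recent processed record (a dict with
    "createdAt" and "sessionId"), or None if the user has no history.
    `new_id` is a hypothetical hook so ID generation can be swapped out.
    """
    if recent_record is None:
        return new_id()  # first activity for this user
    if created_at - recent_record["createdAt"] > SESSION_GAP:
        return new_id()  # gap exceeds 30 minutes: new session
    return recent_record["sessionId"]  # still inside the same session


# Usage: three activities from one user; the last follows a 35-minute gap.
t0 = datetime(2025, 12, 1, 10, 0)
first = assign_session_id(t0, None)
second = assign_session_id(
    t0 + timedelta(minutes=10), {"createdAt": t0, "sessionId": first}
)
third = assign_session_id(
    t0 + timedelta(minutes=45),
    {"createdAt": t0 + timedelta(minutes=10), "sessionId": second},
)
```

Here the second activity reuses the first session ID, while the third receives a fresh one because 35 minutes have passed since the previous activity.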

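2.2 Stage 2: ProcessedUserActivity → UserActivityStatistic

Processing steps:

  1. Process ProcessedUserActivity data one day at a time
  2. Extract the fields needed for statistics and build a statistic record
  3. Delete any existing statistics for the same date
  4. Save the new statistics to the UserActivityStatistic collection

Key queries: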

// Find processed records within the given date range
db.ProcessedUserActivity.find({
  "env": "prod",
  "createdAt": {
    "$gte": start_datetime_utc,
    "$lt": end_datetime_utc
  }
})

// Delete existing statistics for the same date
db.UserActivityStatistic.deleteMany({
  "env": "prod",
  "zoneId": "America/Los_Angeles",
  "date": "2025-12-01"
})
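
The start_datetime_utc and end_datetime_utc bounds in the first query are not defined in this document. One plausible way to derive them, assuming each statistics day is a calendar day in the zoneId time zone, is sketched below. The helper name day_bounds_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def day_bounds_utc(date_str, zone_id="America/Los_Angeles"):
    """Return (start, end) UTC datetimes covering one local calendar day.

    The range is half-open (start <= createdAt < end), matching $gte / $lt.
    """
    tz = ZoneInfo(zone_id)
    local_day = datetime.strptime(date_str, "%Y-%m-%d").date()
    next_day = local_day + timedelta(days=1)
    # Build each local midnight separately so DST changes get the right offset.
    start_local = datetime(local_day.year, local_day.month, local_day.day, tzinfo=tz)
    end_local = datetime(next_day.year, next_day.month, next_day.day, tzinfo=tz)
    return start_local.astimezone(timezone.utc), end_local.astimezone(timezone.utc)


start_datetime_utc, end_datetime_utc = day_bounds_utc("2025-12-01")
print(start_datetime_utc.isoformat())  # 2025-12-01T08:00:00+00:00
print(end_datetime_utc.isoformat())    # 2025-12-02T08:00:00+00:00
```

Computing both midnights in local time, rather than adding 24 hours to the UTC start, keeps the range correct on days when daylight saving time begins or ends.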

Processing logic (pseudocode):

# Build the statistic record
statistic_record = {
  "env": "prod",
  "zoneId": "America/Los_Angeles",
  "postId": stat.get("postId"),
  "date": date_str,
  "consumerId": stat.get("consumerId"),
  "sessionId": stat.get("sessionId"),
  "fromAd": stat.get("fromAd", False),
  "uniqueVisitors": stat.get("uniqueVisitors", False),
  "engaged": stat.get("engaged", False),
  "viewedProduct": stat.get("viewedProduct", False),
  "clickedAddToCart": stat.get("clickedAddToCart", False),
  "placedOrder": stat.get("placedOrder", False),
  "device": stat.get("device", "Unknown"),
  "platform": stat.get("platform", "Unknown"),
  "ui": stat.get("ui", {})
}
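
Deleting a day's statistics before saving the new ones (steps 3 and 4 of this stage) makes the daily job safe to rerun: processing the same date twice leaves exactly one copy of its records. The sketch below illustrates that property with a plain Python list standing in for the collection. The function name replace_day and the sample records are hypothetical; a real job would issue the deleteMany shown earlier followed by a bulk insert.

```python
def replace_day(collection, new_records, env, zone_id, date_str):
    """Delete all records for (env, zoneId, date), then add the new ones.

    `collection` is a plain list standing in for a database collection.
    """
    key = (env, zone_id, date_str)
    kept = [
        r for r in collection
        if (r.get("env"), r.get("zoneId"), r.get("date")) != key
    ]
    collection[:] = kept + list(new_records)  # update the list in place
    return len(collection)


store = [
    {"env": "prod", "zoneId": "America/Los_Angeles",
     "date": "2025-11-30", "consumerId": "user000"},
]
day = [
    {"env": "prod", "zoneId": "America/Los_Angeles",
     "date": "2025-12-01", "consumerId": "user001"},
]

replace_day(store, day, "prod", "America/Los_Angeles", "2025-12-01")
replace_day(store, day, "prod", "America/Los_Angeles", "2025-12-01")  # rerun
print(len(store))  # 2
```

Records for other dates are left untouched, and the rerun does not create duplicates.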

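2.3 Stage 3: UserActivityStatistic → UserActivitySummary

Processing steps:

  1. Group the statistics by whether the visit came from an ad (fromAd) and by post ID (postId)
  2. Count distinct users within each group
  3. Compute the device, platform, and UI distributions
  4. Delete any existing summary data for the same date
  5. Save the new summary data to the UserActivitySummary collection

Key queries: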

// Delete existing summary data for the same date
db.UserActivitySummary.deleteMany({
  "env": "prod",
  "zoneId": "America/Los_Angeles",
  "date": "2025-12-01"
})

Processing logic (pseudocode):

from collections import defaultdict

# Group by fromAd and postId
grouped_data = defaultdict(lambda: defaultdict(list))
for stat in statistics:
    from_ad = stat.get("fromAd", False)
    post_id = stat.get("postId", "unknown")
    grouped_data[from_ad][post_id].append(stat)

# Count distinct users per group
for from_ad, post_groups in grouped_data.items():
    for post_id, post_stats in post_groups.items():
        # Sets deduplicate users by consumerId (unique-visitor style counts)
        visitors = set()
        engaged_users = set()
        product_viewers = set()
        cart_clickers = set()
        order_placers = set()

        for stat in post_stats:
            consumer_id = stat.get("consumerId")
            if not consumer_id:
                continue

            if stat.get("uniqueVisitors"):
                visitors.add(consumer_id)
            if stat.get("engaged"):
                engaged_users.add(consumer_id)
            if stat.get("viewedProduct"):
                product_viewers.add(consumer_id)
            if stat.get("clickedAddToCart"):
                cart_clickers.add(consumer_id)
            if stat.get("placedOrder"):
                order_placers.add(consumer_id)

        # Build the summary record
        summary_record = {
            "env": "prod",
            "zoneId": "America/Los_Angeles",
            "date": date_str,
            "postId": post_id,
            "fromAd": from_ad,
            "uniqueVisitors": len(visitors),
            "engaged": len(engaged_users),
            "viewedProduct": len(product_viewers),
            "clickedAddToCart": len(cart_clickers),
            "placedOrder": len(order_placers),  # the placedOrder value that is ultimately returned
            # device_stats, platform_stats, ui_stats: distributions from step 3 (computation not shown)
            "device": dict(device_stats),
            "platform": dict(platform_stats),
            "ui": dict(ui_stats)
        }
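
The pseudocode above uses device_stats and platform_stats without showing how they are built. A minimal sketch of one way to compute such distributions follows, assuming each statistic record contributes one count. Whether the pipeline counts per record or per distinct user is not stated here, and the ui field (a nested object) is left out because its structure is not specified. The helper name count_distribution and the sample values are hypothetical.

```python
from collections import Counter


def count_distribution(post_stats, field, default="Unknown"):
    """Count how many statistic records carry each value of `field`.

    Records missing the field fall back to `default`, mirroring the
    "Unknown" default used when the statistic record is built.
    """
    return Counter(stat.get(field, default) for stat in post_stats)


# Illustrative records only; the device and platform values are made up.
post_stats = [
    {"consumerId": "user001", "device": "Mobile", "platform": "iOS"},
    {"consumerId": "user002", "device": "Mobile", "platform": "Android"},
    {"consumerId": "user003"},
]

device_stats = count_distribution(post_stats, "device")
platform_stats = count_distribution(post_stats, "platform")
print(dict(device_stats))  # {'Mobile': 2, 'Unknown': 1}
```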

3. Example Data Flow

3.1 UserActivity (raw data)

{
  "_id": "ua001",
  "consumerId": "user001",
  "activityName": "USER_ACTIVITY_ORDER_PLACED",
  "postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
  "createdAt": "2025-12-01T10:05:00.000Z",
  "receivedAt": "2025-12-01T10:05:05.000Z",
  "botStatus": "NOT_BOT"
}

3.2 ProcessedUserActivity (session-processed data)

{
  "_id": "pua001",
  "consumerId": "user001",
  "sessionId": "sess001",
  "activityName": "USER_ACTIVITY_ORDER_PLACED",
  "postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
  "createdAt": "2025-12-01T10:05:00.000Z",
  "receivedAt": "2025-12-01T10:05:05.000Z",
  "botStatus": "NOT_BOT",
  "fromAd": false,
  "placedOrder": true
}

3.3 UserActivityStatistic (statistics data)

{
  "_id": "stat001",
  "consumerId": "user001",
  "sessionId": "sess001",
  "postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
  "date": "2025-12-01",
  "fromAd": false,
  "placedOrder": true
}

3.4 UserActivitySummary (summary data)

{
  "date": "2025-12-01",
  "postId": "6ff59f85-6b2b-486c-a7e6-33d1a1979da9",
  "fromAd": false,
  "placedOrder": 1  // the value returned by the [/v2/summary](file:///Users/wangjun/Code/Pears/code/analyse-schedule/source/UserActivityRepositoryImpl.java#L36-L36) endpoint
}
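
Because the summary stage deduplicates by consumerId, placedOrder counts distinct users who placed an order, not the number of orders. The sketch below makes that concrete. The helper name count_unique and the extra sample records (sess002 through sess004, user002) are illustrative and not taken from real data.

```python
def count_unique(stats, flag):
    """Number of distinct consumerIds whose record has a truthy `flag`.

    Records without a consumerId are skipped, as in the summary stage.
    """
    return len({
        s["consumerId"]
        for s in stats
        if s.get("consumerId") and s.get(flag)
    })


stats = [
    {"consumerId": "user001", "sessionId": "sess001", "placedOrder": True},
    {"consumerId": "user001", "sessionId": "sess002", "placedOrder": True},
    {"consumerId": "user002", "sessionId": "sess003", "placedOrder": False},
    {"sessionId": "sess004", "placedOrder": True},  # no consumerId: ignored
]

print(count_unique(stats, "placedOrder"))  # 1
```

Two orders from user001 in different sessions still yield a placedOrder of 1 for that day, post, and fromAd group.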

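4. Summary

The pipeline is a typical ETL (extract, transform, load) process: each stage refines the raw user activity data one step further until it becomes summary data ready for analysis. Every stage has its own processing logic and data structure, so that the final summary data accurately reflects user behavior.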
