定时任务技术文档

1. 概述

系统中存在一个名为 PostSchedule 的定时任务类,主要负责处理用户活动数据(User Activity)的处理和统计工作。该类通过Spring的@Scheduled注解实现定时执行,并使用分布式锁机制确保在多节点部署环境中只有一个实例在执行任务。

2. 整体架构

2.1 核心组件

2.2 数据流总览

原始用户活动数据(userActivityCollection)
         ↓
处理后的用户活动数据(processedUserActivityRepository)
         ↓
用户活动统计数据(userActivityStatisticRepositoryV2)
         ↓
用户活动汇总数据(userActivitySummaryRepositoryV2)

3. 详细处理流程

3.1 第一阶段:获取分布式锁

涉及组件:

  • lockCollection:存储分布式锁信息的集合
  • 锁名称:${pearEnv}ProcessingUa
  • 锁持有者标识:随机生成的UUID

执行频率:每分钟执行一次 (fixedDelayString = "PT1M")

处理逻辑:

  1. 查询是否存在锁记录
  2. 如果存在且未过期,检查是否为自己持有的锁
  3. 如果是自己持有的锁,则更新过期时间
  4. 如果没有锁或锁已过期,则尝试插入新的锁记录
  5. 成功获取锁后,启动后续处理任务

3.2 第二阶段:处理原始用户活动数据

涉及组件:

执行频率:每2分钟执行一次(获取锁后启动)

数据流向:

  1. userActivityCollection 集合中读取未处理的用户活动数据
  2. 根据 consumerId 的首字母将数据分配到不同的处理队列(共16个队列:0-9,a-f)
  3. 并行处理各队列中的数据:
    • 为每条记录分配 sessionId(30分钟内的活动属于同一会话)
    • 防止重复处理相同数据
  4. 将处理后的数据保存到 processedUserActivityRepository 对应的集合中

关键处理逻辑:

  • 使用滑动窗口机制处理会话连接
  • 会话超时时间:30分钟
  • 批处理大小:1000条记录

3.2.1 实际数据处理示例

原始数据 UserActivity 示例:

{
  "createdAt": {
    "$date": "2025-12-03T07:25:23.933Z"
  },
  "_id": {
    "$oid": "692fe5e54f09245af6a607b6"
  },
  "postId": null,
  "userRole": "CONSUMER",
  "sessionId": null,
  "botStatus": "NOT_BOT",
  "urlParameters": null,
  "pearId": null,
  "activityData": {
    "price": 0,
    "userId": "f4a6b3f1-d9e8-457c-81cb-a8851eb7ee4e",
    "cookies": {
      "_ga": "GA1.1.309855701.1764727420",
      "_fbp": "fb.1.1764727424021.60396115221247597",
      "_ga_RV17GL444M": "GS2.1.s1764746687$o4$g0$t1764746703$j44$l0$h0"
    },
    "products": "[{\"id\":\"e1bdf484-8eb3-46f6-b1aa-316429853033\",\"title\":\"General Admission\",\"quantity\":1,\"price\":0}]",
    "createdAt": "2025-12-03T07:25:23.933Z",
    "visitorId": "aa170eb0-1be2-4919-83e8-94f20a444f42",
    "timeOffset": 0,
    "orderNumber": "pr78674",
    "paymentMethod": null
  },
  "userId": "f4a6b3f1-d9e8-457c-81cb-a8851eb7ee4e",
  "adId": null,
  "activityName": "USER_ACTIVITY_ORDER_PLACED",
  "consumerId": "aa170eb0-1be2-4919-83e8-94f20a444f42",
  "userAgent": {
    "os": {
      "name": "Windows",
      "version": "10"
    },
    "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
    "cpu": {
      "architecture": "amd64"
    },
    "isBot": false,
    "device": {},
    "engine": {
      "name": "Blink",
      "version": "142.0.0.0"
    },
    "browser": {
      "name": "Chrome",
      "major": "142",
      "version": "142.0.0.0"
    }
  },
  "receivedAt": {
    "$date": "2025-12-03T07:25:25.288Z"
  }
}

处理后的 ProcessedUserActivity 数据示例:

{
  "createdAt": {
    "$date": "2025-06-23T01:44:12.133Z"
  },
  "userAgent": {
    "os": {
      "name": "Windows",
      "version": "10"
    },
    "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    "cpu": {
      "architecture": "amd64"
    },
    "isBot": false,
    "device": {},
    "engine": {
      "name": "Blink",
      "version": "137.0.0.0"
    },
    "browser": {
      "name": "Chrome",
      "major": "137",
      "version": "137.0.0.0"
    }
  },
  "activityName": "USER_ACTIVITY_ORDER_PLACED",
  "postId": null,
  "receivedAt": {
    "$date": "2025-06-23T01:44:06.928Z"
  },
  "urlParameters": null,
  "_id": {
    "$oid": "6858b16612eb317ae2aa6007"
  },
  "botStatus": "NOT_BOT",
  "userId": "071c9139-9db6-44ea-85be-e5aae1beafa3",
  "adId": null,
  "consumerId": "071c9139-9db6-44ea-85be-e5aae1beafa3",
  "userRole": "CONSUMER",
  "sessionId": {
    "$numberLong": "1703106525706345"
  },
  "pearId": null,
  "activityData": {
    "price": 90.49,
    "cookies": {
      "_ga": "GA1.1.1327355664.1749691171",
      "_fbp": "fb.1.1749691171277.845883016907596194",
      "_ga_RV17GL444M": "GS2.1.s1750642027$o27$g1$t1750643049$j60$l0$h0"
    },
    "products": "[{\"id\":\"61ed0a75-220a-40ad-9c8c-6916bad2f91a\",\"title\":\"99999\",\"quantity\":1,\"price\":100}]",
    "createdAt": "2025-06-23T01:44:12.133Z",
    "visitorId": "7b283b21-691a-4894-b8da-287fa6325ec3",
    "timeOffset": 0,
    "orderNumber": "pr62787",
    "paymentMethod": "BANK_CARD"
  }
}

处理过程说明:

  1. 从 UserActivity 集合中筛选符合条件的数据:

    • botStatus 不存在或为 "NOT_BOT"
    • receivedAt 字段存在
    • activityName 为具体的活动类型如 "USER_ACTIVITY_ORDER_PLACED"
  2. 为每条记录分配 sessionId:

    • 根据 consumerId 查找最近处理过的同用户记录
    • 如果找不到或者时间间隔超过30分钟,则分配新的 sessionId(使用 System.nanoTime())
    • 否则沿用之前的 sessionId
  3. 在上面的例子中,UserActivity 记录会被处理成 ProcessedUserActivity 记录,主要变化是增加了 sessionId 字段。

3.3 第三阶段:生成统计数据

涉及组件:

执行频率:每2分钟执行一次(与第二阶段并行)

数据流向:

  1. processedUserActivityRepository 获取已处理的用户活动数据
  2. 按日期分组处理数据(逐日处理)
  3. 判断每条记录是否来自广告流量:
    • 检查当前会话中是否有广告点击ID(clickid)
    • 检查过去28天内是否通过广告访问过
  4. 生成详细的统计数据并保存到 userActivityStatisticRepositoryV2 对应的集合中

关键处理逻辑:

  • 按日处理,从最早的未处理日期开始
  • 广告来源判断算法
  • 数据去重处理

3.4 第四阶段:生成汇总数据

涉及组件:

执行频率:每2分钟执行一次(与第二、三阶段并行)

数据流向:

  1. userActivityStatisticRepositoryV2 获取统计数据
  2. 按是否来自广告、postId等维度分组
  3. 进行UV去重计算:
    • uniqueVisitors(唯一访客)
    • engaged(参与互动)
    • viewedProduct(查看商品)
    • clickedAddToCart(点击加入购物车)
    • clickedBuyNow(点击立即购买)
    • initiatedCheckout(发起结账)
    • placedOrder(下单)
    • addedToCart(加购)
    • orderQuantity(订单数量)
  4. 按设备类型、平台等维度分类统计
  5. 生成汇总数据并保存到 userActivitySummaryRepositoryV2 对应的集合中

4. 关键技术实现

4.1 并行处理

  • 使用虚拟线程提高并发性能
  • 按用户ID首字母分片处理,减少竞争
  • 使用队列和锁机制保证数据一致性

4.2 异常处理

  • 所有定时任务都有异常捕获机制,防止任务中断
  • 数据库操作具有重试机制
  • 内存管理机制防止OOM(及时清理缓存)

4.3 数据一致性保障

  • 使用MongoDB的原子操作保证数据写入一致性
  • Upsert机制确保数据不丢失
  • 批量处理提高性能同时保持数据完整性

5. Python演示脚本说明

为了更好地理解整个流程,我们还提供了一个Python演示脚本 full_schedule_demo.py,它完整地模拟了Java代码中的处理逻辑,包括:

  1. 分布式锁机制
  2. 会话ID分配逻辑
  3. 详细统计数据生成
  4. 汇总数据生成

通过运行该脚本,可以清楚地看到每一步的中间产物和最终结果,有助于理解整个数据处理流程。

results matching ""

    No results matching ""