用户活动统计处理技术文档
1. 概述
processActivityToStatisticRecord 方法是定时任务中负责将处理后的用户活动数据转换为统计数据的核心函数。它的主要作用是:
- 按日期分组处理用户活动数据
- 判断每条记录是否来源于广告流量
- 生成详细的统计数据
- 生成汇总统计数据
2. 整体处理流程
2.1 数据准备阶段
系统首先需要确定从哪一天开始处理数据:
- 如果还没有生成过汇总记录,则从最早的已处理记录开始
- 如果已有汇总记录,则从最后一条汇总记录的前一天开始(防止跨天交易遗漏)
- 默认时区为美国洛杉矶时间(America/Los_Angeles)
ZonedDateTime startSummaryDate;
Document latestSummaryRecord = userActivitySummaryRepositoryV2.findLatestSummaryRecord(pearEnv, zoneId);
if (null == latestSummaryRecord) {
Document earliestProcessedRecord = processedUserActivityRepository.getEarliestProcessedRecord(pearEnv);
if (null == earliestProcessedRecord) {
return;
}
startSummaryDate = earliestProcessedRecord.getDate("createdAt").toInstant().atZone(zone).truncatedTo(ChronoUnit.DAYS);
} else {
startSummaryDate = ZonedDateTime.parse(latestSummaryRecord.getString("date") + " 00:00:00", yMdHms);
}
// Start from two days ago to prevent missing cross-day transactions
startSummaryDate = startSummaryDate.minusDays(1);
2.2 逐日处理数据
系统逐日处理数据,每天处理一次,直到处理完所有未处理的数据。
do {
ZonedDateTime endSummaryDate = startSummaryDate.plusDays(1);
String startSummaryDateStr = startSummaryDate.format(yMd);
// 获取指定日期范围内的用户活动数据
LinkedList<Document> statistics = processedUserActivityRepository.uniqueVisitorsAction(
pearEnv,
startSummaryDate.toInstant(),
endSummaryDate.toInstant()
);
// 如果当天没有数据,则检查是否已到未来日期
if (statistics.isEmpty()) {
ZonedDateTime today = ZonedDateTime.now(zone);
String tomorrowStr = today.plusDays(1).format(yMd);
if (tomorrowStr.compareTo(startSummaryDateStr) <= 0) {
break; // 已经处理完所有数据
} else {
startSummaryDate = endSummaryDate;
continue; // 处理下一天
}
}
// 处理统计数据...
startSummaryDate = endSummaryDate;
} while (true);
3. 核心处理逻辑
3.1 广告来源判断
对于每条用户活动记录,系统会判断其是否来源于广告流量:
Iterator<Document> statisticsIter = statistics.iterator();
while (statisticsIter.hasNext()) {
Document d = statisticsIter.next();
String postId = d.getString("postId");
Boolean fromAd = d.getBoolean("fromAd");
// 只有当postId和fromAd都不为空时才继续处理
if (null == postId || null == fromAd) {
statisticsIter.remove();
continue;
}
// 对于非广告数据,需要进一步检查
if (!fromAd) {
// 检查AttributionAndTracking设置
if (checkAttributionAndTrackingCheck(d)) {
continue;
}
// 检查28天内的访问历史
if (checkAccessWithin28Days(d, statistics)) {
continue;
}
}
}
3.1.1 直接广告标记检查
检查当前会话中是否存在广告点击标识:
private boolean checkAttributionAndTrackingCheck(Document document) {
// 获取当前记录的消费者ID和会话ID
String consumerId = document.getString("consumerId");
long sessionId = getSessionId(document); // 从document中提取sessionId
// 查询同一会话中的所有记录
FindIterable<Document> processedRecords = processedUserActivityRepository
.getProcessedRecordByConsumerSessionId(pearEnv, consumerId, sessionId);
for (Document record: processedRecords) {
Document urlParameters = record.get("urlParameters", Document.class);
Document activityData = record.get("activityData", Document.class);
if (null != activityData) {
Document queryParams = activityData.get("queryParams", Document.class);
if (null != queryParams) {
// 检查是否有clickid参数(表示来自广告)
if (null != queryParams.getString("clickid")) {
document.put("fromAd", true);
return true;
}
}
}
if (null != urlParameters) {
// 检查URL参数中是否有clickid
if (null != urlParameters.getString("clickid")) {
document.put("fromAd", true);
return true;
}
}
}
return false;
}
3.1.2 历史访问检查(28天内)
检查用户在过去28天内是否有广告来源的访问记录:
private boolean checkAccessWithin28Days(Document forCheck, List<Document> historyStatisticInMemory) {
String postId = forCheck.getString("postId");
String consumerId = forCheck.getString("consumerId");
String curatorId = forCheck.getString("curatorId");
Instant startTime = forCheck.getDate("startTime").toInstant();
// 先检查内存中的历史数据
for (Document historyStatistic: historyStatisticInMemory) {
if (!consumerId.equals(historyStatistic.getString("consumerId"))) {
continue;
}
Instant historyStartTime = historyStatistic.getDate("startTime").toInstant();
if (historyStartTime.isBefore(startTime) &&
Boolean.TRUE.equals(historyStatistic.getBoolean("originFromAd"))) {
// 如果28天内有广告来源访问,则当前访问也标记为广告来源
forCheck.put("fromAd", true);
forCheck.put("postId", historyStatistic.getString("postId"));
return true;
}
}
// 再检查数据库中的历史数据
String startDate = startTime.atZone(zone).plusDays(-28).format(yMd);
Document lastAccessWithin28Days = userActivityStatisticRepositoryV2
.findLastOriginFromAdUserActivityStatisticsByConsumer(
pearEnv, startDate, startTime, zoneId, curatorId, consumerId);
if (null != lastAccessWithin28Days) {
forCheck.put("fromAd", true);
forCheck.put("postId", lastAccessWithin28Days.getString("postId"));
return true;
}
return false;
}
3.2 生成详细统计数据
为每条记录生成详细的统计数据,用于后续分析:
List<UpdateOneModel<Document>> forStatisticUpsert = statistics.stream().map(d -> {
long sessionId = getSessionId(d);
Bson queryFilter = new BsonDocument("postId", new BsonString(d.getString("postId")))
.append("date", new BsonString(startSummaryDateStr))
.append("consumerId", new BsonString(d.getString("consumerId")))
.append("sessionId", new BsonInt64(sessionId));
return new UpdateOneModel<Document>(
queryFilter,
new BsonDocument("$set", d.toBsonDocument()),
updateOptions
);
}).toList();
3.3 生成汇总统计数据
将详细统计数据按维度分组,生成汇总统计数据:
// 按是否来自广告和postId分组
Map<Boolean, List<Document>> groupingByFromAd = statistics.stream()
.collect(Collectors.groupingBy(
d -> d.getBoolean("fromAd"),
LinkedHashMap::new,
Collectors.toList()
));
groupingByFromAd.forEach((fromAd, fromAdStatistics) -> {
Map<String, List<Document>> groupingByPostId = fromAdStatistics.stream()
.collect(Collectors.groupingBy(
d -> d.getString("postId"),
LinkedHashMap::new,
Collectors.toList()
));
groupingByPostId.forEach((postId, postIdStatistic) -> {
// 对每组数据进行UV去重统计
SummaryRecordInner summary = new SummaryRecordInner();
summary.setPostId(postId);
summary.setFromAd(fromAd);
// UV去重处理
Map<String, Set<String>> uvDeduplication = new HashMap<>();
postIdStatistic.forEach(d -> {
String consumerId = d.getString("consumerId");
Set<String> uvKeys = uvDeduplication.computeIfAbsent(
consumerId,
k -> new HashSet<>()
);
// 各种行为统计(去重)
if (!uvKeys.contains("uniqueVisitors") && d.getBoolean("uniqueVisitors")) {
uvKeys.add("uniqueVisitors");
summary.setUniqueVisitors(summary.getUniqueVisitors() + 1);
}
if (!uvKeys.contains("engaged") && d.getBoolean("engaged")) {
uvKeys.add("engaged");
summary.setEngaged(summary.getEngaged() + 1);
}
// ... 其他行为统计
});
});
});
4. 数据结构定义
4.1 详细统计数据结构
每条详细统计数据包含以下字段:
- postId: 文章ID
- date: 日期
- consumerId: 消费者ID
- sessionId: 会话ID
- fromAd: 是否来自广告
- uniqueVisitors: 是否为唯一访客
- engaged: 是否参与互动
- viewedProduct: 是否查看商品
- clickedAddToCart: 是否点击加入购物车
- clickedBuyNow: 是否点击立即购买
- initiatedCheckout: 是否发起结账
- placedOrder: 是否下单
- addedToCart: 是否加购
- orderQuantity: 订单数量
- device: 设备类型
- platform: 平台
- ui: UI交互数据
4.2 汇总统计数据结构
每条汇总统计数据包含以下字段:
- date: 日期
- postId: 文章ID
- fromAd: 是否来自广告
- uniqueVisitors: 唯一访客数
- engaged: 参与互动人数
- viewedProduct: 查看商品人数
- clickedAddToCart: 点击加入购物车人数
- clickedBuyNow: 点击立即购买人数
- initiatedCheckout: 发起结账人数
- placedOrder: 下单人数
- addedToCart: 加购人数
- orderQuantity: 订单总数
- device: 各设备类型访问次数统计
- platform: 各平台访问次数统计
- ui: UI交互次数统计
5. 处理特点
- 逐日处理:按日期顺序处理数据,确保数据完整性
- 广告溯源:通过多种方式判断流量是否来源于广告,包括直接标记和历史追溯
- 去重统计:在汇总阶段对用户行为进行去重处理,确保统计数据准确性
- 批量操作:使用批量写入提高数据库操作效率
- 时区处理:统一使用洛杉矶时区处理日期,避免跨时区问题