Skip to content

Commit 197bf2c

Browse files
author
twitter-team
committed
Open-sourcing Timelines Aggregation Framework
Open sourcing Aggregation Framework, a config-driven Summingbird based framework for generating real-time and batch aggregate features to be consumed by ML models.
1 parent b5e849b commit 197bf2c

File tree

146 files changed

+16429
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

146 files changed

+16429
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Product surfaces at Twitter are built on a shared set of data, models, and softw
2020
| | [topic-social-proof](topic-social-proof/README.md) | Identifies topics related to individual Tweets. |
2121
| Software framework | [navi](navi/README.md) | High performance, machine learning model serving written in Rust. |
2222
| | [product-mixer](product-mixer/README.md) | Software framework for building feeds of content. |
23+
| | [timelines-aggregation-framework](timelines/data_processing/ml_util/aggregation_framework/README.md) | Framework for generating aggregate features in batch or real time. |
2324
| | [twml](twml/README.md) | Legacy machine learning framework built on TensorFlow v1. |
2425

2526
The product surface currently included in this repository is the For You Timeline.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.twitter.timelines.prediction.common.aggregates
2+
3+
import com.twitter.ml.api.Feature
4+
import com.twitter.ml.api.FeatureContext
5+
import com.twitter.ml.api.ITransform
6+
import com.twitter.ml.api.constant.SharedFeatures
7+
import java.lang.{Double => JDouble}
8+
9+
import com.twitter.timelines.prediction.common.adapters.AdapterConsumer
10+
import com.twitter.timelines.prediction.common.adapters.EngagementLabelFeaturesDataRecordUtils
11+
import com.twitter.ml.api.DataRecord
12+
import com.twitter.ml.api.RichDataRecord
13+
import com.twitter.timelines.suggests.common.engagement.thriftscala.EngagementType
14+
import com.twitter.timelines.suggests.common.engagement.thriftscala.Engagement
15+
import com.twitter.timelines.prediction.features.common.TimelinesSharedFeatures
16+
import com.twitter.timelines.prediction.features.common.CombinedFeatures
17+
18+
/**
19+
* Transforms BCE event UUA data records that contain only continuous dwell time into data records that contain the corresponding binary label features.
20+
* The input UUA data records have USER_ID, SOURCE_TWEET_ID, TIMESTAMP and
21+
* 0 or one of (TWEET_DETAIL_DWELL_TIME_MS, PROFILE_DWELL_TIME_MS, FULLSCREEN_VIDEO_DWELL_TIME_MS) features.
22+
* We will use the different engagement TIME_MS to differentiate different engagements,
23+
* and then re-use the function in EngagementTypeConverter to add the binary label to the datarecord.
24+
**/
25+
26+
object BCELabelTransformFromUUADataRecord extends ITransform {

  /** Pairs each continuous dwell-time feature with the engagement type it stands for. */
  val dwellTimeFeatureToEngagementMap = Map(
    TimelinesSharedFeatures.TWEET_DETAIL_DWELL_TIME_MS -> EngagementType.TweetDetailDwell,
    TimelinesSharedFeatures.PROFILE_DWELL_TIME_MS -> EngagementType.ProfileDwell,
    TimelinesSharedFeatures.FULLSCREEN_VIDEO_DWELL_TIME_MS -> EngagementType.FullscreenVideoDwell
  )

  /**
   * Builds an [[Engagement]] from a single dwell-time feature of a record.
   *
   * @param rdr              record to read the dwell time and timestamp from
   * @param dwellTimeFeature dwell-time feature to look up on the record
   * @param engagementType   engagement type assigned to the resulting engagement
   * @return `None` when the record does not carry `dwellTimeFeature`; otherwise an
   *         engagement whose timestamp is the record's `SharedFeatures.TIMESTAMP`
   *         value and whose weight is the dwell-time value
   */
  def dwellFeatureToEngagement(
    rdr: RichDataRecord,
    dwellTimeFeature: Feature[JDouble],
    engagementType: EngagementType
  ): Option[Engagement] = {
    if (!rdr.hasFeature(dwellTimeFeature)) {
      None
    } else {
      // NOTE(review): TIMESTAMP is read without a presence check; the input
      // records are expected to always carry it - confirm against callers.
      val dwellEngagement = Engagement(
        engagementType = engagementType,
        timestampMs = rdr.getFeatureValue(SharedFeatures.TIMESTAMP),
        weight = Some(rdr.getFeatureValue(dwellTimeFeature))
      )
      Some(dwellEngagement)
    }
  }

  /**
   * Adds the tweet-detail, profile and fullscreen-video dwell engagement features
   * from `CombinedFeatures` to the given feature context.
   */
  override def transformContext(featureContext: FeatureContext): FeatureContext = {
    val dwellEngagementFeatures =
      CombinedFeatures.TweetDetailDwellEngagements ++
        CombinedFeatures.ProfileDwellEngagements ++
        CombinedFeatures.FullscreenVideoDwellEngagements

    featureContext.addFeatures(dwellEngagementFeatures.toSeq: _*)
  }

  /**
   * Collects one engagement per dwell-time feature present on `record` and hands
   * them to `EngagementLabelFeaturesDataRecordUtils.setDwellTimeFeatures`.
   */
  override def transform(record: DataRecord): Unit = {
    val richRecord = new RichDataRecord(record)

    // Features missing from the record yield None and are dropped by flatMap.
    val dwellEngagements = dwellTimeFeatureToEngagementMap.toSeq.flatMap {
      case (feature, engagementKind) =>
        dwellFeatureToEngagement(richRecord, feature, engagementKind)
    }

    // Delegate to the shared BCE (behavior client events) label conversion so the
    // labels stay aligned with BCE label generation for offline training data.
    EngagementLabelFeaturesDataRecordUtils.setDwellTimeFeatures(
      richRecord,
      Some(dwellEngagements),
      AdapterConsumer.Combined
    )
  }
}

0 commit comments

Comments
 (0)