Skip to content

Commit 996dc68

Browse files
authored
tiktok: add query with hopping, tumbling and rolling aggregates (#2414)
Also changes the tiktok generator to simplify it. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent de481f9 commit 996dc68

File tree

8 files changed

+155
-197
lines changed

8 files changed

+155
-197
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
CREATE VIEW q0 AS
2+
SELECT
3+
interaction_id,
4+
count(*),
5+
avg(watch_time)
6+
FROM TABLE(
7+
HOP(
8+
TABLE interactions,
9+
DESCRIPTOR(interaction_date),
10+
INTERVAL '90' MINUTES,
11+
INTERVAL '24' HOURS
12+
)
13+
)
14+
GROUP BY
15+
interaction_id;
16+
17+
CREATE VIEW q1 AS
18+
SELECT
19+
interaction_id,
20+
count(*),
21+
avg(watch_time)
22+
FROM TABLE(
23+
TUMBLE(
24+
TABLE interactions,
25+
DESCRIPTOR(interaction_date),
26+
INTERVAL '90' MINUTES
27+
)
28+
)
29+
GROUP BY
30+
interaction_id;
31+
32+
CREATE VIEW q2 AS
33+
SELECT
34+
user_id,
35+
interaction_type,
36+
count(*) OVER day as interaction_len_d,
37+
avg(watch_time) OVER day as average_watch_time_d,
38+
interaction_date
39+
FROM interactions
40+
WINDOW
41+
day AS (PARTITION BY user_id ORDER BY interaction_date RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW);

benchmark/feldera-sql/benchmarks/tiktok/table.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,5 @@ CREATE TABLE interactions (
66
interaction_type STRING,
77
watch_time INT,
88
interaction_date TIMESTAMP LATENESS INTERVAL 15 MINUTES,
9-
previous_interaction_date TIMESTAMP,
109
interaction_month TIMESTAMP
1110
) WITH ('connectors' = '{interactions}');

demo/project_demo12-HopsworksTikTokRecSys/sql_program.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ def generate_program(transport_cfg, format_cfg):
1111
interaction_type STRING,
1212
watch_time INT,
1313
interaction_date TIMESTAMP LATENESS INTERVAL 15 MINUTES,
14-
previous_interaction_date TIMESTAMP,
1514
interaction_month TIMESTAMP
1615
)"""
1716

Lines changed: 103 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
use chrono::{DateTime, Datelike, Days, TimeDelta, Utc};
1+
use chrono::{DateTime, Datelike, TimeDelta, Utc};
22
use fake::Fake;
33
use rand::seq::SliceRandom;
44
use rand::Rng;
55
use serde::Serialize;
66

7-
use crate::users::User;
8-
use crate::videos::Video;
9-
107
#[derive(Debug, Clone, Serialize)]
118
pub struct Interaction {
129
pub interaction_id: u32,
@@ -18,8 +15,6 @@ pub struct Interaction {
1815
#[serde(serialize_with = "serialize_timestamp")]
1916
pub interaction_date: DateTime<Utc>,
2017
#[serde(serialize_with = "serialize_timestamp")]
21-
pub previous_interaction_date: DateTime<Utc>,
22-
#[serde(serialize_with = "serialize_timestamp")]
2318
pub interaction_month: DateTime<Utc>,
2419
}
2520

@@ -30,12 +25,95 @@ where
3025
serializer.serialize_str(&date.format("%Y-%m-%d %H:%M:%S").to_string())
3126
}
3227

33-
impl Interaction {
34-
pub fn generate(num: u32, users: Vec<User>, videos: Vec<Video>) -> Vec<Interaction> {
35-
let mut now = Utc::now();
28+
#[derive(Debug, Clone)]
29+
pub struct InteractionGenerator {
30+
max_users: u32,
31+
max_videos: u32,
32+
max_interactions: u32,
33+
interaction_id: u32,
34+
start: chrono::DateTime<Utc>,
35+
}
36+
37+
impl Default for InteractionGenerator {
38+
fn default() -> Self {
39+
Self {
40+
max_users: 1_000,
41+
max_videos: 1_000,
42+
max_interactions: 10_000_000,
43+
interaction_id: 0,
44+
start: Utc::now(),
45+
}
46+
}
47+
}
48+
49+
impl InteractionGenerator {
50+
pub fn max_users(mut self, n: u32) -> Self {
51+
self.max_users = n;
52+
self
53+
}
54+
55+
pub fn max_videos(mut self, n: u32) -> Self {
56+
self.max_videos = n;
57+
self
58+
}
59+
60+
pub fn max_interactions(mut self, n: u32) -> Self {
61+
self.max_interactions = n;
62+
self
63+
}
64+
65+
pub fn start_date(mut self, start: DateTime<Utc>) -> Self {
66+
self.start = start;
67+
self
68+
}
69+
70+
pub fn generate(mut self) -> Vec<Interaction> {
3671
let mut rng = rand::thread_rng();
3772

38-
let mut interactions = Vec::with_capacity(num as usize);
73+
let mut interactions = Vec::with_capacity(self.max_interactions as usize);
74+
75+
for _ in 0..self.max_interactions {
76+
let random_delay = chrono::TimeDelta::seconds(rng.gen_range(0..600));
77+
let interaction_date = self.timestamp_generator() - random_delay;
78+
79+
let interaction = Interaction {
80+
interaction_id: self.interaction_id_generator(),
81+
user_id: self.user_id_generator(&mut rng),
82+
video_id: self.video_id_generator(&mut rng),
83+
category_id: self.video_category_generator(&mut rng),
84+
interaction_type: self.interaction_type_generator(&mut rng),
85+
watch_time: self.watch_time_generator(&mut rng),
86+
interaction_month: interaction_date.with_day(1).unwrap(),
87+
interaction_date,
88+
};
89+
90+
interactions.push(interaction);
91+
}
92+
93+
interactions
94+
}
95+
}
96+
97+
impl InteractionGenerator {
98+
fn interaction_id_generator(&mut self) -> u32 {
99+
if self.interaction_id == self.max_interactions {
100+
self.interaction_id = 0;
101+
} else {
102+
self.interaction_id += 1;
103+
}
104+
105+
self.interaction_id
106+
}
107+
108+
fn user_id_generator(&self, rng: &mut impl Rng) -> u32 {
109+
Fake::fake_with_rng(&(0..self.max_videos), rng)
110+
}
111+
112+
fn video_id_generator(&self, rng: &mut impl Rng) -> u32 {
113+
Fake::fake_with_rng(&(0..self.max_videos), rng)
114+
}
115+
116+
fn interaction_type_generator(&self, rng: &mut impl Rng) -> String {
39117
let interaction_types = [
40118
("like", 1.5),
41119
("dislike", 0.2),
@@ -45,44 +123,24 @@ impl Interaction {
45123
("skip", 10.0),
46124
];
47125

48-
for _ in 0..num {
49-
let user = users.choose(&mut rng).unwrap();
50-
let video = videos.choose(&mut rng).unwrap();
51-
52-
let random_delay = chrono::TimeDelta::seconds(rng.gen_range(0..600));
53-
let interaction_date = now + random_delay;
54-
55-
let previous_interaction_date = interaction_date - Days::new(rng.gen_range(0..90));
56-
57-
let mut watch_time = rng.gen_range(1..video.video_length);
58-
59-
let probability_watched_till_end = 1.0 - ((watch_time / video.video_length) as f64);
60-
61-
let watched_till_end = rng.gen_bool(probability_watched_till_end);
126+
interaction_types
127+
.choose_weighted(rng, |item| item.1)
128+
.unwrap()
129+
.0
130+
.to_string()
131+
}
62132

63-
if watched_till_end {
64-
watch_time = video.video_length;
65-
}
133+
fn video_category_generator(&self, rng: &mut impl Rng) -> u8 {
134+
Fake::fake_with_rng(&(1..=11), rng)
135+
}
66136

67-
interactions.push(Interaction {
68-
interaction_id: Fake::fake_with_rng(&(100_000_000..999_999_999), &mut rng),
69-
user_id: user.user_id,
70-
video_id: video.video_id,
71-
category_id: video.category_id,
72-
interaction_type: interaction_types
73-
.choose_weighted(&mut rng, |item| item.1)
74-
.unwrap()
75-
.0
76-
.to_string(),
77-
watch_time,
78-
interaction_date,
79-
previous_interaction_date,
80-
interaction_month: interaction_date.with_day(1).unwrap(),
81-
});
137+
fn watch_time_generator(&self, rng: &mut impl Rng) -> u8 {
138+
Fake::fake_with_rng(&(10..=250), rng)
139+
}
82140

83-
now += TimeDelta::seconds(1);
84-
}
141+
fn timestamp_generator(&mut self) -> DateTime<Utc> {
142+
self.start += TimeDelta::seconds(1);
85143

86-
interactions
144+
self.start
87145
}
88146
}
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
1-
pub mod users;
2-
pub mod videos;
31
pub mod interactions;
42
pub mod buffered_topic;

demo/project_demo12-HopsworksTikTokRecSys/tiktok-gen/src/main.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ use rdkafka::{
77
producer::ThreadedProducer,
88
ClientConfig,
99
};
10-
use tiktok_gen::{
11-
buffered_topic::BufferedTopic, interactions::Interaction, users::User, videos::Video,
12-
};
10+
use tiktok_gen::{buffered_topic::BufferedTopic, interactions::InteractionGenerator};
1311

1412
#[derive(Debug, Parser)]
1513
struct GeneratorOptions {
@@ -85,13 +83,17 @@ fn main() -> Result<(), Box<dyn Error>> {
8583

8684
let start = std::time::Instant::now();
8785

88-
let users = User::generate(options.users, options.historical);
89-
println!("Sample users:\n{:#?}", users.get(0..10));
90-
91-
let videos = Video::generate(options.videos, options.historical);
92-
println!("Sample videos:\n{:#?}", videos.get(0..10));
86+
let interactions = InteractionGenerator::default()
87+
.max_users(options.users)
88+
.max_videos(options.videos)
89+
.max_interactions(options.interactions)
90+
.start_date(if options.historical {
91+
chrono::Utc::now() - chrono::TimeDelta::days(630)
92+
} else {
93+
chrono::Utc::now()
94+
})
95+
.generate();
9396

94-
let interactions = Interaction::generate(options.interactions, users, videos);
9597
let took = start.elapsed();
9698

9799
println!("Sample interactions:\n{:#?}", interactions.get(0..10));

demo/project_demo12-HopsworksTikTokRecSys/tiktok-gen/src/users.rs

Lines changed: 0 additions & 61 deletions
This file was deleted.

0 commit comments

Comments
 (0)