mas_storage_pg/
app_session.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7//! A module containing PostgreSQL implementation of repositories for sessions
8
9use async_trait::async_trait;
10use mas_data_model::{CompatSession, CompatSessionState, Device, Session, SessionState, User};
11use mas_storage::{
12    Clock, Page, Pagination,
13    app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
14    compat::CompatSessionFilter,
15    oauth2::OAuth2SessionFilter,
16};
17use oauth2_types::scope::{Scope, ScopeToken};
18use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
19use sea_query::{
20    Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
21};
22use sea_query_binder::SqlxBinder;
23use sqlx::PgConnection;
24use tracing::Instrument;
25use ulid::Ulid;
26use uuid::Uuid;
27
28use crate::{
29    DatabaseError, ExecuteExt,
30    errors::DatabaseInconsistencyError,
31    filter::StatementExt,
32    iden::{CompatSessions, OAuth2Sessions},
33    pagination::QueryBuilderExt,
34};
35
36/// An implementation of [`AppSessionRepository`] for a PostgreSQL connection
37pub struct PgAppSessionRepository<'c> {
38    conn: &'c mut PgConnection,
39}
40
41impl<'c> PgAppSessionRepository<'c> {
42    /// Create a new [`PgAppSessionRepository`] from an active PostgreSQL
43    /// connection
44    pub fn new(conn: &'c mut PgConnection) -> Self {
45        Self { conn }
46    }
47}
48
49mod priv_ {
50    // The enum_def macro generates a public enum, which we don't want, because it
51    // triggers the missing docs warning
52
53    use std::net::IpAddr;
54
55    use chrono::{DateTime, Utc};
56    use sea_query::enum_def;
57    use uuid::Uuid;
58
59    #[derive(sqlx::FromRow)]
60    #[enum_def]
61    pub(super) struct AppSessionLookup {
62        pub(super) cursor: Uuid,
63        pub(super) compat_session_id: Option<Uuid>,
64        pub(super) oauth2_session_id: Option<Uuid>,
65        pub(super) oauth2_client_id: Option<Uuid>,
66        pub(super) user_session_id: Option<Uuid>,
67        pub(super) user_id: Option<Uuid>,
68        pub(super) scope_list: Option<Vec<String>>,
69        pub(super) device_id: Option<String>,
70        pub(super) human_name: Option<String>,
71        pub(super) created_at: DateTime<Utc>,
72        pub(super) finished_at: Option<DateTime<Utc>>,
73        pub(super) is_synapse_admin: Option<bool>,
74        pub(super) user_agent: Option<String>,
75        pub(super) last_active_at: Option<DateTime<Utc>>,
76        pub(super) last_active_ip: Option<IpAddr>,
77    }
78}
79
80use priv_::{AppSessionLookup, AppSessionLookupIden};
81
82impl TryFrom<AppSessionLookup> for AppSession {
83    type Error = DatabaseError;
84
85    fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
86        // This is annoying to do, but we have to match on all the fields to determine
87        // whether it's a compat session or an oauth2 session
88        let AppSessionLookup {
89            cursor,
90            compat_session_id,
91            oauth2_session_id,
92            oauth2_client_id,
93            user_session_id,
94            user_id,
95            scope_list,
96            device_id,
97            human_name,
98            created_at,
99            finished_at,
100            is_synapse_admin,
101            user_agent,
102            last_active_at,
103            last_active_ip,
104        } = value;
105
106        let user_session_id = user_session_id.map(Ulid::from);
107
108        match (
109            compat_session_id,
110            oauth2_session_id,
111            oauth2_client_id,
112            user_id,
113            scope_list,
114            device_id,
115            is_synapse_admin,
116        ) {
117            (
118                Some(compat_session_id),
119                None,
120                None,
121                Some(user_id),
122                None,
123                device_id_opt,
124                Some(is_synapse_admin),
125            ) => {
126                let id = compat_session_id.into();
127                let device = device_id_opt
128                    .map(Device::try_from)
129                    .transpose()
130                    .map_err(|e| {
131                        DatabaseInconsistencyError::on("compat_sessions")
132                            .column("device_id")
133                            .row(id)
134                            .source(e)
135                    })?;
136
137                let state = match finished_at {
138                    None => CompatSessionState::Valid,
139                    Some(finished_at) => CompatSessionState::Finished { finished_at },
140                };
141
142                let session = CompatSession {
143                    id,
144                    state,
145                    user_id: user_id.into(),
146                    device,
147                    human_name,
148                    user_session_id,
149                    created_at,
150                    is_synapse_admin,
151                    user_agent,
152                    last_active_at,
153                    last_active_ip,
154                };
155
156                Ok(AppSession::Compat(Box::new(session)))
157            }
158
159            (
160                None,
161                Some(oauth2_session_id),
162                Some(oauth2_client_id),
163                user_id,
164                Some(scope_list),
165                None,
166                None,
167            ) => {
168                let id = oauth2_session_id.into();
169                let scope: Result<Scope, _> =
170                    scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
171                let scope = scope.map_err(|e| {
172                    DatabaseInconsistencyError::on("oauth2_sessions")
173                        .column("scope")
174                        .row(id)
175                        .source(e)
176                })?;
177
178                let state = match value.finished_at {
179                    None => SessionState::Valid,
180                    Some(finished_at) => SessionState::Finished { finished_at },
181                };
182
183                let session = Session {
184                    id,
185                    state,
186                    created_at,
187                    client_id: oauth2_client_id.into(),
188                    user_id: user_id.map(Ulid::from),
189                    user_session_id,
190                    scope,
191                    user_agent,
192                    last_active_at,
193                    last_active_ip,
194                    human_name,
195                };
196
197                Ok(AppSession::OAuth2(Box::new(session)))
198            }
199
200            _ => Err(DatabaseInconsistencyError::on("sessions")
201                .row(cursor.into())
202                .into()),
203        }
204    }
205}
206
207/// Split a [`AppSessionFilter`] into two separate filters: a
208/// [`CompatSessionFilter`] and an [`OAuth2SessionFilter`].
209fn split_filter(
210    filter: AppSessionFilter<'_>,
211) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
212    let mut compat_filter = CompatSessionFilter::new();
213    let mut oauth2_filter = OAuth2SessionFilter::new();
214
215    if let Some(user) = filter.user() {
216        compat_filter = compat_filter.for_user(user);
217        oauth2_filter = oauth2_filter.for_user(user);
218    }
219
220    match filter.state() {
221        Some(AppSessionState::Active) => {
222            compat_filter = compat_filter.active_only();
223            oauth2_filter = oauth2_filter.active_only();
224        }
225        Some(AppSessionState::Finished) => {
226            compat_filter = compat_filter.finished_only();
227            oauth2_filter = oauth2_filter.finished_only();
228        }
229        None => {}
230    }
231
232    if let Some(device) = filter.device() {
233        compat_filter = compat_filter.for_device(device);
234        oauth2_filter = oauth2_filter.for_device(device);
235    }
236
237    if let Some(browser_session) = filter.browser_session() {
238        compat_filter = compat_filter.for_browser_session(browser_session);
239        oauth2_filter = oauth2_filter.for_browser_session(browser_session);
240    }
241
242    if let Some(last_active_before) = filter.last_active_before() {
243        compat_filter = compat_filter.with_last_active_before(last_active_before);
244        oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
245    }
246
247    if let Some(last_active_after) = filter.last_active_after() {
248        compat_filter = compat_filter.with_last_active_after(last_active_after);
249        oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
250    }
251
252    (compat_filter, oauth2_filter)
253}
254
255#[async_trait]
256impl AppSessionRepository for PgAppSessionRepository<'_> {
257    type Error = DatabaseError;
258
259    #[tracing::instrument(
260        name = "db.app_session.list",
261        fields(
262            db.query.text,
263        ),
264        skip_all,
265        err,
266    )]
267    async fn list(
268        &mut self,
269        filter: AppSessionFilter<'_>,
270        pagination: Pagination,
271    ) -> Result<Page<AppSession>, Self::Error> {
272        let (compat_filter, oauth2_filter) = split_filter(filter);
273
274        let mut oauth2_session_select = Query::select()
275            .expr_as(
276                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
277                AppSessionLookupIden::Cursor,
278            )
279            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
280            .expr_as(
281                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
282                AppSessionLookupIden::Oauth2SessionId,
283            )
284            .expr_as(
285                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
286                AppSessionLookupIden::Oauth2ClientId,
287            )
288            .expr_as(
289                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
290                AppSessionLookupIden::UserSessionId,
291            )
292            .expr_as(
293                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
294                AppSessionLookupIden::UserId,
295            )
296            .expr_as(
297                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
298                AppSessionLookupIden::ScopeList,
299            )
300            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
301            .expr_as(
302                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::HumanName)),
303                AppSessionLookupIden::HumanName,
304            )
305            .expr_as(
306                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
307                AppSessionLookupIden::CreatedAt,
308            )
309            .expr_as(
310                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
311                AppSessionLookupIden::FinishedAt,
312            )
313            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
314            .expr_as(
315                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
316                AppSessionLookupIden::UserAgent,
317            )
318            .expr_as(
319                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
320                AppSessionLookupIden::LastActiveAt,
321            )
322            .expr_as(
323                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
324                AppSessionLookupIden::LastActiveIp,
325            )
326            .from(OAuth2Sessions::Table)
327            .apply_filter(oauth2_filter)
328            .clone();
329
330        let compat_session_select = Query::select()
331            .expr_as(
332                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
333                AppSessionLookupIden::Cursor,
334            )
335            .expr_as(
336                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
337                AppSessionLookupIden::CompatSessionId,
338            )
339            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
340            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
341            .expr_as(
342                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
343                AppSessionLookupIden::UserSessionId,
344            )
345            .expr_as(
346                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
347                AppSessionLookupIden::UserId,
348            )
349            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
350            .expr_as(
351                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
352                AppSessionLookupIden::DeviceId,
353            )
354            .expr_as(
355                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
356                AppSessionLookupIden::HumanName,
357            )
358            .expr_as(
359                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
360                AppSessionLookupIden::CreatedAt,
361            )
362            .expr_as(
363                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
364                AppSessionLookupIden::FinishedAt,
365            )
366            .expr_as(
367                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
368                AppSessionLookupIden::IsSynapseAdmin,
369            )
370            .expr_as(
371                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
372                AppSessionLookupIden::UserAgent,
373            )
374            .expr_as(
375                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
376                AppSessionLookupIden::LastActiveAt,
377            )
378            .expr_as(
379                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
380                AppSessionLookupIden::LastActiveIp,
381            )
382            .from(CompatSessions::Table)
383            .apply_filter(compat_filter)
384            .clone();
385
386        let common_table_expression = CommonTableExpression::new()
387            .query(
388                oauth2_session_select
389                    .union(UnionType::All, compat_session_select)
390                    .clone(),
391            )
392            .table_name(Alias::new("sessions"))
393            .clone();
394
395        let with_clause = Query::with().cte(common_table_expression).clone();
396
397        let select = Query::select()
398            .column(ColumnRef::Asterisk)
399            .from(Alias::new("sessions"))
400            .generate_pagination(AppSessionLookupIden::Cursor, pagination)
401            .clone();
402
403        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
404
405        let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
406            .traced()
407            .fetch_all(&mut *self.conn)
408            .await?;
409
410        let page = pagination.process(edges).try_map(TryFrom::try_from)?;
411
412        Ok(page)
413    }
414
415    #[tracing::instrument(
416        name = "db.app_session.count",
417        fields(
418            db.query.text,
419        ),
420        skip_all,
421        err,
422    )]
423    async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
424        let (compat_filter, oauth2_filter) = split_filter(filter);
425        let mut oauth2_session_select = Query::select()
426            .expr(Expr::cust("1"))
427            .from(OAuth2Sessions::Table)
428            .apply_filter(oauth2_filter)
429            .clone();
430
431        let compat_session_select = Query::select()
432            .expr(Expr::cust("1"))
433            .from(CompatSessions::Table)
434            .apply_filter(compat_filter)
435            .clone();
436
437        let common_table_expression = CommonTableExpression::new()
438            .query(
439                oauth2_session_select
440                    .union(UnionType::All, compat_session_select)
441                    .clone(),
442            )
443            .table_name(Alias::new("sessions"))
444            .clone();
445
446        let with_clause = Query::with().cte(common_table_expression).clone();
447
448        let select = Query::select()
449            .expr(Expr::cust("COUNT(*)"))
450            .from(Alias::new("sessions"))
451            .clone();
452
453        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
454
455        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
456            .traced()
457            .fetch_one(&mut *self.conn)
458            .await?;
459
460        count
461            .try_into()
462            .map_err(DatabaseError::to_invalid_operation)
463    }
464
465    #[tracing::instrument(
466        name = "db.app_session.finish_sessions_to_replace_device",
467        fields(
468            db.query.text,
469            %user.id,
470            %device_id = device.as_str()
471        ),
472        skip_all,
473        err,
474    )]
475    async fn finish_sessions_to_replace_device(
476        &mut self,
477        clock: &dyn Clock,
478        user: &User,
479        device: &Device,
480    ) -> Result<(), Self::Error> {
481        // TODO need to invoke this from all the oauth2 login sites
482        let span = tracing::info_span!(
483            "db.app_session.finish_sessions_to_replace_device.compat_sessions",
484            { DB_QUERY_TEXT } = tracing::field::Empty,
485        );
486        let finished_at = clock.now();
487        sqlx::query!(
488            "
489                UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
490            ",
491            Uuid::from(user.id),
492            device.as_str(),
493            finished_at
494        )
495        .record(&span)
496        .execute(&mut *self.conn)
497        .instrument(span)
498        .await?;
499
500        if let Ok([stable_device_as_scope_token, unstable_device_as_scope_token]) =
501            device.to_scope_token()
502        {
503            let span = tracing::info_span!(
504                "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
505                { DB_QUERY_TEXT } = tracing::field::Empty,
506            );
507            sqlx::query!(
508                "
509                    UPDATE oauth2_sessions
510                    SET finished_at = $4
511                    WHERE user_id = $1
512                      AND ($2 = ANY(scope_list) OR $3 = ANY(scope_list))
513                      AND finished_at IS NULL
514                ",
515                Uuid::from(user.id),
516                stable_device_as_scope_token.as_str(),
517                unstable_device_as_scope_token.as_str(),
518                finished_at
519            )
520            .record(&span)
521            .execute(&mut *self.conn)
522            .instrument(span)
523            .await?;
524        }
525
526        Ok(())
527    }
528}
529
#[cfg(test)]
mod tests {
    use chrono::Duration;
    use mas_data_model::Device;
    use mas_storage::{
        Pagination, RepositoryAccess,
        app_session::{AppSession, AppSessionFilter},
        clock::MockClock,
        oauth2::OAuth2SessionRepository,
    };
    use oauth2_types::{
        requests::GrantType,
        scope::{OPENID, Scope},
    };
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use sqlx::PgPool;

    use crate::PgRepository;

    /// Walk a compat session and an OAuth 2.0 session through their
    /// lifecycle, checking after each step what the app session repository
    /// counts and lists for the "all", "active" and "finished" filters, then
    /// check the per-device and per-user filters.
    #[sqlx::test(migrator = "crate::MIGRATOR")]
    async fn test_app_repo(pool: PgPool) {
        let mut rng = ChaChaRng::seed_from_u64(42);
        let clock = MockClock::default();
        let mut repo = PgRepository::from_pool(&pool).await.unwrap();

        // Create a user
        let user = repo
            .user()
            .add(&mut rng, &clock, "john".to_owned())
            .await
            .unwrap();

        let all = AppSessionFilter::new().for_user(&user);
        let active = all.active_only();
        let finished = all.finished_only();
        let pagination = Pagination::first(10);

        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert!(full_list.edges.is_empty());
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert!(finished_list.edges.is_empty());

        // Start a compat session for that user
        let device = Device::generate(&mut rng);
        let compat_session = repo
            .compat_session()
            .add(&mut rng, &clock, &user, device.clone(), None, false, None)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 1);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert_eq!(active_list.edges.len(), 1);
        assert_eq!(
            active_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert!(finished_list.edges.is_empty());

        // Finish the session
        let compat_session = repo
            .compat_session()
            .finish(&clock, compat_session)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 1);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 1);
        assert_eq!(
            finished_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        // Start an OAuth2 session
        let client = repo
            .oauth2_client()
            .add(
                &mut rng,
                &clock,
                vec!["https://example.com/redirect".parse().unwrap()],
                None,
                None,
                None,
                vec![GrantType::AuthorizationCode],
                Some("First client".to_owned()),
                Some("https://example.com/logo.png".parse().unwrap()),
                Some("https://example.com/".parse().unwrap()),
                Some("https://example.com/policy".parse().unwrap()),
                Some("https://example.com/tos".parse().unwrap()),
                Some("https://example.com/jwks.json".parse().unwrap()),
                None,
                None,
                None,
                None,
                None,
                Some("https://example.com/login".parse().unwrap()),
            )
            .await
            .unwrap();

        let device2 = Device::generate(&mut rng);
        let scope: Scope = [OPENID]
            .into_iter()
            .chain(device2.to_scope_token().unwrap().into_iter())
            .collect();

        // We're moving the clock forward by 1 minute between each session to ensure
        // we're getting consistent ordering in lists.
        clock.advance(Duration::try_minutes(1).unwrap());

        let oauth_session = repo
            .oauth2_session()
            .add(&mut rng, &clock, &client, Some(&user), None, scope)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 2);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        assert_eq!(
            full_list.edges[1],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert_eq!(active_list.edges.len(), 1);
        assert_eq!(
            active_list.edges[0],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 1);
        assert_eq!(
            finished_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        // Finish the session
        let oauth_session = repo
            .oauth2_session()
            .finish(&clock, oauth_session)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 2);
        assert_eq!(
            full_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        assert_eq!(
            full_list.edges[1],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());

        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 2);
        assert_eq!(
            finished_list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        // Check the second entry of the *finished* list: the full list was
        // already verified above
        assert_eq!(
            finished_list.edges[1],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        // Query by device
        let filter = AppSessionFilter::new().for_device(&device);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert_eq!(list.edges.len(), 1);
        assert_eq!(
            list.edges[0],
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        let filter = AppSessionFilter::new().for_device(&device2);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert_eq!(list.edges.len(), 1);
        assert_eq!(
            list.edges[0],
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        // Create a second user
        let user2 = repo
            .user()
            .add(&mut rng, &clock, "alice".to_owned())
            .await
            .unwrap();

        // If we list/count for this user, we should get nothing
        let filter = AppSessionFilter::new().for_user(&user2);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert!(list.edges.is_empty());
    }
}