Skip to content

Commit 1230cac

Browse files
author
Kacper Sawicki
authored
feat(scaletest): extend notifications runner with smtp support (coder#20222)
This PR extends the scaletest notification runner with SMTP support. If the `--smtp-api-url` flag is provided, the runner will also watch for SMTP notifications using the specified URL.

#### Changes

- Added a new watcher to retrieve emails sent to the runner user
- Tracked WebSocket and SMTP latencies separately
- Updated metrics to include `notification_id` and `notification_type` labels

#### CLI Flags

- `--smtp-api-url`: Address of the SMTP mock HTTP API used to retrieve email notifications

#### Metrics

- `notification_delivery_latency_seconds` now includes:
  - `notification_id`
  - `notification_type` (`websocket` or `smtp`)
1 parent 7bbeef4 commit 1230cac

6 files changed

Lines changed: 845 additions & 461 deletions

File tree

cli/exp_scaletest.go

Lines changed: 11 additions & 324 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929

3030
"github.com/coder/coder/v2/cli/cliui"
3131
"github.com/coder/coder/v2/coderd/httpapi"
32-
notificationsLib "github.com/coder/coder/v2/coderd/notifications"
3332
"github.com/coder/coder/v2/coderd/tracing"
3433
"github.com/coder/coder/v2/codersdk"
3534
"github.com/coder/coder/v2/codersdk/workspacesdk"
@@ -40,7 +39,6 @@ import (
4039
"github.com/coder/coder/v2/scaletest/dashboard"
4140
"github.com/coder/coder/v2/scaletest/harness"
4241
"github.com/coder/coder/v2/scaletest/loadtestutil"
43-
"github.com/coder/coder/v2/scaletest/notifications"
4442
"github.com/coder/coder/v2/scaletest/reconnectingpty"
4543
"github.com/coder/coder/v2/scaletest/workspacebuild"
4644
"github.com/coder/coder/v2/scaletest/workspacetraffic"
@@ -1922,259 +1920,6 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command {
19221920
return cmd
19231921
}
19241922

1925-
func (r *RootCmd) scaletestNotifications() *serpent.Command {
1926-
var (
1927-
userCount int64
1928-
ownerUserPercentage float64
1929-
notificationTimeout time.Duration
1930-
dialTimeout time.Duration
1931-
noCleanup bool
1932-
1933-
tracingFlags = &scaletestTracingFlags{}
1934-
1935-
// This test requires unlimited concurrency.
1936-
timeoutStrategy = &timeoutFlags{}
1937-
cleanupStrategy = newScaletestCleanupStrategy()
1938-
output = &scaletestOutputFlags{}
1939-
prometheusFlags = &scaletestPrometheusFlags{}
1940-
)
1941-
1942-
cmd := &serpent.Command{
1943-
Use: "notifications",
1944-
Short: "Simulate notification delivery by creating many users listening to notifications.",
1945-
Handler: func(inv *serpent.Invocation) error {
1946-
ctx := inv.Context()
1947-
client, err := r.InitClient(inv)
1948-
if err != nil {
1949-
return err
1950-
}
1951-
1952-
notifyCtx, stop := signal.NotifyContext(ctx, StopSignals...)
1953-
defer stop()
1954-
ctx = notifyCtx
1955-
1956-
me, err := requireAdmin(ctx, client)
1957-
if err != nil {
1958-
return err
1959-
}
1960-
1961-
client.HTTPClient = &http.Client{
1962-
Transport: &codersdk.HeaderTransport{
1963-
Transport: http.DefaultTransport,
1964-
Header: map[string][]string{
1965-
codersdk.BypassRatelimitHeader: {"true"},
1966-
},
1967-
},
1968-
}
1969-
1970-
if userCount <= 0 {
1971-
return xerrors.Errorf("--user-count must be greater than 0")
1972-
}
1973-
1974-
if ownerUserPercentage < 0 || ownerUserPercentage > 100 {
1975-
return xerrors.Errorf("--owner-user-percentage must be between 0 and 100")
1976-
}
1977-
1978-
ownerUserCount := int64(float64(userCount) * ownerUserPercentage / 100)
1979-
if ownerUserCount == 0 && ownerUserPercentage > 0 {
1980-
ownerUserCount = 1
1981-
}
1982-
regularUserCount := userCount - ownerUserCount
1983-
1984-
_, _ = fmt.Fprintf(inv.Stderr, "Distribution plan:\n")
1985-
_, _ = fmt.Fprintf(inv.Stderr, " Total users: %d\n", userCount)
1986-
_, _ = fmt.Fprintf(inv.Stderr, " Owner users: %d (%.1f%%)\n", ownerUserCount, ownerUserPercentage)
1987-
_, _ = fmt.Fprintf(inv.Stderr, " Regular users: %d (%.1f%%)\n", regularUserCount, 100.0-ownerUserPercentage)
1988-
1989-
outputs, err := output.parse()
1990-
if err != nil {
1991-
return xerrors.Errorf("could not parse --output flags")
1992-
}
1993-
1994-
tracerProvider, closeTracing, tracingEnabled, err := tracingFlags.provider(ctx)
1995-
if err != nil {
1996-
return xerrors.Errorf("create tracer provider: %w", err)
1997-
}
1998-
tracer := tracerProvider.Tracer(scaletestTracerName)
1999-
2000-
reg := prometheus.NewRegistry()
2001-
metrics := notifications.NewMetrics(reg)
2002-
2003-
logger := inv.Logger
2004-
prometheusSrvClose := ServeHandler(ctx, logger, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), prometheusFlags.Address, "prometheus")
2005-
defer prometheusSrvClose()
2006-
2007-
defer func() {
2008-
_, _ = fmt.Fprintln(inv.Stderr, "\nUploading traces...")
2009-
if err := closeTracing(ctx); err != nil {
2010-
_, _ = fmt.Fprintf(inv.Stderr, "\nError uploading traces: %+v\n", err)
2011-
}
2012-
// Wait for prometheus metrics to be scraped
2013-
_, _ = fmt.Fprintf(inv.Stderr, "Waiting %s for prometheus metrics to be scraped\n", prometheusFlags.Wait)
2014-
<-time.After(prometheusFlags.Wait)
2015-
}()
2016-
2017-
_, _ = fmt.Fprintln(inv.Stderr, "Creating users...")
2018-
2019-
dialBarrier := &sync.WaitGroup{}
2020-
ownerWatchBarrier := &sync.WaitGroup{}
2021-
dialBarrier.Add(int(userCount))
2022-
ownerWatchBarrier.Add(int(ownerUserCount))
2023-
2024-
expectedNotifications := map[uuid.UUID]chan time.Time{
2025-
notificationsLib.TemplateUserAccountCreated: make(chan time.Time, 1),
2026-
notificationsLib.TemplateUserAccountDeleted: make(chan time.Time, 1),
2027-
}
2028-
2029-
configs := make([]notifications.Config, 0, userCount)
2030-
for range ownerUserCount {
2031-
config := notifications.Config{
2032-
User: createusers.Config{
2033-
OrganizationID: me.OrganizationIDs[0],
2034-
},
2035-
Roles: []string{codersdk.RoleOwner},
2036-
NotificationTimeout: notificationTimeout,
2037-
DialTimeout: dialTimeout,
2038-
DialBarrier: dialBarrier,
2039-
ReceivingWatchBarrier: ownerWatchBarrier,
2040-
ExpectedNotifications: expectedNotifications,
2041-
Metrics: metrics,
2042-
}
2043-
if err := config.Validate(); err != nil {
2044-
return xerrors.Errorf("validate config: %w", err)
2045-
}
2046-
configs = append(configs, config)
2047-
}
2048-
for range regularUserCount {
2049-
config := notifications.Config{
2050-
User: createusers.Config{
2051-
OrganizationID: me.OrganizationIDs[0],
2052-
},
2053-
Roles: []string{},
2054-
NotificationTimeout: notificationTimeout,
2055-
DialTimeout: dialTimeout,
2056-
DialBarrier: dialBarrier,
2057-
ReceivingWatchBarrier: ownerWatchBarrier,
2058-
Metrics: metrics,
2059-
}
2060-
if err := config.Validate(); err != nil {
2061-
return xerrors.Errorf("validate config: %w", err)
2062-
}
2063-
configs = append(configs, config)
2064-
}
2065-
2066-
go triggerUserNotifications(
2067-
ctx,
2068-
logger,
2069-
client,
2070-
me.OrganizationIDs[0],
2071-
dialBarrier,
2072-
dialTimeout,
2073-
expectedNotifications,
2074-
)
2075-
2076-
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
2077-
2078-
for i, config := range configs {
2079-
id := strconv.Itoa(i)
2080-
name := fmt.Sprintf("notifications-%s", id)
2081-
var runner harness.Runnable = notifications.NewRunner(client, config)
2082-
if tracingEnabled {
2083-
runner = &runnableTraceWrapper{
2084-
tracer: tracer,
2085-
spanName: name,
2086-
runner: runner,
2087-
}
2088-
}
2089-
2090-
th.AddRun(name, id, runner)
2091-
}
2092-
2093-
_, _ = fmt.Fprintln(inv.Stderr, "Running notification delivery scaletest...")
2094-
testCtx, testCancel := timeoutStrategy.toContext(ctx)
2095-
defer testCancel()
2096-
err = th.Run(testCtx)
2097-
if err != nil {
2098-
return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err)
2099-
}
2100-
2101-
// If the command was interrupted, skip stats.
2102-
if notifyCtx.Err() != nil {
2103-
return notifyCtx.Err()
2104-
}
2105-
2106-
res := th.Results()
2107-
for _, o := range outputs {
2108-
err = o.write(res, inv.Stdout)
2109-
if err != nil {
2110-
return xerrors.Errorf("write output %q to %q: %w", o.format, o.path, err)
2111-
}
2112-
}
2113-
2114-
if !noCleanup {
2115-
_, _ = fmt.Fprintln(inv.Stderr, "\nCleaning up...")
2116-
cleanupCtx, cleanupCancel := cleanupStrategy.toContext(ctx)
2117-
defer cleanupCancel()
2118-
err = th.Cleanup(cleanupCtx)
2119-
if err != nil {
2120-
return xerrors.Errorf("cleanup tests: %w", err)
2121-
}
2122-
}
2123-
2124-
if res.TotalFail > 0 {
2125-
return xerrors.New("load test failed, see above for more details")
2126-
}
2127-
2128-
return nil
2129-
},
2130-
}
2131-
2132-
cmd.Options = serpent.OptionSet{
2133-
{
2134-
Flag: "user-count",
2135-
FlagShorthand: "c",
2136-
Env: "CODER_SCALETEST_NOTIFICATION_USER_COUNT",
2137-
Description: "Required: Total number of users to create.",
2138-
Value: serpent.Int64Of(&userCount),
2139-
Required: true,
2140-
},
2141-
{
2142-
Flag: "owner-user-percentage",
2143-
Env: "CODER_SCALETEST_NOTIFICATION_OWNER_USER_PERCENTAGE",
2144-
Default: "20.0",
2145-
Description: "Percentage of users to assign Owner role to (0-100).",
2146-
Value: serpent.Float64Of(&ownerUserPercentage),
2147-
},
2148-
{
2149-
Flag: "notification-timeout",
2150-
Env: "CODER_SCALETEST_NOTIFICATION_TIMEOUT",
2151-
Default: "5m",
2152-
Description: "How long to wait for notifications after triggering.",
2153-
Value: serpent.DurationOf(&notificationTimeout),
2154-
},
2155-
{
2156-
Flag: "dial-timeout",
2157-
Env: "CODER_SCALETEST_DIAL_TIMEOUT",
2158-
Default: "2m",
2159-
Description: "Timeout for dialing the notification websocket endpoint.",
2160-
Value: serpent.DurationOf(&dialTimeout),
2161-
},
2162-
{
2163-
Flag: "no-cleanup",
2164-
Env: "CODER_SCALETEST_NO_CLEANUP",
2165-
Description: "Do not clean up resources after the test completes.",
2166-
Value: serpent.BoolOf(&noCleanup),
2167-
},
2168-
}
2169-
2170-
tracingFlags.attach(&cmd.Options)
2171-
timeoutStrategy.attach(&cmd.Options)
2172-
cleanupStrategy.attach(&cmd.Options)
2173-
output.attach(&cmd.Options)
2174-
prometheusFlags.attach(&cmd.Options)
2175-
return cmd
2176-
}
2177-
21781923
type runnableTraceWrapper struct {
21791924
tracer trace.Tracer
21801925
spanName string
@@ -2184,8 +1929,9 @@ type runnableTraceWrapper struct {
21841929
}
21851930

21861931
var (
2187-
_ harness.Runnable = &runnableTraceWrapper{}
2188-
_ harness.Cleanable = &runnableTraceWrapper{}
1932+
_ harness.Runnable = &runnableTraceWrapper{}
1933+
_ harness.Cleanable = &runnableTraceWrapper{}
1934+
_ harness.Collectable = &runnableTraceWrapper{}
21891935
)
21901936

21911937
func (r *runnableTraceWrapper) Run(ctx context.Context, id string, logs io.Writer) error {
@@ -2227,6 +1973,14 @@ func (r *runnableTraceWrapper) Cleanup(ctx context.Context, id string, logs io.W
22271973
return c.Cleanup(ctx, id, logs)
22281974
}
22291975

1976+
// GetMetrics implements harness.Collectable by delegating to the wrapped
// runner. It returns the wrapped runner's metrics when that runner itself
// implements harness.Collectable, and nil otherwise.
func (r *runnableTraceWrapper) GetMetrics() map[string]any {
1977+
// Not every wrapped runner collects metrics, so probe for the optional
// interface instead of requiring it.
c, ok := r.runner.(harness.Collectable)
1978+
if !ok {
1979+
return nil
1980+
}
1981+
return c.GetMetrics()
1982+
}
1983+
22301984
func getScaletestWorkspaces(ctx context.Context, client *codersdk.Client, owner, template string) ([]codersdk.Workspace, int, error) {
22311985
var (
22321986
pageNumber = 0
@@ -2375,73 +2129,6 @@ func parseTargetRange(name, targets string) (start, end int, err error) {
23752129
return start, end, nil
23762130
}
23772131

2378-
// triggerUserNotifications waits for all test users to connect,
2379-
// then creates and deletes a test user to trigger notification events for testing.
2380-
func triggerUserNotifications(
2381-
ctx context.Context,
2382-
logger slog.Logger,
2383-
client *codersdk.Client,
2384-
orgID uuid.UUID,
2385-
dialBarrier *sync.WaitGroup,
2386-
dialTimeout time.Duration,
2387-
expectedNotifications map[uuid.UUID]chan time.Time,
2388-
) {
2389-
logger.Info(ctx, "waiting for all users to connect")
2390-
2391-
// Wait for all users to connect
2392-
waitCtx, cancel := context.WithTimeout(ctx, dialTimeout+30*time.Second)
2393-
defer cancel()
2394-
2395-
done := make(chan struct{})
2396-
go func() {
2397-
dialBarrier.Wait()
2398-
close(done)
2399-
}()
2400-
2401-
select {
2402-
case <-done:
2403-
logger.Info(ctx, "all users connected")
2404-
case <-waitCtx.Done():
2405-
if waitCtx.Err() == context.DeadlineExceeded {
2406-
logger.Error(ctx, "timeout waiting for users to connect")
2407-
} else {
2408-
logger.Info(ctx, "context canceled while waiting for users")
2409-
}
2410-
return
2411-
}
2412-
2413-
const (
2414-
triggerUsername = "scaletest-trigger-user"
2415-
triggerEmail = "scaletest-trigger@example.com"
2416-
)
2417-
2418-
logger.Info(ctx, "creating test user to test notifications",
2419-
slog.F("username", triggerUsername),
2420-
slog.F("email", triggerEmail),
2421-
slog.F("org_id", orgID))
2422-
2423-
testUser, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
2424-
OrganizationIDs: []uuid.UUID{orgID},
2425-
Username: triggerUsername,
2426-
Email: triggerEmail,
2427-
Password: "test-password-123",
2428-
})
2429-
if err != nil {
2430-
logger.Error(ctx, "create test user", slog.Error(err))
2431-
return
2432-
}
2433-
expectedNotifications[notificationsLib.TemplateUserAccountCreated] <- time.Now()
2434-
2435-
err = client.DeleteUser(ctx, testUser.ID)
2436-
if err != nil {
2437-
logger.Error(ctx, "delete test user", slog.Error(err))
2438-
return
2439-
}
2440-
expectedNotifications[notificationsLib.TemplateUserAccountDeleted] <- time.Now()
2441-
close(expectedNotifications[notificationsLib.TemplateUserAccountCreated])
2442-
close(expectedNotifications[notificationsLib.TemplateUserAccountDeleted])
2443-
}
2444-
24452132
func createWorkspaceAppConfig(client *codersdk.Client, appHost, app string, workspace codersdk.Workspace, agent codersdk.WorkspaceAgent) (workspacetraffic.AppConfig, error) {
24462133
if app == "" {
24472134
return workspacetraffic.AppConfig{}, nil

0 commit comments

Comments
 (0)