feat: add a dependency management graph for agents#20208
Conversation
mafredri
left a comment
There was a problem hiding this comment.
Nice documentation and starting point. 👍🏻
The DependencyCoordinator seems premature as it's currently just a thin wrapper that exposes the underlying graph. Consider:
- Not embedding
Graphdirectly (breaks encapsulation) - Using domain-specific terminology (e.g.,
AddDependencyinstead ofAddEdge) - Adding validation at this layer (cycles, self-references)
This would make the business logic clearer and easier to validate.
… multiple other units
| } | ||
| }) | ||
|
|
||
| t.Run("ConcurrentReadWrite", func(t *testing.T) { |
There was a problem hiding this comment.
I think we could potentially just keep this concurrency test and drop the others.
There was a problem hiding this comment.
I've removed redundant tests here 6ff5772
| // saveDOTFile saves a DOT representation of the graph to a file | ||
| func saveDOTFile(t *testing.T, graph *unit.Graph[Status, *Unit], filename string) { | ||
| outputDir := createTestOutputDir(t) | ||
| dot, err := graph.ToDOT(filename) | ||
| if err != nil { | ||
| t.Logf("Failed to generate DOT for %s: %v", filename, err) | ||
| return | ||
| } | ||
|
|
||
| dotPath := filepath.Join(outputDir, filename+".dot") | ||
| err = os.WriteFile(dotPath, []byte(dot), 0o600) | ||
| require.NoError(t, err, "failed to write DOT file") | ||
| t.Logf("Saved DOT file: %s", dotPath) | ||
| } |
There was a problem hiding this comment.
My intention was for these files to be visible in a predictable place after the tests run. But I'm not sure why I preferred that to having them in a temp dir for during test debugging. I think your suggestion is a better approach.
Applied in c528868
| expected, err := os.ReadFile(goldenFile) | ||
| require.NoError(t, err, "read golden file, run \"make gen/golden-files\" and commit the changes") | ||
|
|
||
| require.Equal(t, string(expected), dot, "golden file mismatch (-want +got): %s, run \"make gen/golden-files\", verify and commit the changes", goldenFile) |
There was a problem hiding this comment.
Use cmp.Diff for a nicer diff output.
| // Check for forward edge | ||
| vertices := graph.GetForwardAdjacentVertices(unit1) | ||
| require.Len(t, vertices, 2) | ||
| // Unit 1 depends on the completion of Unit2 | ||
| require.Contains(t, vertices, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit2, | ||
| Edge: StatusCompleted, | ||
| }) | ||
| // Unit 1 depends on the start of Unit3 | ||
| require.Contains(t, vertices, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit3, | ||
| Edge: StatusStarted, | ||
| }) | ||
|
|
||
| // Check for reverse edges | ||
| unit2ReverseEdges := graph.GetReverseAdjacentVertices(unit2) | ||
| require.Len(t, unit2ReverseEdges, 1) | ||
| // Unit 2 must be completed before Unit 1 can start | ||
| require.Contains(t, unit2ReverseEdges, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit2, | ||
| Edge: StatusCompleted, | ||
| }) | ||
|
|
||
| unit3ReverseEdges := graph.GetReverseAdjacentVertices(unit3) | ||
| require.Len(t, unit3ReverseEdges, 1) | ||
| // Unit 3 must be started before Unit 1 can complete | ||
| require.Contains(t, unit3ReverseEdges, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit3, | ||
| Edge: StatusStarted, | ||
| }) | ||
|
|
||
| // Assert on the DOT representation using golden file | ||
| assertDOTGraph(t, graph, "ForwardAndReverseEdges") | ||
| }) |
There was a problem hiding this comment.
Could this be converted to a table-driven test?
There was a problem hiding this comment.
If done, the table driven test could also assert post-execution that there was a table test for each golden file in the testdata. Leaving unused ones behind in a refactor could be confusing.
There was a problem hiding this comment.
Done in 813c783. They aren't good table tests. The tests aren't really well suited to be table tests in their current form. But we have accomplished an assertion to check that every test case has a golden file and matches it.
Once I implement proper serialization they will look better as table tests
| } | ||
| }) | ||
|
|
||
| t.Run("StressTest", func(t *testing.T) { |
There was a problem hiding this comment.
Would this be better off as a benchmark instead?
There was a problem hiding this comment.
I think so too, stress tests shouldn't be run as part of the regular test suite, making benchmark a good fit.
There was a problem hiding this comment.
I've turned this into a benchmark in 6ff5772
| graph := &unit.Graph[Status, *Unit]{} | ||
| unit1 := &Unit{Name: "unit1", Status: StatusPending} | ||
| unit2 := &Unit{Name: "unit2", Status: StatusPending} | ||
| unit3 := &Unit{Name: "unit3", Status: StatusPending} | ||
| err := graph.AddEdge(unit1, unit2, StatusCompleted) | ||
| require.NoError(t, err) | ||
| err = graph.AddEdge(unit1, unit3, StatusStarted) | ||
| require.NoError(t, err) |
There was a problem hiding this comment.
Would it make more sense to load the graph from the golden DOT instead? A testing pattern I find works well is
- Load serialized representation
- Make assertions about data structure
- Serialize data structure
- Assert no differences in before versus after ser/deser
There was a problem hiding this comment.
Do you mind if we skip this? It would work well, but the serialization is a little tricky given that marshalling to DOT doesn't store the metadata in the rest of this wrapper and so unmarshaling from a golden file doesn't provide a complete Graph.
I could write complete serialize() and deserialize() methods for the Graph, or I could try to get rid of the wrapper. Getting rid of the wrapper is premature in my opinion and it would be a better use of my time to implement the rest of this feature before I further optimise tests that already prove the functionality.
There was a problem hiding this comment.
We can skip it. Just a suggestion.
| g.mu.Lock() | ||
| defer g.mu.Unlock() | ||
|
|
||
| // Initialize the graph if it doesn't exist |
There was a problem hiding this comment.
| // Initialize the graph if it doesn't exist | |
| // Initialize the graph on first use. |
| g.nextID = 1 | ||
| } | ||
|
|
||
| // Get or create IDs for vertices |
There was a problem hiding this comment.
| // Get or create IDs for vertices |
I'd suggest removing all low-value comments. Look especially close at comments that only explain what, not why.
To me the comment and code are saying the exact same thing: getOrCreateVertexID.
| ) | ||
|
|
||
| // Test types for thread safety testing. | ||
| // values in production might differ from these test values. |
There was a problem hiding this comment.
This seems a bit vague and premature if we don't even know what production values are going to be. Perhaps name the type testStatus to make the intent clear?
| ) | ||
|
|
||
| // Unit is a test type for the graph. | ||
| // Production types might be more complex. |
There was a problem hiding this comment.
This also seems a bit vague and doesn't add value, I'd suggest taking a close look at all test comments to remove any superfluous ones.
Consider naming testUnit (like above).
| // createTestOutputDir creates a directory for test outputs. | ||
| // This directory should be listed in the .gitignore file as: | ||
| // test-output/ | ||
| func createTestOutputDir(t *testing.T) string { |
There was a problem hiding this comment.
Could be replaced by t.TempDir().
| // The next ID to assign to a vertex. | ||
| nextID int64 | ||
| // Store edge types by "fromID->toID" key. This is used to lookup the edge type for a given edge. | ||
| edgeTypes map[string]EdgeType |
There was a problem hiding this comment.
Suggestion: While not typically enforced for unexported fields/types. Go comments are typically written starting with the name, example "edgeTypes are indexed by "fromID->toID" and can be used to ...".
| var UpdateGoldenFiles = flag.Bool("update", false, "update .golden files") | ||
|
|
||
| // assertDOTGraph asserts that the graph's DOT representation matches the golden file | ||
| func assertDOTGraph(t *testing.T, graph *unit.Graph[Status, *Unit], goldenName string) { |
There was a problem hiding this comment.
Given the assert/require nomenclature, this function requires rather than asserts.
| // Check for forward edge | ||
| vertices := graph.GetForwardAdjacentVertices(unit1) | ||
| require.Len(t, vertices, 2) | ||
| // Unit 1 depends on the completion of Unit2 | ||
| require.Contains(t, vertices, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit2, | ||
| Edge: StatusCompleted, | ||
| }) | ||
| // Unit 1 depends on the start of Unit3 | ||
| require.Contains(t, vertices, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit3, | ||
| Edge: StatusStarted, | ||
| }) | ||
|
|
||
| // Check for reverse edges | ||
| unit2ReverseEdges := graph.GetReverseAdjacentVertices(unit2) | ||
| require.Len(t, unit2ReverseEdges, 1) | ||
| // Unit 2 must be completed before Unit 1 can start | ||
| require.Contains(t, unit2ReverseEdges, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit2, | ||
| Edge: StatusCompleted, | ||
| }) | ||
|
|
||
| unit3ReverseEdges := graph.GetReverseAdjacentVertices(unit3) | ||
| require.Len(t, unit3ReverseEdges, 1) | ||
| // Unit 3 must be started before Unit 1 can complete | ||
| require.Contains(t, unit3ReverseEdges, DependencyEdge{ | ||
| From: unit1, | ||
| To: unit3, | ||
| Edge: StatusStarted, | ||
| }) | ||
|
|
||
| // Assert on the DOT representation using golden file | ||
| assertDOTGraph(t, graph, "ForwardAndReverseEdges") | ||
| }) |
There was a problem hiding this comment.
If done, the table driven test could also assert post-execution that there was a table test for each golden file in the testdata. Leaving unused ones behind in a refactor could be confusing.
|
|
||
| graph := &unit.Graph[Status, *Unit]{} | ||
| var wg sync.WaitGroup | ||
| const numGoroutines = 100 |
There was a problem hiding this comment.
These are a bit extreme, I think perhaps 10 should suffice.
It's also a good idea to increase the chance of collisions e.g. via:
sync := make(chan struct{})
for range 10; go func() {
// prepare ...
<-sync
// work ...
}
close(sync)| to := &Unit{Name: fmt.Sprintf("unit-%d-%d", goroutineID, j+1), Status: StatusPending} | ||
| err := graph.AddEdge(from, to, StatusCompleted) | ||
|
|
||
| mu.Lock() |
There was a problem hiding this comment.
Avoid synchronizing goroutines when you're trying to create collisions. Keep errors local until done, then store them.
(This and above is applicable to the other concurrency tests as well.)
There was a problem hiding this comment.
@SasSwart I might be missing a bit of context here, but I’d like to better understand where this is heading.
From what I can see, this PR introduces a wrapper around the gonum library, and since gonum already provides a rich graph API, I’m wondering what additional value we’re aiming to add here. Have you written down the next implementation steps somewhere? It’d be great to make sure we’re aligned on the direction before going too deep.
The next step is to add reactivity to the graph, such that changes in the status of one vertex will propagate and allow other vertices to change their own statuses.
That makes sense conceptually, though I’m curious whether the full async trigger mechanism is necessary, especially given that users will likely only have 3–5 scripts running on average. Would a simpler orchestrator that periodically checks the status of running scripts and reacts accordingly be enough to achieve the same result, but with less moving parts?
Also, did we explore any existing task orchestration libraries before starting this implementation? For example:
- goyek/goyek
- Roemer/gotaskr
- go-task/task
(though I’m not sure if it offers a usable library API)
I’m mostly trying to understand whether this design solves something that current tools can’t, or if we might be able to simplify by leaning on existing solutions.
|
@mtojek The wrapper is not around gonum specifically. Rather, it is meant as a facade that allows us to swap out the underlying implementation if we need to. I put it behind a wrapper precisely because if we ever want to switch it out with any of the alternatives you've mentioned, that will be easy. This has already shown its worth when I needed to switch from a POC hand rolled implementation to gonum and the facade made that simpler to do. Right now, second guessing gonum is not the most productive way to ship this quickly. We have something that works for the graphing aspect. The remaining work is to implement:
I estimate that I can get reactivity implemented in a day, and plumbing in 1.5 days. I have half a day left after my flight today, then I have tomorrow and Friday. |
| if g.gonumGraph == nil { | ||
| g.gonumGraph = simple.NewDirectedGraph() | ||
| g.vertexToID = make(map[VertexType]int64) | ||
| g.idToVertex = make(map[int64]VertexType) | ||
| g.edgeTypes = make(map[string]EdgeType) | ||
| g.nextID = 1 | ||
| } |
There was a problem hiding this comment.
It looks like this is the only place where we do this "lazy" init. Is the reasoning here to allow a zero value of a Graph to be useful?
Alternatives:
- Add a constructor
- Extract this to e.g.
initGraph()and ensure this gets called on every function call
There was a problem hiding this comment.
we do this because @mafredri didn't want a constructor 😅
There was a problem hiding this comment.
I'm down with not having a constructor, but it just seems like a potential foot-gun if this is the only place where we check if g.gonumGraph is not nil.
| operationCount := 0 | ||
|
|
||
| for operationCount < 50 { | ||
| operation := float32(randInt(100)) / 100.0 |
There was a problem hiding this comment.
I didn't read closely enough before, but since this is the only place you're using randInt here, might I suggest:
| operation := float32(randInt(100)) / 100.0 | |
| operation := must(cryptorand.Float32()) |
(Or just use cryptorand.Float64 as it's already defined)
In cryptorand/numbers.go:
// Float32 returns a random number in [0.0,1.0) as a float32.
func Float32() (float32, error) {
rng, cs := secureRand()
return rng.Float32(), cs.err
}
This utility function also gets sprinkled a lot throughout the codebase:
func must[T any](v T, err error) T {
if err != nil {
panic(err)
}
}
| const numWriters = 50 | ||
| const numReaders = 100 | ||
| const operationsPerWriter = 1000 | ||
| const operationsPerReader = 2000 |
There was a problem hiding this comment.
Do we really need so many ops/readers/writers here? I think we could probably get similar enough assurances with the benchmark while running with reduced concurrency here.
Relates to coder/internal#1093
This is the first of N pull requests to allow coder script ordering.
It introduces what is for now dead code, but paves the way for various interfaces that allow coder scripts and other processes to depend on one another via CLI commands and terraform configurations.
The next step is to add reactivity to the graph, such that changes in the status of one vertex will propagate and allow other vertices to change their own statuses.
Concurrency and stress testing yield the following:
CPU Profile:

Mem Profile:

Predictably, lock contention and memory allocation are the largest components of this system under stress. Nothing seems untoward.