Skip to content
This repository was archived by the owner on Nov 16, 2023. It is now read-only.

Commit 8e41451

Browse files
authored
Support large scale Framework by LargeFrameworkCompression (#44)
1 parent 77ec4ab commit 8e41451

10 files changed

Lines changed: 210 additions & 28 deletions

File tree

doc/user-manual.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- [CompletionStatus](#CompletionStatus)
99
- [RetryPolicy](#RetryPolicy)
1010
- [FrameworkAttemptCompletionPolicy](#FrameworkAttemptCompletionPolicy)
11+
- [Large Scale Framework](#LargeScaleFramework)
1112
- [Framework and Pod History](#FrameworkPodHistory)
1213
- [Framework and Task State Machine](#FrameworkTaskStateMachine)
1314
- [Framework Consistency vs Availability](#FrameworkConsistencyAvailability)
@@ -203,7 +204,7 @@ Watch the change events of all Frameworks (in the specified FrameworkNamespace).
203204
[Container EnvironmentVariable](../pkg/apis/frameworkcontroller/v1/constants.go)
204205

205206
## <a name="PodFailureClassification">Pod Failure Classification</a>
206-
You can specify how to classify and summarize Pod failures by [PodFailureSpec](../pkg/apis/frameworkcontroller/v1/config.go).
207+
You can specify how to classify and summarize Pod failures by the [PodFailureSpec](../pkg/apis/frameworkcontroller/v1/config.go).
207208

208209
You can also directly leverage the [Default PodFailureSpec](../example/config/default/frameworkcontroller.yaml).
209210

@@ -370,8 +371,11 @@ Notes:
370371
</tbody>
371372
</table>
372373

374+
## <a name="LargeScaleFramework">Large Scale Framework</a>
375+
To safely run large scale Framework, i.e. the total task number in a single Framework is greater than 300, you just need to enable the [LargeFrameworkCompression](../pkg/apis/frameworkcontroller/v1/config.go). However, you may also need to decompress the Framework by yourself.
376+
373377
## <a name="FrameworkPodHistory">Framework and Pod History</a>
374-
By leveraging [LogObjectSnapshot](../pkg/apis/frameworkcontroller/v1/config.go), external systems, such as [Fluentd](https://www.fluentd.org) and [ElasticSearch](https://www.elastic.co/products/elasticsearch), can collect and process Framework and Pod history snapshots even if it was retried or deleted, such as persistence, metrics conversion, visualization, alerting, acting, analysis, etc.
378+
By leveraging the [LogObjectSnapshot](../pkg/apis/frameworkcontroller/v1/config.go), external systems, such as [Fluentd](https://www.fluentd.org) and [ElasticSearch](https://www.elastic.co/products/elasticsearch), can collect and process Framework and Pod history snapshots even if it was retried or deleted, such as persistence, metrics conversion, visualization, alerting, acting, analysis, etc.
375379

376380
## <a name="FrameworkTaskStateMachine">Framework and Task State Machine</a>
377381
### <a name="FrameworkStateMachine">Framework State Machine</a>

example/config/default/frameworkcontroller.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,16 @@
55

66
#kubeApiServerAddress: http://10.10.10.10:8080
77
#kubeConfigFilePath: ''
8+
89
#workerNumber: 20
910

11+
#largeFrameworkCompression: true
12+
13+
#frameworkCompletedRetainSec: 2592000
14+
15+
#frameworkMinRetryDelaySecForTransientConflictFailed: 60
16+
#frameworkMaxRetryDelaySecForTransientConflictFailed: 900
17+
1018
podFailureSpec:
1119
################################################################################
1220
# [-1199, -1000]: K8S issued failures

pkg/apis/frameworkcontroller/v1/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,27 @@ type Config struct {
5959
// Number of concurrent workers to process each different Frameworks
6060
WorkerNumber *int32 `yaml:"workerNumber"`
6161

62+
// Specify whether to compress some fields in the Framework object if they are too large.
63+
//
64+
// Currently, due to the etcd limitation, the max size of any object on ApiServer is 1.5 MB:
65+
// https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/limit.md#request-size-limit
66+
// So, without this compression, FrameworkController can only support small scale Framework in the
67+
// worst case, i.e. the total task number in a single Framework is not greater than 300.
68+
//
69+
// With this compression, FrameworkController can generally support large scale Framework, i.e.
70+
// the total task number in a single Framework is not greater than 10000.
71+
// However, this requires all clients who read the Framework object directly, need to decompress
72+
// the compressed fields by themselves.
73+
//
74+
// How to decompress?
75+
// 1. The field name of a compressed field, has a suffix "Compressed" compared with the field
76+
// name of its corresponding raw field, such as TaskRoleStatusesCompressed is the compressed
77+
// field of TaskRoleStatuses.
78+
// 2. If the raw field is not null, just use the raw field, otherwise fallback to the compressed
79+
// field, by base64 decoding, gzip decompression and json unmarshal.
80+
// 3. Currently, only field TaskRoleStatuses will be compressed if it is too large.
81+
LargeFrameworkCompression *bool `yaml:"largeFrameworkCompression"`
82+
6283
// Check interval and timeout to expect the created CRD to be in Established condition.
6384
CRDEstablishedCheckIntervalSec *int64 `yaml:"crdEstablishedCheckIntervalSec"`
6485
CRDEstablishedCheckTimeoutSec *int64 `yaml:"crdEstablishedCheckTimeoutSec"`
@@ -196,6 +217,9 @@ func NewConfig() *Config {
196217
if c.WorkerNumber == nil {
197218
c.WorkerNumber = common.PtrInt32(10)
198219
}
220+
if c.LargeFrameworkCompression == nil {
221+
c.LargeFrameworkCompression = common.PtrBool(false)
222+
}
199223
if c.CRDEstablishedCheckIntervalSec == nil {
200224
c.CRDEstablishedCheckIntervalSec = common.PtrInt64(1)
201225
}

pkg/apis/frameworkcontroller/v1/constants.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ const (
4242
PodKind = "Pod"
4343
ObjectUIDFieldPath = "metadata.uid"
4444

45-
ConfigFilePath = "./frameworkcontroller.yaml"
46-
UnlimitedValue = -1
47-
ExtendedUnlimitedValue = -2
45+
ConfigFilePath = "./frameworkcontroller.yaml"
46+
UnlimitedValue = -1
47+
ExtendedUnlimitedValue = -2
48+
LargeFrameworkCompressionMinBytes = 100 * 1024
4849

4950
// For all managed objects
5051
// Predefined Annotations

pkg/apis/frameworkcontroller/v1/funcs.go

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -432,15 +432,16 @@ func (f *Framework) NewFrameworkStatus() *FrameworkStatus {
432432
func (f *Framework) NewFrameworkAttemptStatus(
433433
frameworkAttemptID int32) FrameworkAttemptStatus {
434434
return FrameworkAttemptStatus{
435-
ID: frameworkAttemptID,
436-
StartTime: meta.Now(),
437-
RunTime: nil,
438-
CompletionTime: nil,
439-
InstanceUID: nil,
440-
ConfigMapName: GetConfigMapName(f.Name),
441-
ConfigMapUID: nil,
442-
CompletionStatus: nil,
443-
TaskRoleStatuses: f.NewTaskRoleStatuses(),
435+
ID: frameworkAttemptID,
436+
StartTime: meta.Now(),
437+
RunTime: nil,
438+
CompletionTime: nil,
439+
InstanceUID: nil,
440+
ConfigMapName: GetConfigMapName(f.Name),
441+
ConfigMapUID: nil,
442+
CompletionStatus: nil,
443+
TaskRoleStatuses: f.NewTaskRoleStatuses(),
444+
TaskRoleStatusesCompressed: nil,
444445
}
445446
}
446447

@@ -615,3 +616,55 @@ func (f *Framework) TransitionTaskState(
615616
"[%v][%v][%v]: Transitioned Task from [%v] to [%v]",
616617
f.Key(), taskRoleName, taskIndex, srcState, dstState)
617618
}
619+
620+
func (f *Framework) Compress() error {
621+
if f.Status == nil {
622+
return nil
623+
}
624+
625+
if f.TaskRoleStatuses() != nil {
626+
f.Status.AttemptStatus.TaskRoleStatusesCompressed = nil
627+
628+
jsonTaskRoleStatus := common.ToJson(f.TaskRoleStatuses())
629+
if len(jsonTaskRoleStatus) >= LargeFrameworkCompressionMinBytes {
630+
compressedTaskRoleStatus, err := common.Compress(jsonTaskRoleStatus)
631+
if err != nil {
632+
return err
633+
}
634+
635+
f.Status.AttemptStatus.TaskRoleStatusesCompressed = compressedTaskRoleStatus
636+
f.Status.AttemptStatus.TaskRoleStatuses = nil
637+
return nil
638+
}
639+
}
640+
641+
return nil
642+
}
643+
644+
func (f *Framework) Decompress() error {
645+
if f.Status == nil {
646+
return nil
647+
}
648+
649+
if f.TaskRoleStatuses() != nil {
650+
f.Status.AttemptStatus.TaskRoleStatusesCompressed = nil
651+
return nil
652+
}
653+
654+
compressedTaskRoleStatus := f.Status.AttemptStatus.TaskRoleStatusesCompressed
655+
if compressedTaskRoleStatus != nil {
656+
jsonTaskRoleStatus, err := common.Decompress(compressedTaskRoleStatus)
657+
if err != nil {
658+
return err
659+
}
660+
661+
rawTaskRoleStatus := []*TaskRoleStatus{}
662+
common.FromJson(jsonTaskRoleStatus, &rawTaskRoleStatus)
663+
664+
f.Status.AttemptStatus.TaskRoleStatuses = rawTaskRoleStatus
665+
f.Status.AttemptStatus.TaskRoleStatusesCompressed = nil
666+
return nil
667+
}
668+
669+
return nil
670+
}

pkg/apis/frameworkcontroller/v1/types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,10 @@ type FrameworkAttemptStatus struct {
270270
// It will never be changed during the whole lifetime of a specific Framework.
271271
ConfigMapName string `json:"configMapName"`
272272
// ConfigMapUID can also universally locate the FrameworkAttemptInstance.
273-
ConfigMapUID *types.UID `json:"configMapUID"`
274-
CompletionStatus *FrameworkAttemptCompletionStatus `json:"completionStatus"`
275-
TaskRoleStatuses []*TaskRoleStatus `json:"taskRoleStatuses"`
273+
ConfigMapUID *types.UID `json:"configMapUID"`
274+
CompletionStatus *FrameworkAttemptCompletionStatus `json:"completionStatus"`
275+
TaskRoleStatuses []*TaskRoleStatus `json:"taskRoleStatuses"`
276+
TaskRoleStatusesCompressed []byte `json:"taskRoleStatusesCompressed,omitempty"`
276277
}
277278

278279
type TaskRoleStatus struct {

pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/barrier/barrier.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,16 @@ func (b *FrameworkBarrier) Run() {
239239
Get(b.bConfig.FrameworkName, meta.GetOptions{})
240240

241241
if err == nil {
242-
isPassed = isBarrierPassed(f)
243-
return isPassed, nil
242+
err = f.Decompress()
243+
if err == nil {
244+
isPassed = isBarrierPassed(f)
245+
return isPassed, nil
246+
} else {
247+
klog.Warningf("Failed to decompress Framework object: %v", err)
248+
// Unknown Error: Poll Until Timeout
249+
isPermanentErr = false
250+
return false, nil
251+
}
244252
} else {
245253
klog.Warningf("Failed to get Framework object from ApiServer: %v", err)
246254
if apiErrors.IsNotFound(err) {
@@ -256,22 +264,23 @@ func (b *FrameworkBarrier) Run() {
256264
})
257265

258266
if isPassed {
259-
klog.Infof("BarrierPassed: " +
267+
klog.Infof("BarrierSucceeded: " +
260268
"All Tasks are ready with not nil PodIP.")
261269
dumpFramework(f)
262270
generateInjector(f)
263271
exit(ci.CompletionCodeSucceeded)
264272
} else {
265273
if err == nil {
266-
klog.Errorf("BarrierNotPassed: " +
274+
klog.Errorf("BarrierTransientConflictFailed: " +
267275
"Timeout to wait all Tasks are ready with not nil PodIP.")
268276
exit(ci.CompletionCodeContainerTransientConflictFailed)
269277
} else {
270-
klog.Errorf("Failed to get Framework object from ApiServer: %v", err)
271278
if isPermanentErr {
279+
klog.Errorf("BarrierPermanentFailed: %v", err)
272280
exit(ci.CompletionCodeContainerPermanentFailed)
273281
} else {
274282
// May also timeout, but still treat as Unknown Error
283+
klog.Errorf("BarrierUnknownFailed: %v", err)
275284
exit(ci.CompletionCode(1))
276285
}
277286
}

pkg/common/utils.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
package common
2424

2525
import (
26+
"bytes"
27+
"compress/gzip"
2628
"encoding/json"
2729
"flag"
2830
"fmt"
@@ -202,3 +204,38 @@ func FromJson(jsonStr string, objAddr interface{}) {
202204
panic(fmt.Errorf("Failed to unmarshal JSON %#v to Object: %v", jsonStr, err))
203205
}
204206
}
207+
208+
func Compress(rawStr string) ([]byte, error) {
209+
compressedBuffer := &bytes.Buffer{}
210+
compressor := gzip.NewWriter(compressedBuffer)
211+
if _, err := compressor.Write([]byte(rawStr)); err != nil {
212+
return nil, fmt.Errorf(
213+
"Failed to compress %#v when writing: %v",
214+
rawStr, err)
215+
} else {
216+
if err := compressor.Close(); err != nil {
217+
return nil, fmt.Errorf(
218+
"Failed to compress %#v when closing: %v",
219+
rawStr, err)
220+
} else {
221+
return compressedBuffer.Bytes(), nil
222+
}
223+
}
224+
}
225+
226+
func Decompress(compressedBytes []byte) (string, error) {
227+
compressedReader := bytes.NewReader(compressedBytes)
228+
if decompressor, err := gzip.NewReader(compressedReader); err != nil {
229+
return "", fmt.Errorf(
230+
"Failed to decompress %#v when initializing: %v",
231+
compressedBytes, err)
232+
} else {
233+
if rawBytes, err := ioutil.ReadAll(decompressor); err != nil {
234+
return "", fmt.Errorf(
235+
"Failed to decompress %#v when reading: %v",
236+
compressedBytes, err)
237+
} else {
238+
return string(rawBytes), nil
239+
}
240+
}
241+
}

pkg/controller/controller.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -541,32 +541,39 @@ func (c *FrameworkController) syncFramework(key string) (returnedErr error) {
541541
// Ensure the expected Framework.Status is the same as the remote one
542542
// before sync.
543543
if !expected.remoteSynced {
544+
c.compressFramework(f)
544545
updateErr := c.updateRemoteFrameworkStatus(f)
546+
c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, updateErr == nil)
547+
545548
if updateErr != nil {
546549
return updateErr
547550
}
548-
c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, true)
549551
}
550552
}
551553

552554
// At this point, f.Status is the same as the expected and remote
553555
// Framework.Status, so it is ready to sync against f.Spec and other
554556
// related objects.
555-
errs := []error{}
556-
remoteF := f.DeepCopy()
557+
decompressErr := c.decompressFramework(f)
558+
if decompressErr != nil {
559+
return decompressErr
560+
}
561+
remoteRawF := f.DeepCopy()
557562

563+
errs := []error{}
558564
syncErr := c.syncFrameworkStatus(f)
559565
errs = append(errs, syncErr)
560566

561-
if !reflect.DeepEqual(remoteF.Status, f.Status) {
567+
if !reflect.DeepEqual(remoteRawF.Status, f.Status) {
562568
// Always update the expected and remote Framework.Status even if sync
563569
// error, since f.Status should never be corrupted due to any Platform
564570
// Transient Error, so no need to rollback to the one before sync, and
565571
// no need to DeepCopy between f.Status and the expected one.
572+
c.compressFramework(f)
566573
updateErr := c.updateRemoteFrameworkStatus(f)
567-
errs = append(errs, updateErr)
568-
569574
c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, updateErr == nil)
575+
576+
errs = append(errs, updateErr)
570577
} else {
571578
klog.Infof(logPfx +
572579
"Skip to update the expected and remote Framework.Status since " +
@@ -1806,6 +1813,34 @@ func (c *FrameworkController) completeFrameworkAttempt(
18061813
}
18071814
}
18081815

1816+
// Best effort to compress and no need to requeue if failed, since the
1817+
// updateRemoteFrameworkStatus may still succeed if compress failed.
1818+
func (c *FrameworkController) compressFramework(f *ci.Framework) {
1819+
if *c.cConfig.LargeFrameworkCompression {
1820+
logPfx := fmt.Sprintf("[%v]: compressFramework: ", f.Key())
1821+
klog.Infof(logPfx + "Started")
1822+
defer func() { klog.Infof(logPfx + "Completed") }()
1823+
1824+
err := f.Compress()
1825+
if err != nil {
1826+
klog.Warningf(logPfx+"Failed: %v", err)
1827+
}
1828+
}
1829+
}
1830+
1831+
func (c *FrameworkController) decompressFramework(f *ci.Framework) error {
1832+
logPfx := fmt.Sprintf("[%v]: decompressFramework: ", f.Key())
1833+
klog.Infof(logPfx + "Started")
1834+
defer func() { klog.Infof(logPfx + "Completed") }()
1835+
1836+
err := f.Decompress()
1837+
if err != nil {
1838+
return fmt.Errorf(logPfx+"Failed: %v", err)
1839+
} else {
1840+
return nil
1841+
}
1842+
}
1843+
18091844
func (c *FrameworkController) updateRemoteFrameworkStatus(f *ci.Framework) error {
18101845
logPfx := fmt.Sprintf("[%v]: updateRemoteFrameworkStatus: ", f.Key())
18111846
klog.Infof(logPfx + "Started")

0 commit comments

Comments
 (0)