@@ -40,14 +40,18 @@ const (
4040// If false, log lines are silently discarded.
4141const overwriteOnFull = true
4242
43+ const waitOnFull = true
44+
45+ const waitPeriodOnFull = 3 * time .Millisecond
46+
4347// buffer: (pageSize / slotSize) slots * memSlotSize bytes each.
44- // On a 4 KiB page: 4096 / 800 = 5 slots => memBufSize = 8 + 5*800 = 4008 B ≈ 1 page.
48+ // On a 4 KiB page: 4096 / 800 = 5 slots => memBufSize = 8 + 5*800 = 4008 B ~ 1 page.
4549// memBufSize is the total region passed to Ftruncate/Mmap.
4650var (
47- nofpages = 8
48- memSlotSize = charsPerLine // 800 bytes per slot
49- memNumSlots = nofpages * unix .Getpagesize () / memSlotSize // slots that fit in one page
50- memBufSize = memHdrSize + memNumSlots * memSlotSize // ≈ 1 page
51+ nofpages = 16
52+ memSlotSize = charsPerLine // 800 bytes per slot
53+ memNumSlots = nofpages * unix .Getpagesize ()/ memSlotSize // slots that fit in one page
54+ memBufSize = memHdrSize + memNumSlots * memSlotSize // ~ 1 page
5155)
5256
5357// mcbuf is a fixed-slot shared-memory region backed by a memfd.
6670//
6771// The Go writer owns writes; after consume returns the Go side resets the buffer.
6872type mcbuf struct {
69- fd * os.File // memfd; Go retains ownership so the GC keeps the fd open
73+ fd * os.File // memfd; Go retains ownership
7074 data []byte // mmap region (PROT_READ|PROT_WRITE, MAP_SHARED)
7175}
7276
@@ -116,6 +120,8 @@ type Memconsole struct {
116120 draining [2 ]atomic.Bool // true while Drain() is executing for bufs[i]
117121 drops atomic.Uint64 // dropped bytes
118122 consumed atomic.Uint64 // consumed bytes
123+ totwait atomic.Uint64 // wait count for drains to complete when full
124+ vainwait atomic.Uint64 // wait count for drains to complete that were not useful
119125 ticking atomic.Bool // true while a lazy ticker goroutine is running
120126 closed atomic.Bool
121127}
@@ -206,6 +212,7 @@ func (mc *Memconsole) write(lvl LogLevel, msg Logmsg) {
206212 return
207213 }
208214
215+ waited := false
209216 lpfx := lvl .s ()
210217 p := unsafe .StringData (msg )
211218 all := unsafe .Slice (p , len (msg ))
@@ -222,6 +229,14 @@ rollover:
222229 r := mc .reader
223230 go mc .consume (r , idx , logfd , start , end )
224231 } else { // the other buffer's consume is wip; cannot swap in
232+ if ! waited && waitOnFull {
233+ waited = true
234+ mc .totwait .Add (1 )
235+ time .Sleep (waitPeriodOnFull ) // back off and wait for the other buffer to be consumed
236+ goto rollover
237+ } else {
238+ mc .vainwait .Add (1 )
239+ }
225240 if overwriteOnFull { // ring-within-buffer: overwrite slots
226241 // TODO: write overwrite status to header?
227242 count := mc .slotIdx * memSlotSize
@@ -376,6 +391,8 @@ func (mc *Memconsole) Drops() uint64 { return mc.drops.Load() }
376391
377392func (mc * Memconsole ) Consumed () uint64 { return mc .consumed .Load () }
378393
394+ func (mc * Memconsole ) Waited () (uint64 , uint64 ) { return mc .totwait .Load (), mc .vainwait .Load () }
395+
379396// SetReader installs r as the consumer of the Memconsole's shared-memory
380397// buffers. Drain(fd, start, end) is called synchronously (but without
381398// mc.mu held) each time a buffer fills up or the periodic ticker fires;
0 commit comments