Skip to content

Commit 3eb5b18

Browse files
committed
log: incr slots; add a nominal retry-after-wait on full
1 parent bfbefd8 commit 3eb5b18

2 files changed

Lines changed: 30 additions & 12 deletions

File tree

intra/log/memconsole.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,18 @@ const (
4040
// If false, log lines are silently discarded.
4141
const overwriteOnFull = true
4242

43+
const waitOnFull = true
44+
45+
const waitPeriodOnFull = 3 * time.Millisecond
46+
4347
// buffer: (pageSize / slotSize) slots * memSlotSize bytes each.
44-
// On a 4 KiB page: 4096 / 800 = 5 slots => memBufSize = 8 + 5*800 = 4008 B 1 page.
48+
// On a 4 KiB page: 4096 / 800 = 5 slots => memBufSize = 8 + 5*800 = 4008 B ~ 1 page.
4549
// memBufSize is the total region passed to Ftruncate/Mmap.
4650
var (
47-
nofpages = 8
48-
memSlotSize = charsPerLine // 800 bytes per slot
49-
memNumSlots = nofpages * unix.Getpagesize() / memSlotSize // slots that fit in one page
50-
memBufSize = memHdrSize + memNumSlots*memSlotSize // 1 page
51+
nofpages = 16
52+
memSlotSize = charsPerLine // 800 bytes per slot
53+
memNumSlots = nofpages * unix.Getpagesize()/memSlotSize // slots that fit in one page
54+
memBufSize = memHdrSize + memNumSlots*memSlotSize // ~1 page
5155
)
5256

5357
// mcbuf is a fixed-slot shared-memory region backed by a memfd.
@@ -66,7 +70,7 @@ var (
6670
//
6771
// The Go writer owns writes; after consume returns the Go side resets the buffer.
6872
type mcbuf struct {
69-
fd *os.File // memfd; Go retains ownership so the GC keeps the fd open
73+
fd *os.File // memfd; Go retains ownership
7074
data []byte // mmap region (PROT_READ|PROT_WRITE, MAP_SHARED)
7175
}
7276

@@ -116,6 +120,8 @@ type Memconsole struct {
116120
draining [2]atomic.Bool // true while Drain() is executing for bufs[i]
117121
drops atomic.Uint64 // dropped bytes
118122
consumed atomic.Uint64 // consumed bytes
123+
totwait atomic.Uint64 // wait count for drains to complete when full
124+
vainwait atomic.Uint64 // wait count for drains to complete that were not useful
119125
ticking atomic.Bool // true while a lazy ticker goroutine is running
120126
closed atomic.Bool
121127
}
@@ -206,6 +212,7 @@ func (mc *Memconsole) write(lvl LogLevel, msg Logmsg) {
206212
return
207213
}
208214

215+
waited := false
209216
lpfx := lvl.s()
210217
p := unsafe.StringData(msg)
211218
all := unsafe.Slice(p, len(msg))
@@ -222,6 +229,14 @@ rollover:
222229
r := mc.reader
223230
go mc.consume(r, idx, logfd, start, end)
224231
} else { // the other buffer's consume is wip; cannot swap in
232+
if !waited && waitOnFull {
233+
waited = true
234+
mc.totwait.Add(1)
235+
time.Sleep(waitPeriodOnFull) // back off and wait for the other buffer to be consumed
236+
goto rollover
237+
} else {
238+
mc.vainwait.Add(1)
239+
}
225240
if overwriteOnFull { // ring-within-buffer: overwrite slots
226241
// TODO: write overwrite status to header?
227242
count := mc.slotIdx * memSlotSize
@@ -376,6 +391,8 @@ func (mc *Memconsole) Drops() uint64 { return mc.drops.Load() }
376391

377392
func (mc *Memconsole) Consumed() uint64 { return mc.consumed.Load() }
378393

394+
func (mc *Memconsole) Waited() (uint64, uint64) { return mc.totwait.Load(), mc.vainwait.Load() }
395+
379396
// SetReader installs r as the consumer of the Memconsole's shared-memory
380397
// buffers. Drain(fd, start, end) is called synchronously (but without
381398
// mc.mu held) each time a buffer fills up or the periodic ticker fires;

intra/log/memconsole_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"sync/atomic"
1616
"testing"
1717
"time"
18-
"unsafe"
1918

2019
"golang.org/x/sys/unix"
2120
)
@@ -139,9 +138,9 @@ func TestMemconsoleWriteRead(t *testing.T) {
139138
t.Logf("[fd=%d off=%d] %s", fd, off, line)
140139
n++
141140
}*/
142-
t.Logf("---- drain: fd=%d start=%d end=%d n=%d (%s) %x", fd, start, end, n, s, &b[start])
143-
t.Log(unsafe.String(&b[start], n))
141+
// t.Log(unsafe.String(&b[start], n))
144142
got.Add(int64(n))
143+
t.Logf("---- drain: fd=%d start=%d end=%d n=%d (%s) %x", fd, start, end, n, s, &b[start])
145144
return n
146145
},
147146
})
@@ -156,7 +155,7 @@ func TestMemconsoleWriteRead(t *testing.T) {
156155
s := sb.String()
157156
nwritten.Add(int64(len(s)))
158157
mc.Log(INFO, Logmsg(s))
159-
time.Sleep(3 * time.Millisecond)
158+
//time.Sleep(1 * time.Millisecond)
160159
}
161160

162161
// Flush any partial buffer that has not filled up yet.
@@ -178,9 +177,11 @@ func TestMemconsoleWriteRead(t *testing.T) {
178177
written := nwritten.Load() // bytes submitted
179178
drops := mc.Drops() // overflow events; each loses memNumSlots slots
180179
cons := mc.Consumed()
180+
waited, vainwaited := mc.Waited()
181+
bufsize := mc.BufSize()
181182
want := written - int64(drops) // bytes expected to have been drained
182-
fmt.Printf("--- got %d / %d bytes (drops=%d overflow-events, ~%d bytes lost / cons: %d)\n",
183-
received, written, drops, want, cons)
183+
t.Logf("--- got %d / %d bytes (drops=%d overflow-events, ~%d bytes written %d%% / cons: %d); sz: %d; waited: %d (vain: %d)\n",
184+
received, written, drops, want, 100*drops/uint64(written), cons, bufsize, waited, vainwaited)
184185
if received < want {
185186
t.Errorf("expected %d bytes drained (written=%d drops=%d*%d), got %d / cons %d",
186187
want, written, drops, memNumSlots, received, cons)

0 commit comments

Comments
 (0)