eBPF: Handling events in Userspace
Objective
- To understand how eBPF events are handled in user space by various open-source projects
- To learn how these projects cope with a large volume of events
Note
Currently, only the Tetragon project is covered.
Reasoning
Once eBPF events are written by the kernel-space hook to a ring buffer or perf buffer, they become available for consumption from user space.
User-space code usually performs the following steps:
- Preparing a ring buffer / perf buffer reader
- Reading records from the buffer
- Processing the raw samples
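The three steps above can be sketched without any eBPF dependency. The following is a minimal illustration only: `record`, `recordReader`, `sliceReader`, and `drain` are hypothetical names, and the in-memory reader stands in for a real ring buffer or perf buffer reader.

```go
package main

import (
	"errors"
	"io"
)

// record is a simplified stand-in for a buffer record (assumption:
// only the raw payload and the lost-sample count matter here).
type record struct {
	RawSample   []byte
	LostSamples uint64
}

// recordReader is a hypothetical abstraction over a buffer reader.
type recordReader interface {
	Read() (record, error)
}

// sliceReader serves records from memory instead of a kernel buffer.
type sliceReader struct {
	records []record
	next    int
}

// Step 1: prepare the reader.
func newSliceReader(records []record) *sliceReader {
	return &sliceReader{records: records}
}

// Read returns the next record, or io.EOF when none are left.
func (s *sliceReader) Read() (record, error) {
	if s.next >= len(s.records) {
		return record{}, io.EOF
	}
	rec := s.records[s.next]
	s.next++
	return rec, nil
}

// drain covers steps 2 and 3: it reads records until the reader is
// exhausted and passes every non-empty raw sample to process.
func drain(r recordReader, process func([]byte)) (processed int, lost uint64, err error) {
	for {
		rec, readErr := r.Read()
		if errors.Is(readErr, io.EOF) {
			return processed, lost, nil
		}
		if readErr != nil {
			return processed, lost, readErr
		}
		lost += rec.LostSamples
		if len(rec.RawSample) > 0 {
			process(rec.RawSample)
			processed++
		}
	}
}
```

Calling `drain(newSliceReader(records), handle)` returns how many samples were processed and how many the reader reported as lost. A real consumer would typically run the read loop in its own goroutine.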
Tetragon
Preparation
The pinned perf map is loaded, and a perf event reader is created on top of it with a buffer size computed by getRBSize(). [Source]
snippet.go |
---|
| ...
pinOpts := ebpf.LoadPinOptions{}
perfMap, err := ebpf.LoadPinnedMap(k.PerfConfig.MapName, &pinOpts)
if err != nil {
	return fmt.Errorf("opening pinned map '%s' failed: %w", k.PerfConfig.MapName, err)
}
defer perfMap.Close()
rbSize := k.getRBSize(int(perfMap.MaxEntries()))
perfReader, err := perf.NewReader(perfMap, rbSize)
...
|
Reading
A goroutine is launched to read records from perfReader and add them to eventsQueue (a buffered channel). If the channel is full, the record is dropped and counted rather than blocking the reader.
[Source]
snippet2.go |
---|
| ...
// We spawn go routine to read and process perf events,
// connected with main app through eventsQueue channel.
eventsQueue := make(chan *perf.Record, k.getRBQueueSize())
// Listeners are ready and about to start reading from perf reader, tell
// user everything is ready.
k.log.Info("Listening for events...")
// Start reading records from the perf array. Reads until the reader is closed.
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go func() {
	defer wg.Done()
	for stopCtx.Err() == nil {
		record, err := perfReader.Read()
		if err != nil {
			// NOTE(JM and Djalal): count and log errors while excluding the stopping context
			if stopCtx.Err() == nil {
				RingbufErrors.Inc()
				errorCnt := getCounterValue(RingbufErrors)
				k.log.Warn("Reading bpf events failed", "errors", errorCnt, logfields.Error, err)
			}
		} else {
			if len(record.RawSample) > 0 {
				select {
				case eventsQueue <- &record:
				default:
					// eventsQueue channel is full, drop the event
					queueLost.Inc()
				}
				RingbufReceived.Inc()
			}
			if record.LostSamples > 0 {
				RingbufLost.Add(float64(record.LostSamples))
			}
		}
	}
}()
...
|
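The drop-on-full behaviour in the reader goroutine relies on a `select` with a `default` branch, which turns a channel send into a non-blocking operation. Here is a minimal, self-contained sketch of that pattern. `enqueueStats`, `tryEnqueue`, and `offerAll` are illustrative names and are not part of Tetragon.

```go
package main

// enqueueStats tallies what happened to offered samples. The field
// names are illustrative and do not correspond to Tetragon's metrics.
type enqueueStats struct {
	Queued  int
	Dropped int
}

// tryEnqueue attempts a non-blocking send. It returns false when the
// buffered channel is full, so the caller never stalls on a slow consumer.
func tryEnqueue(queue chan<- []byte, sample []byte) bool {
	select {
	case queue <- sample:
		return true
	default:
		return false
	}
}

// offerAll pushes every sample through tryEnqueue and counts the outcome.
func offerAll(queue chan<- []byte, samples [][]byte) enqueueStats {
	var stats enqueueStats
	for _, s := range samples {
		if tryEnqueue(queue, s) {
			stats.Queued++
		} else {
			stats.Dropped++
		}
	}
	return stats
}
```

With a channel of capacity 2 and no consumer, offering three samples queues two and drops one. The trade-off is that a slow consumer causes counted drops in user space instead of stalling the goroutine that drains the kernel buffer.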
Another goroutine is launched to read records from eventsQueue and pass each record's raw sample to receiveEvent() for processing. It exits when stopCtx is cancelled.
[Source]
snippet3.go |
---|
| ...
// Start processing records from perf.
wg.Add(1)
go func() {
	defer wg.Done()
	for {
		select {
		case event := <-eventsQueue:
			k.receiveEvent(event.RawSample)
			queueReceived.Inc()
		case <-stopCtx.Done():
			k.log.Info("Listening for events completed.", logfields.Error, stopCtx.Err())
			k.log.Debug(fmt.Sprintf("Unprocessed events in RB queue: %d", len(eventsQueue)))
			return
		}
	}
}()
...
|
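The consumer side is a loop that selects between the queue and a cancellation signal. The sketch below reproduces that shape with only the standard library. `consumeUntilDone` and `runConsumerDemo` are hypothetical helpers written for this illustration.

```go
package main

import (
	"context"
	"sync"
)

// consumeUntilDone handles queued samples until ctx is cancelled.
// Samples still buffered in queue at that point are left unprocessed,
// which matches the "Unprocessed events" log line in the snippet above.
func consumeUntilDone(ctx context.Context, queue <-chan []byte, handle func([]byte)) {
	for {
		select {
		case sample := <-queue:
			handle(sample)
		case <-ctx.Done():
			return
		}
	}
}

// runConsumerDemo feeds n one-byte samples to a consumer goroutine,
// waits until all of them were handled, stops the consumer, and
// returns the number of handled samples.
func runConsumerDemo(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan []byte, n)
	handledCh := make(chan struct{}, n)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		consumeUntilDone(ctx, queue, func([]byte) { handledCh <- struct{}{} })
	}()

	for i := 0; i < n; i++ {
		queue <- []byte{byte(i)}
	}
	handled := 0
	for handled < n {
		<-handledCh
		handled++
	}
	cancel()  // signal shutdown only after everything was handled
	wg.Wait() // wait for the consumer goroutine to exit
	return handled
}
```

Because `select` chooses randomly among ready cases, a consumer written this way gives no guarantee that the queue is empty when it returns. The demo avoids that ambiguity by cancelling only after every sample has been handled.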
Processing
receiveEvent() does the following:
- converts the raw bytes to events by passing the data to HandlePerfData()
- sends each event to the observer's listeners
[Source]
snippet4.go |
---|
| func (k *Observer) receiveEvent(data []byte) {
	var timer time.Time
	if option.Config.EnableMsgHandlingLatency {
		timer = time.Now()
	}
	op, events, err := HandlePerfData(data)
	opcodemetrics.OpTotalInc(ops.OpCode(op))
	if err != nil {
		errormetrics.HandlerErrorsInc(ops.OpCode(op), err.kind)
		switch err.kind {
		case errormetrics.HandlePerfUnknownOp:
			k.log.Debug("unknown opcode ignored", "opcode", err.opcode)
		default:
			k.log.Debug("error occurred in event handler", "opcode", err.opcode, logfields.Error, err)
		}
	}
	for _, event := range events {
		k.observerListeners(event)
	}
	if option.Config.EnableMsgHandlingLatency {
		opcodemetrics.LatencyStats.WithLabelValues(strconv.FormatUint(uint64(op), 10)).Observe(float64(time.Since(timer).Microseconds()))
	}
}
|
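How observerListeners() delivers events is not shown in the snippet above. As a rough illustration of the fan-out idea only, the sketch below passes each decoded event to every registered listener. All names here (`event`, `listener`, `recordingListener`, `dispatcher`) are hypothetical and do not mirror Tetragon's types.

```go
package main

// event is a hypothetical decoded event; Tetragon's real event types differ.
type event struct {
	Op      byte
	Payload string
}

// listener is a hypothetical sink for decoded events.
type listener interface {
	Notify(e event)
}

// recordingListener keeps every event it receives, for demonstration.
type recordingListener struct {
	seen []event
}

func (l *recordingListener) Notify(e event) {
	l.seen = append(l.seen, e)
}

// dispatcher fans each event out to all registered listeners.
type dispatcher struct {
	listeners []listener
}

func (d *dispatcher) register(l listener) {
	d.listeners = append(d.listeners, l)
}

// deliver notifies every listener of every event, in order, and
// returns the total number of notifications made.
func (d *dispatcher) deliver(events []event) int {
	n := 0
	for _, e := range events {
		for _, l := range d.listeners {
			l.Notify(e)
			n++
		}
	}
	return n
}
```

With two listeners and two events, `deliver` makes four notifications. Delivery here is synchronous, so a slow listener would delay the goroutine that consumes the queue.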
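HandlePerfData() does the following:
- looks up the event-specific handler using the first byte of the data (the opcode)
- calls that handler to parse the raw bytes
- returns a HandlePerfError if no handler is registered for the opcode or the handler fails
[Source]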
snippet5.go |
---|
| func HandlePerfData(data []byte) (byte, []Event, *HandlePerfError) {
	op := data[0]
	r := bytes.NewReader(data)
	// These ops handlers are registered by RegisterEventHandlerAtInit().
	handler, ok := eventHandler[op]
	if !ok {
		return op, nil, &HandlePerfError{
			kind:   errormetrics.HandlePerfUnknownOp,
			err:    fmt.Errorf("unknown op: %d", op),
			opcode: op,
		}
	}
	events, err := handler(r)
	if err != nil {
		return op, events, &HandlePerfError{
			kind:   errormetrics.HandlePerfHandlerError,
			err:    fmt.Errorf("handler for op %d failed: %w", op, err),
			opcode: op,
		}
	}
	return op, events, nil
}
|
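Opcode-based dispatch can be tried in isolation with a small handler table. The sketch below is an assumption-laden illustration: the opcode `opDemo` and its 5-byte layout (one opcode byte followed by a little-endian uint32) are invented for the example and are not a Tetragon message format. Unlike the snippet above, it also checks for an empty sample, a case the reader loop shown earlier already filters out before queueing.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// handler parses one raw sample, including its leading opcode byte.
type handler func(r *bytes.Reader) (string, error)

// opDemo is a hypothetical opcode used only in this sketch.
const opDemo byte = 1

// handlers maps the first byte of a sample to its parser.
var handlers = map[byte]handler{
	opDemo: parseDemo,
}

// parseDemo decodes the made-up layout: 1 opcode byte followed by a
// little-endian uint32. encoding/binary reads the fields back to back,
// without padding, so the struct consumes exactly 5 bytes.
func parseDemo(r *bytes.Reader) (string, error) {
	var msg struct {
		Op    uint8
		Value uint32
	}
	if err := binary.Read(r, binary.LittleEndian, &msg); err != nil {
		return "", err
	}
	return fmt.Sprintf("demo value=%d", msg.Value), nil
}

// dispatch selects a handler from the first byte and lets it parse
// the whole sample.
func dispatch(data []byte) (string, error) {
	if len(data) == 0 {
		return "", fmt.Errorf("empty sample")
	}
	h, ok := handlers[data[0]]
	if !ok {
		return "", fmt.Errorf("unknown op: %d", data[0])
	}
	out, err := h(bytes.NewReader(data))
	if err != nil {
		return "", fmt.Errorf("handler for op %d failed: %w", data[0], err)
	}
	return out, nil
}
```

For example, `dispatch([]byte{1, 0x2a, 0, 0, 0})` returns `"demo value=42"`, while an unregistered opcode or a truncated sample returns an error. A table lookup keeps the hot path to one map access per sample and lets each event type own its parsing code.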
References