eBPF: Handling events in Userspace

Objective

  • to understand how eBPF events are handled in userspace in various open-source projects
    • to learn their approaches for handling a massive amount of events

Reasoning

Note

Snippets are taken from the cilium/tetragon and aquasecurity/tracee projects.

Once eBPF events are written by a kernel-space hook into a ringBuffer or perfBuffer, they become available for consumption from user space.

The following steps are usually performed in user-space code (a generic sketch using cilium/ebpf's ring-buffer reader is shown after the list):

  1. Preparation of a ringBuffer / perfBuffer reader
  2. Reading records from the buffer
  3. Processing the raw samples
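
As a point of reference, here is a minimal sketch of those three steps using cilium/ebpf's ring-buffer reader. The map name ("events") and the handleRawSample() helper are illustrative assumptions, not taken from either project.

ringbuf_sketch.go
// Minimal sketch, not taken from Tetragon or Tracee.
package sketch

import (
    "errors"
    "log"

    "github.com/cilium/ebpf"
    "github.com/cilium/ebpf/ringbuf"
)

// consume shows the three steps against a BPF_MAP_TYPE_RINGBUF map named "events".
func consume(events *ebpf.Map) error {
    // 1. Preparation: create a reader over the ring-buffer map.
    rd, err := ringbuf.NewReader(events)
    if err != nil {
        return err
    }
    defer rd.Close()

    for {
        // 2. Reading: blocks until a record is available or the reader is closed.
        record, err := rd.Read()
        if err != nil {
            if errors.Is(err, ringbuf.ErrClosed) {
                return nil // reader closed, clean exit
            }
            log.Printf("reading from ring buffer: %v", err)
            continue
        }

        // 3. Processing: record.RawSample holds the bytes written by the kernel hook.
        handleRawSample(record.RawSample)
    }
}

// handleRawSample is a placeholder for project-specific decoding.
func handleRawSample(raw []byte) { _ = raw }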

Tetragon

Preparation

A perf-event reader is created from the pinned perf map. [Source]

snippet.go
...
    pinOpts := ebpf.LoadPinOptions{}
    perfMap, err := ebpf.LoadPinnedMap(k.PerfConfig.MapName, &pinOpts)
    if err != nil {
        return fmt.Errorf("opening pinned map '%s' failed: %w", k.PerfConfig.MapName, err)
    }
    defer perfMap.Close()

    rbSize := k.getRBSize(int(perfMap.MaxEntries()))
    perfReader, err := perf.NewReader(perfMap, rbSize)

...

Reading

A goroutine is launched:

  • to read records from perfReader and add them to eventsQueue (a buffered channel).

[Source]

snippet2.go
...

    // We spawn go routine to read and process perf events,
    // connected with main app through eventsQueue channel.
    eventsQueue := make(chan *perf.Record, k.getRBQueueSize())


    // Listeners are ready and about to start reading from perf reader, tell
    // user everything is ready.
    k.log.Info("Listening for events...")


    // Start reading records from the perf array. Reads until the reader is closed.
    var wg sync.WaitGroup
    wg.Add(1)
    defer wg.Wait()
    go func() {
            defer wg.Done()
            for stopCtx.Err() == nil {
                    record, err := perfReader.Read()
                    if err != nil {
                            // NOTE(JM and Djalal): count and log errors while excluding the stopping context
                            if stopCtx.Err() == nil {
                                    RingbufErrors.Inc()
                                    errorCnt := getCounterValue(RingbufErrors)
                                    k.log.Warn("Reading bpf events failed", "errors", errorCnt, logfields.Error, err)
                            }
                    } else {
                            if len(record.RawSample) > 0 {
                                    select {
                                    case eventsQueue <- &record:
                                    default:
                                            // eventsQueue channel is full, drop the event
                                            queueLost.Inc()
                                    }
                                    RingbufReceived.Inc()
                            }


                            if record.LostSamples > 0 {
                                    RingbufLost.Add(float64(record.LostSamples))
                            }
                    }
            }
    }()

...
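
perfReader.Read() blocks until a record arrives, so a loop like the one above only exits promptly on shutdown if the reader is unblocked. One common way to do that (a sketch, not Tetragon's exact mechanism) is to close the reader from another goroutine and treat perf.ErrClosed as a clean exit:

shutdown_sketch.go
// Sketch: closing the reader unblocks a pending Read().
package sketch

import (
    "context"
    "errors"

    "github.com/cilium/ebpf/perf"
)

// readLoop drains the perf reader until stopCtx is cancelled. Closing the
// reader is what makes the blocking Read() call return.
func readLoop(stopCtx context.Context, perfReader *perf.Reader, out chan<- *perf.Record) {
    go func() {
        <-stopCtx.Done()
        perfReader.Close() // causes Read() to return perf.ErrClosed
    }()

    for {
        record, err := perfReader.Read()
        if err != nil {
            if errors.Is(err, perf.ErrClosed) {
                return // clean shutdown
            }
            continue // a real implementation would count/log other errors
        }
        out <- &record
    }
}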

Another goroutine is launched:

  • to read records from eventsQueue and pass them to receiveEvent() for processing

[Source]

snippet3.go
...

  // Start processing records from perf.
  wg.Add(1)
  go func() {
          defer wg.Done()
          for {
                  select {
                  case event := <-eventsQueue:
                          k.receiveEvent(event.RawSample)
                          queueReceived.Inc()
                  case <-stopCtx.Done():
                          k.log.Info("Listening for events completed.", logfields.Error, stopCtx.Err())
                          k.log.Debug(fmt.Sprintf("Unprocessed events in RB queue: %d", len(eventsQueue)))
                          return
                  }
          }
  }()


...

Processing

On calling receiveEvent():

  • it converts the raw bytes into events by passing the data to HandlePerfData()
  • it sends the events to the various listeners

[Source]

snippet4.go
func (k *Observer) receiveEvent(data []byte) {
        var timer time.Time
        if option.Config.EnableMsgHandlingLatency {
                timer = time.Now()
        }


        op, events, err := HandlePerfData(data)
        opcodemetrics.OpTotalInc(ops.OpCode(op))
        if err != nil {
                errormetrics.HandlerErrorsInc(ops.OpCode(op), err.kind)
                switch err.kind {
                case errormetrics.HandlePerfUnknownOp:
                        k.log.Debug("unknown opcode ignored", "opcode", err.opcode)
                default:
                        k.log.Debug("error occurred in event handler", "opcode", err.opcode, logfields.Error, err)
                }
        }
        for _, event := range events {
                k.observerListeners(event)
        }
        if option.Config.EnableMsgHandlingLatency {
                opcodemetrics.LatencyStats.WithLabelValues(strconv.FormatUint(uint64(op), 10)).Observe(float64(time.Since(timer).Microseconds()))
        }
}

On calling HandlePerfData():

  • it looks up the event-specific handler using the first byte (the opcode)
  • it calls the handler to parse the raw bytes

[Source]

snippet5.go
func HandlePerfData(data []byte) (byte, []Event, *HandlePerfError) {
        op := data[0]
        r := bytes.NewReader(data)
        // These ops handlers are registered by RegisterEventHandlerAtInit().
        handler, ok := eventHandler[op]
        if !ok {
                return op, nil, &HandlePerfError{
                        kind:   errormetrics.HandlePerfUnknownOp,
                        err:    fmt.Errorf("unknown op: %d", op),
                        opcode: op,
                }
        }


        events, err := handler(r)
        if err != nil {
                return op, events, &HandlePerfError{
                        kind:   errormetrics.HandlePerfHandlerError,
                        err:    fmt.Errorf("handler for op %d failed: %w", op, err),
                        opcode: op,
                }
        }
        return op, events, nil
}
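
The eventHandler map used above is an opcode-indexed table of decode functions. A minimal, self-contained illustration of the same dispatch pattern is shown below; the register() helper, the opcode value, and the struct layout are hypothetical and do not reflect Tetragon's actual RegisterEventHandlerAtInit() API.

dispatch_sketch.go
// Illustrative opcode dispatch, not Tetragon's implementation.
package sketch

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// Event is a placeholder for a decoded event type.
type Event interface{}

// handlers maps the first byte of a raw sample (the opcode) to a decoder.
var handlers = map[byte]func(*bytes.Reader) ([]Event, error){}

// register installs a decoder for one opcode.
func register(op byte, fn func(*bytes.Reader) ([]Event, error)) {
    handlers[op] = fn
}

// dispatch picks the decoder based on the opcode and runs it over the full sample.
func dispatch(data []byte) ([]Event, error) {
    if len(data) == 0 {
        return nil, fmt.Errorf("empty sample")
    }
    fn, ok := handlers[data[0]]
    if !ok {
        return nil, fmt.Errorf("unknown op: %d", data[0])
    }
    return fn(bytes.NewReader(data))
}

func init() {
    // Hypothetical opcode 1: an 8-byte header followed by a little-endian PID field.
    register(1, func(r *bytes.Reader) ([]Event, error) {
        var msg struct {
            Op  uint8
            Pad [7]uint8
            Pid uint64
        }
        if err := binary.Read(r, binary.LittleEndian, &msg); err != nil {
            return nil, err
        }
        return []Event{msg}, nil
    })
}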

Tracee

Because Tracee uses libbpfgo to load its eBPF objects, its approach to preparing and reading raw data from the perf/ring buffer differs slightly: it makes extensive use of Go channels.

Preparation

[Source]

A PerfBuffer is initialized with eventsChannel, a buffered channel for receiving raw event bytes.

snippet.go
...
    // Initialize perf buffers and needed channels

    t.eventsChannel = make(chan []byte, 1000)
    t.lostEvChannel = make(chan uint64)
    if t.config.PerfBufferSize < 1 {
        return errfmt.Errorf("invalid perf buffer size: %d", t.config.PerfBufferSize)
    }
    t.eventsPerfMap, err = t.bpfModule.InitPerfBuf(
        "events",
        t.eventsChannel,
        t.lostEvChannel,
        t.config.PerfBufferSize,
    )
    if err != nil {
        return errfmt.Errorf("error initializing events perf map: %v", err)
    }
...
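
With libbpfgo, the reading side is plain channel consumption. Below is a minimal sketch of a consumer for the two channels declared above; the function name is an illustrative assumption, and it presumes the perf buffer has already been started.

consume_sketch.go
// Illustrative consumer for the channels passed to InitPerfBuf.
package sketch

import "context"

// consumeEvents drains the raw-event and lost-event channels until ctx is done.
func consumeEvents(ctx context.Context, events <-chan []byte, lost <-chan uint64) {
    for {
        select {
        case raw := <-events:
            // raw holds the bytes written by the kernel hook; decode them here.
            _ = raw
        case n := <-lost:
            // n samples were dropped because the perf buffer was full.
            _ = n
        case <-ctx.Done():
            return
        }
    }
}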

Reading / Decoding

[Source]

handleEvents() is then launched in a separate goroutine to handle all perf events:

  • it passes eventsChannel to decodeEvents()
    • which reads the raw events and decodes them
    • and returns eventsChan for receiving the decoded events
snippet2.go
...
// handleEvents is the main pipeline of tracee. It receives events from the perf buffer
// and passes them through a series of stages, each stage is a goroutine that performs a
// specific task on the event. The pipeline is started in a separate goroutine.
func (t *Tracee) handleEvents(ctx context.Context, initialized chan<- struct{}) {
    logger.Debugw("Starting handleEvents goroutine")
    defer logger.Debugw("Stopped handleEvents goroutine")

    var errcList []<-chan error

    // Decode stage: events are read from the perf buffer and decoded into trace.Event type.

    eventsChan, errc := t.decodeEvents(ctx, t.eventsChannel)
    t.stats.Channels["decode"] = eventsChan
    errcList = append(errcList, errc)

    // Cache stage: events go through a caching function.

...
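
Each pipeline stage has the same channel-in / channel-out shape. Below is a simplified sketch of a decode stage with that signature; it is not Tracee's actual decoder (which uses its bufferdecoder package), and the Event type and decode() helper are placeholders.

decode_stage_sketch.go
// Simplified stage shape, not Tracee's implementation.
package sketch

import "context"

// Event is a placeholder for Tracee's trace.Event.
type Event struct{ Raw []byte }

// decode is a placeholder for the real buffer decoder.
func decode(raw []byte) (Event, error) { return Event{Raw: raw}, nil }

// decodeEvents reads raw byte slices from in, decodes them, and forwards the
// decoded events on the returned channel; decoding errors go to the error channel.
func decodeEvents(ctx context.Context, in <-chan []byte) (<-chan Event, <-chan error) {
    out := make(chan Event, 1000)
    errc := make(chan error, 10)
    go func() {
        defer close(out)
        defer close(errc)
        for {
            select {
            case raw, ok := <-in:
                if !ok {
                    return
                }
                evt, err := decode(raw)
                if err != nil {
                    select {
                    case errc <- err:
                    default: // drop the error if nobody is draining errc yet
                    }
                    continue
                }
                select {
                case out <- evt:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return out, errc
}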

Processing

Events from eventsChan go through several logical stages, such as:

  • container enrichment
  • detection engine

Finally, all events are handled by the sink stage for printing/logging.

[Source]

snippet3.go
    // Process events stage: events go through a processing functions.

    eventsChan, errc = t.processEvents(ctx, eventsChan)
    t.stats.Channels["process"] = eventsChan
    errcList = append(errcList, errc)

    // Enrichment stage: container events are enriched with additional runtime data.

    if !t.config.NoContainersEnrich { // TODO: remove safe-guard soon.
        eventsChan, errc = t.enrichContainerEvents(ctx, eventsChan)
        t.stats.Channels["enrich"] = eventsChan
        errcList = append(errcList, errc)
    }


    // Derive events stage: events go through a derivation function.

    eventsChan, errc = t.deriveEvents(ctx, eventsChan)
    t.stats.Channels["derive"] = eventsChan
    errcList = append(errcList, errc)

    // Engine events stage: events go through the signatures engine for detection.

    if t.config.EngineConfig.Mode == engine.ModeSingleBinary {
        eventsChan, errc = t.engineEvents(ctx, eventsChan)
        t.stats.Channels["engine"] = eventsChan
        errcList = append(errcList, errc)
    }

    // Sink pipeline stage: events go through printers.
    errc = t.sinkEvents(ctx, eventsChan)
    t.stats.Channels["sink"] = eventsChan
    errcList = append(errcList, errc)
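
All of the errc channels collected in errcList still need to be drained. A standard fan-in (merge) over them is enough to surface pipeline errors in one place; the sketch below assumes each stage closes its error channel when it finishes and is not Tracee's exact helper.

merge_errors_sketch.go
// Standard error fan-in pattern, illustrative only.
package sketch

import "sync"

// mergeErrors multiplexes several error channels into one. The returned
// channel is closed once every input channel has been closed.
func mergeErrors(chans ...<-chan error) <-chan error {
    out := make(chan error)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for _, c := range chans {
        go func(c <-chan error) {
            defer wg.Done()
            for err := range c {
                out <- err
            }
        }(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}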

References