observer/pkg/exporters/gelf_exporter/trace.go

154 lines
3.4 KiB
Go

package gelfexporter
import (
"context"
"sync"
"git.ma-al.com/maal-libraries/observer/pkg/attr"
"git.ma-al.com/maal-libraries/observer/pkg/level"
"git.ma-al.com/maal-libraries/observer/pkg/syslog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"gopkg.in/Graylog2/go-gelf.v2/gelf"
)
var _ trace.SpanExporter = &Exporter{}
// New creates an Exporter with the passed options.
func New(options ...Option) (*Exporter, error) {
cfg, err := newConfig(options...)
if err != nil {
return nil, err
}
writer, err := gelf.NewUDPWriter(cfg.GelfUrl)
if err != nil {
return nil, err
}
return &Exporter{
gelfWriter: writer,
}, nil
}
type Exporter struct {
gelfWriter *gelf.UDPWriter
appName string
stoppedMu sync.RWMutex
stopped bool
}
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
if err := ctx.Err(); err != nil {
return err
}
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
return nil
}
if len(spans) == 0 {
return nil
}
stubs := tracetest.SpanStubsFromReadOnlySpans(spans)
for i := range stubs {
stub := &stubs[i]
var attributes = make(map[string]interface{})
for _, attr := range stub.Attributes {
attributes[string(attr.Key)] = GetByType(attr.Value)
}
for _, attr := range stub.Resource.Attributes() {
attributes[string(attr.Key)] = GetByType(attr.Value)
}
for i := range stub.Events {
event := stub.Events[i]
var gelf GELFMessage = GELFMessage{
Host: e.appName,
ShortMessage: event.Name,
Timestamp: stub.StartTime,
// Defaults to ALERT since we consider lack of the level a serious error that should be fixed ASAP.
// Otherwise some dangerous unexpected behaviour could go unnoticed.
Level: syslog.ALERT,
ExtraFields: attributes,
}
for _, attrKV := range event.Attributes {
if attrKV.Key == attr.LogMessageLongKey {
gelf.LongMessage = attrKV.Value.AsString()
continue
}
if attrKV.Key == attr.LogMessageShortKey {
gelf.ShortMessage = attrKV.Value.AsString()
continue
}
if attrKV.Key == attr.SeverityLevelKey {
gelf.Level = level.FromString(attrKV.Value.AsString()).IntoSyslogLevel()
continue
}
attributes[string(attrKV.Key)] = GetByType(attrKV.Value)
}
Log(e.gelfWriter, gelf)
}
}
return nil
}
func GetByType(val attribute.Value) interface{} {
switch val.Type() {
case attribute.INVALID:
return "invalid value"
case attribute.BOOL:
return val.AsBool()
case attribute.INT64:
return val.AsInt64()
case attribute.FLOAT64:
return val.AsFloat64()
case attribute.STRING:
return val.AsString()
case attribute.BOOLSLICE:
return val.AsBoolSlice()
case attribute.INT64SLICE:
return val.AsInt64Slice()
case attribute.FLOAT64SLICE:
return val.AsFloat64Slice()
case attribute.STRINGSLICE:
return val.AsStringSlice()
}
return "invalid value"
}
// Shutdown is called to stop the exporter, it performs no action.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
return nil
}
// // MarshalLog is the marshaling function used by the logging system to represent this Exporter.
// func (e *Exporter) MarshalLog() interface{} {
// return struct {
// Type string
// WithTimestamps bool
// }{
// Type: "stdout",
// WithTimestamps: e.timestamps,
// }
// }