start reworking exporters to be more composable

This commit is contained in:
Natalia Goc 2024-05-16 18:19:36 +02:00
parent ab5b70704d
commit fc92995cc8
10 changed files with 459 additions and 166 deletions

View File

@ -8,10 +8,13 @@ import (
"os/signal"
"time"
"git.ma-al.com/gora_filip/observer/pkg/level"
"git.ma-al.com/gora_filip/observer/pkg/tracer"
"git.ma-al.com/gora_filip/pkg/attr"
"git.ma-al.com/gora_filip/pkg/combined_exporter"
"git.ma-al.com/gora_filip/pkg/console_exporter"
"git.ma-al.com/gora_filip/pkg/fiber_tracing"
"git.ma-al.com/gora_filip/pkg/level"
"github.com/gofiber/fiber/v2"
"go.opentelemetry.io/otel/trace"
)
type AttributesX struct {
@ -23,13 +26,21 @@ func main() {
StreamRequestBody: true,
})
main.Use(tracer.NewTracer(tracer.Config{
AppName: "test",
JaegerUrl: "http://localhost:4318/v1/traces",
GelfUrl: "192.168.220.30:12201",
Version: "1",
lvl := level.DEBUG
exporter := combined_exporter.NewExporter(
console_exporter.NewExporter(
console_exporter.ExporterOptions{
FilterOnLevel: &lvl,
//EmitOnlyOnError: true,
},
))
main.Use(fiber_tracing.NewMiddleware(fiber_tracing.Config{
AppName: "example",
Version: "0.0.0",
ServiceProvider: "maal",
Exporter: exporter,
}))
defer tracer.ShutdownTracer()
defer fiber_tracing.ShutdownTracer()
main.Get("/", func(c *fiber.Ctx) error {
ctx, span := tracer.Handler(c)
@ -37,13 +48,7 @@ func main() {
span.AddEvent(
"smthing is happening",
trace.WithAttributes(
tracer.LongMessage("smthing is happening - long"),
tracer.JsonAttr("smth", map[string]interface{}{
"xd": 1,
}),
tracer.Level(level.ALERT),
),
attr.WithAttributes(attr.SeverityLevel(level.INFO), attr.SourceCodeLocation(1)),
)
err := Serv(ctx)

View File

@ -9,6 +9,7 @@ import (
"git.ma-al.com/gora_filip/pkg/level"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)
type IntoTraceAttribute interface {
@ -19,6 +20,27 @@ type IntoTraceAttributes interface {
IntoTraceAttributes() []attribute.KeyValue
}
func CollectAttributes(attrs ...interface{}) []attribute.KeyValue {
collected := make([]attribute.KeyValue, len(attrs))
for _, a := range attrs {
switch a.(type) {
case []attribute.KeyValue:
collected = append(collected, a.([]attribute.KeyValue)...)
case attribute.KeyValue:
collected = append(collected, a.(attribute.KeyValue))
case IntoTraceAttribute:
collected = append(collected, a.(IntoTraceAttribute).IntoTraceAttribute())
case IntoTraceAttributes:
collected = append(collected, a.(IntoTraceAttributes).IntoTraceAttributes()...)
}
}
return collected
}
func WithAttributes(attrs ...interface{}) trace.SpanStartEventOption {
return trace.WithAttributes(CollectAttributes(attrs...)...)
}
const (
SeverityLevelKey = attribute.Key("level")
LogMessageLongKey = attribute.Key("log_message.long")

View File

@ -1,13 +1,28 @@
package code_location
import (
"runtime"
)
type CodeLocation struct {
FilePath string
FuncName string
LineNumber int
FilePath string
FuncName string
LineNumber int
ColumnNumber int
}
func FromStackTrace(...atDepth int) {
func FromStackTrace(atDepth ...int) CodeLocation {
skipLevelsInCallStack := 0
if len(atDepth) > 1 {
skipLevelsInCallStack = atDepth[0]
}
pc, file, line, _ := runtime.Caller(1 + skipLevelsInCallStack)
funcName := runtime.FuncForPC(pc).Name()
return CodeLocation{
FilePath: file,
LineNumber: line,
FuncName: funcName,
}
}

View File

@ -0,0 +1,40 @@
package combined_exporter
import (
"context"
"fmt"
"go.opentelemetry.io/otel/sdk/trace"
)
type Exporter struct {
exporters []trace.SpanExporter
}
func NewExporter(exporters ...trace.SpanExporter) trace.SpanExporter {
return &Exporter{
exporters: exporters,
}
}
// Implements [trace.SpanExporter]
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
for _, exp := range e.exporters {
exp.ExportSpans(ctx, spans)
}
return nil
}
// Implements [trace.SpanExporter]
func (e *Exporter) Shutdown(ctx context.Context) error {
var errs []error
for _, exp := range e.exporters {
err := exp.Shutdown(ctx)
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("multiple erros have occured: %#v", errs)
}
return nil
}

View File

@ -0,0 +1,110 @@
package console_exporter
import (
"context"
"fmt"
"sync"
"git.ma-al.com/gora_filip/pkg/level"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
)
type TraceFormatter interface {
FormatSpans(spans []trace.ReadOnlySpan, removeFields []attribute.Key, verbosityLevel level.SeverityLevel, addTraceId bool, onlyErrors bool) (string, error)
}
// Configuration for the exporter.
//
// Most of options are passed to the formatter.
type ExporterOptions struct {
// Try to parse filters from an environment variable with a name provided by this field.
// Result will only by applied to unset options. NOT IMPLEMENTED!
FilterFromEnvVar *string
// Filter the output based on the [level.SeverityLevel].
FilterOnLevel *level.SeverityLevel
// Fields that should be removed from the output.
FilterOutFields []attribute.Key
// Print only trace events instead of whole traces.
EmitEventsOnly bool
// Add trace id to output
EmitTraceId bool
// Print output only when an error is found
EmitOnlyOnError bool
// Used only when `EmitEventsOnly` is set to true.
TraceFormatter *TraceFormatter
}
type Exporter struct {
lvl level.SeverityLevel
removedFields []attribute.Key
addTraceId bool
onlyErrs bool
traceFormatter TraceFormatter
printerMu sync.Mutex
stoppedMu sync.RWMutex
stopped bool
}
func NewExporter(opts ExporterOptions) trace.SpanExporter {
var formatter TraceFormatter
var lvl level.SeverityLevel
if opts.TraceFormatter != nil {
formatter = *opts.TraceFormatter
} else {
formatter = TraceFormatter(&EventsOnlyFormatter{})
}
if opts.FilterOnLevel != nil {
lvl = *opts.FilterOnLevel
} else {
lvl = level.TRACE
}
return &Exporter{
traceFormatter: formatter,
removedFields: opts.FilterOutFields,
addTraceId: opts.EmitTraceId,
onlyErrs: opts.EmitOnlyOnError,
lvl: lvl,
}
}
// Implements [trace.SpanExporter]
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
return nil
}
if len(spans) == 0 {
return nil
}
e.printerMu.Lock()
defer e.printerMu.Unlock()
printLine, err := e.traceFormatter.FormatSpans(spans, e.removedFields, e.lvl, e.addTraceId, e.onlyErrs)
if err != nil {
fmt.Printf("FAILED TO FORMAT A TRACE WITH ERR: %#v\n", err)
}
if len(printLine) > 0 {
fmt.Println(printLine)
}
return nil
}
// Implements [trace.SpanExporter]
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}

View File

@ -0,0 +1,93 @@
package console_exporter
import (
"fmt"
"slices"
"git.ma-al.com/gora_filip/pkg/attr"
"git.ma-al.com/gora_filip/pkg/code_location"
"git.ma-al.com/gora_filip/pkg/console_fmt"
"git.ma-al.com/gora_filip/pkg/level"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/semconv/v1.25.0"
)
// A formatter that will print only events using a multiline format with colors.
// It uses attributes from the [attr] and [semconv] packages.
type EventsOnlyFormatter struct{}
func (f *EventsOnlyFormatter) FormatSpans(spans []trace.ReadOnlySpan, removeFields []attribute.Key, verbosityLevel level.SeverityLevel, addTraceId bool, onlyOnError bool) (string, error) {
stubs := tracetest.SpanStubsFromReadOnlySpans(spans)
var formattedSpanString string
for i := range stubs {
stub := &stubs[i]
for j := range stub.Events {
var attributes map[attribute.Key]string = make(map[attribute.Key]string, 0)
var msg string
var lvl level.SeverityLevel
var isErr bool
var location code_location.CodeLocation
for _, attrKV := range stub.Attributes {
if _, exists := attributes[attrKV.Key]; !exists {
attributes[attrKV.Key] = attrKV.Value.AsString()
}
}
for _, attrKV := range stub.Events[j].Attributes {
switch attrKV.Key {
case attr.LogMessageLongKey:
msg = attrKV.Value.AsString()
case attr.LogMessageShortKey:
if len(msg) == 0 {
msg = attrKV.Value.AsString()
}
case attr.SeverityLevelKey:
lvl = level.FromString(attrKV.Value.AsString())
case semconv.CodeFilepathKey:
location.FilePath = attrKV.Value.AsString()
case semconv.CodeLineNumberKey:
location.LineNumber = int(attrKV.Value.AsInt64())
case semconv.CodeColumnKey:
location.ColumnNumber = int(attrKV.Value.AsInt64())
case semconv.ExceptionMessageKey:
attributes[attrKV.Key] = attrKV.Value.AsString()
isErr = true
default:
if !slices.Contains(removeFields, attrKV.Key) && len(attrKV.Key) > 0 {
attributes[attrKV.Key] = attrKV.Value.AsString()
}
}
}
if len(msg) == 0 {
msg = stub.Name
}
if addTraceId {
attributes[attribute.Key("trace_id")] = stub.SpanContext.TraceID().String()
}
if len(location.FilePath) > 0 {
attributes["code.location"] = fmt.Sprintf("%s:%d:%d", location.FilePath, location.LineNumber, location.ColumnNumber)
}
if !(!isErr && onlyOnError) && lvl <= verbosityLevel {
attrs := ""
for k, v := range attributes {
attrs += fmt.Sprintf("\t%s%s%s = %s\n", console_fmt.ColorBold, k, console_fmt.ColorReset, v)
}
formattedSpanString += fmt.Sprintf(
"%s %s\n%s",
fmt.Sprintf("%s[%s]", console_fmt.SeverityLevelToColor(lvl), lvl.String()),
fmt.Sprintf("%s%s", msg, console_fmt.ColorReset),
attrs,
)
}
}
}
return formattedSpanString, nil
}

81
pkg/console_fmt/fmt.go Normal file
View File

@ -0,0 +1,81 @@
package console_fmt
import (
"git.ma-al.com/gora_filip/pkg/level"
)
const (
ColorReset = "\033[0m"
ColorRed = "\033[31m"
ColorGreen = "\033[32m"
ColorYellow = "\033[33m"
ColorBlue = "\033[34m"
ColorPurple = "\033[35m"
ColorCyan = "\033[36m"
ColorWhite = "\033[37m"
ColorBlackOnYellow = "\033[43m\033[30m"
ColorWhiteOnRed = "\033[37m\033[41m"
ColorWhiteOnRedBlinking = "\033[37m\033[41m\033[5m"
ColorBold = "\033[1m"
)
func Red(txt string) string {
return ColorRed + txt + ColorReset
}
func Green(txt string) string {
return ColorGreen + txt + ColorReset
}
func Yellow(txt string) string {
return ColorYellow + txt + ColorReset
}
func Blue(txt string) string {
return ColorBlue + txt + ColorReset
}
func Purple(txt string) string {
return ColorPurple + txt + ColorReset
}
func Cyan(txt string) string {
return ColorCyan + txt + ColorReset
}
func White(txt string) string {
return ColorWhite + txt + ColorReset
}
func BlackOnYellow(txt string) string {
return ColorBlackOnYellow + txt + ColorReset
}
func WhiteOnRed(txt string) string {
return ColorWhiteOnRed + txt + ColorReset
}
func WhiteOnRedBlinking(txt string) string {
return ColorWhiteOnRedBlinking + txt + ColorReset
}
func SeverityLevelToColor(lvl level.SeverityLevel) string {
switch lvl {
case level.TRACE:
return ColorWhite
case level.DEBUG:
return ColorPurple
case level.INFO:
return ColorBlue
case level.WARN:
return ColorYellow
case level.ERR:
return ColorRed
case level.CRIT:
return ColorBlackOnYellow
case level.ALERT:
return ColorWhiteOnRed
default:
return ColorWhite
}
}

View File

@ -0,0 +1,73 @@
package fiber_tracing
import (
"context"
"log"
"github.com/gofiber/fiber/v2"
fiberOpentelemetry "github.com/psmarcin/fiber-opentelemetry/pkg/fiber-otel"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
trc "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
trace "go.opentelemetry.io/otel/trace"
)
var (
TracingError error = nil
TP trc.TracerProvider
)
type Config struct {
AppName string
Version string
ServiceProvider string
Exporter trc.SpanExporter
}
func newResource(config Config) *resource.Resource {
r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(config.AppName),
semconv.ServiceVersionKey.String(config.Version),
attribute.String("service.provider", config.ServiceProvider),
)
return r
}
func NewMiddleware(config Config) func(*fiber.Ctx) error {
var tracerProviders []trc.TracerProviderOption
tracerProviders = append(tracerProviders, trc.WithBatcher(config.Exporter))
tracerProviders = append(tracerProviders, trc.WithResource(newResource(config)))
TP = *trc.NewTracerProvider(tracerProviders...)
otel.SetTracerProvider(&TP)
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
if err != TracingError {
TracingError = err
log.Println(err)
}
}))
tracer := TP.Tracer("_maal-fiber-otel_")
return fiberOpentelemetry.New(
fiberOpentelemetry.Config{
Tracer: tracer,
SpanName: "{{ .Method }} {{ .Path }}",
TracerStartAttributes: []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindServer),
trace.WithNewRoot(),
},
},
)
}
func ShutdownTracer() {
if err := TP.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}

View File

@ -65,8 +65,6 @@ func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan)
attributes[string(attr.Key)] = GetByType(attr.Value)
}
attributes["from"] = "test"
for i := range stub.Events {
event := stub.Events[i]

View File

@ -3,157 +3,13 @@ package tracer
import (
"context"
"fmt"
"log"
"os"
"runtime"
gelfexporter "git.ma-al.com/gora_filip/observer/pkg/gelf_exporter"
"github.com/gofiber/fiber/v2"
fiberOpentelemetry "github.com/psmarcin/fiber-opentelemetry/pkg/fiber-otel"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
trc "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
trace "go.opentelemetry.io/otel/trace"
"runtime"
)
var (
TracingError error = nil
TP trc.TracerProvider
)
type CustomExporter struct {
jaeger *otlptrace.Exporter
stdouttrace *stdouttrace.Exporter
}
type Config struct {
AppName string
JaegerUrl string
GelfUrl string
Version string
}
func NewCustomExporter(jaegerUrl string) (trc.SpanExporter, error) {
var jaeg *otlptrace.Exporter
var outrace *stdouttrace.Exporter
var err error
outrace, err = stdouttrace.New(
stdouttrace.WithWriter(os.Stdout),
stdouttrace.WithPrettyPrint(),
stdouttrace.WithoutTimestamps(),
)
if err != nil {
return &CustomExporter{}, err
}
if len(jaegerUrl) > 0 {
jaeg = otlptracehttp.NewUnstarted(otlptracehttp.WithEndpointURL(jaegerUrl))
}
return &CustomExporter{
jaeger: jaeg,
stdouttrace: outrace,
}, nil
}
func (e *CustomExporter) ExportSpans(ctx context.Context, spans []trc.ReadOnlySpan) error {
if TracingError == nil {
if e.jaeger != nil {
err := e.jaeger.ExportSpans(ctx, spans)
return err
} else {
return e.printOnlyOnError(ctx, spans)
}
}
return nil
}
func (e *CustomExporter) printOnlyOnError(ctx context.Context, spans []trc.ReadOnlySpan) error {
var err error
for _, s := range spans {
if s.Status().Code == codes.Error {
err = e.stdouttrace.ExportSpans(ctx, spans)
break
}
}
return err
}
func (e *CustomExporter) Shutdown(ctx context.Context) error {
if e.jaeger != nil {
e.stdouttrace.Shutdown(ctx)
return e.jaeger.Shutdown(ctx)
} else {
return e.stdouttrace.Shutdown(ctx)
}
}
func newResource(config Config) *resource.Resource {
r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(config.AppName),
semconv.ServiceVersionKey.String(config.Version),
attribute.String("service.provider", "maal"),
)
return r
}
func NewTracer(config Config) func(*fiber.Ctx) error {
l := log.New(os.Stdout, "", 0)
var tracerProviders []trc.TracerProviderOption
otlpExporter := otlptracehttp.NewUnstarted(otlptracehttp.WithEndpointURL(config.JaegerUrl))
gelfExporter, err := gelfexporter.New(
gelfexporter.WithGelfUrl(config.GelfUrl),
gelfexporter.WithAppName("salego"),
)
if err != nil {
l.Fatal(err)
}
tracerProviders = append(tracerProviders, trc.WithBatcher(otlpExporter))
tracerProviders = append(tracerProviders, trc.WithBatcher(gelfExporter))
tracerProviders = append(tracerProviders, trc.WithResource(newResource(config)))
TP = *trc.NewTracerProvider(tracerProviders...)
otel.SetTracerProvider(&TP)
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
if err != TracingError {
TracingError = err
log.Println(err)
}
}))
tracer := TP.Tracer("fiber-otel-router")
return fiberOpentelemetry.New(
fiberOpentelemetry.Config{
Tracer: tracer,
SpanName: "{{ .Method }} {{ .Path }}",
TracerStartAttributes: []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindServer),
trace.WithNewRoot(),
},
},
)
}
func ShutdownTracer() {
if err := TP.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}
func Handler(fc *fiber.Ctx) (context.Context, trace.Span) {
spanName := fmt.Sprint(fc.OriginalURL())
simpleCtx, span := fiberOpentelemetry.Tracer.Start(fc.UserContext(), spanName)