reorganize exporters and simplify their use
This commit is contained in:
125
pkg/exporters/console_exporter/console_exporter.go
Normal file
125
pkg/exporters/console_exporter/console_exporter.go
Normal file
@ -0,0 +1,125 @@
|
||||
package console_exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.ma-al.com/gora_filip/pkg/level"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
)
|
||||
|
||||
type TraceFormatter interface {
|
||||
FormatSpans(spans []trace.ReadOnlySpan, removeFields []attribute.Key, verbosityLevel level.SeverityLevel, addTraceId bool, onlyErrors bool) (string, error)
|
||||
}
|
||||
|
||||
// Configuration for the exporter.
|
||||
//
|
||||
// Most of options are passed to the formatter.
|
||||
type ExporterOptions struct {
|
||||
// Try to parse filters from an environment variable with a name provided by this field.
|
||||
// Result will only by applied to unset options. NOT IMPLEMENTED!
|
||||
FilterFromEnvVar *string
|
||||
// Filter the output based on the [level.SeverityLevel].
|
||||
FilterOnLevel *level.SeverityLevel
|
||||
// Fields that should be removed from the output.
|
||||
FilterOutFields []attribute.Key
|
||||
// Print only trace events instead of whole traces.
|
||||
EmitEventsOnly bool
|
||||
// Add trace id to output
|
||||
EmitTraceId bool
|
||||
// Print output only when an error is found
|
||||
EmitOnlyOnError bool
|
||||
// Used only when `EmitEventsOnly` is set to true.
|
||||
TraceFormatter *TraceFormatter
|
||||
}
|
||||
|
||||
type Exporter struct {
|
||||
lvl level.SeverityLevel
|
||||
removedFields []attribute.Key
|
||||
addTraceId bool
|
||||
onlyErrs bool
|
||||
traceFormatter TraceFormatter
|
||||
printerMu sync.Mutex
|
||||
stoppedMu sync.RWMutex
|
||||
stopped bool
|
||||
}
|
||||
|
||||
// NOTE: The configuration might change in future releases
|
||||
func DefaultConsoleExporter() trace.SpanExporter {
|
||||
lvl := level.DEBUG
|
||||
fmt := NewEventsOnlyFormatter()
|
||||
return NewExporter(ExporterOptions{
|
||||
FilterFromEnvVar: nil,
|
||||
FilterOnLevel: &lvl,
|
||||
FilterOutFields: []attribute.Key{},
|
||||
EmitEventsOnly: false,
|
||||
EmitTraceId: true,
|
||||
EmitOnlyOnError: false,
|
||||
TraceFormatter: &fmt,
|
||||
})
|
||||
}
|
||||
|
||||
func NewExporter(opts ExporterOptions) trace.SpanExporter {
|
||||
var formatter TraceFormatter
|
||||
var lvl level.SeverityLevel
|
||||
|
||||
if opts.TraceFormatter != nil {
|
||||
formatter = *opts.TraceFormatter
|
||||
} else {
|
||||
formatter = TraceFormatter(&EventsOnlyFormatter{})
|
||||
}
|
||||
if opts.FilterOnLevel != nil {
|
||||
lvl = *opts.FilterOnLevel
|
||||
} else {
|
||||
lvl = level.TRACE
|
||||
}
|
||||
|
||||
return &Exporter{
|
||||
traceFormatter: formatter,
|
||||
removedFields: opts.FilterOutFields,
|
||||
addTraceId: opts.EmitTraceId,
|
||||
onlyErrs: opts.EmitOnlyOnError,
|
||||
lvl: lvl,
|
||||
}
|
||||
}
|
||||
|
||||
// Implements [trace.SpanExporter]
|
||||
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
|
||||
e.stoppedMu.RLock()
|
||||
stopped := e.stopped
|
||||
e.stoppedMu.RUnlock()
|
||||
if stopped {
|
||||
return nil
|
||||
}
|
||||
if len(spans) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
e.printerMu.Lock()
|
||||
defer e.printerMu.Unlock()
|
||||
printLine, err := e.traceFormatter.FormatSpans(spans, e.removedFields, e.lvl, e.addTraceId, e.onlyErrs)
|
||||
if err != nil {
|
||||
fmt.Printf("FAILED TO FORMAT A TRACE WITH ERR: %#v\n", err)
|
||||
}
|
||||
if len(printLine) > 0 {
|
||||
fmt.Println(printLine)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Implements [trace.SpanExporter]
|
||||
func (e *Exporter) Shutdown(ctx context.Context) error {
|
||||
e.stoppedMu.Lock()
|
||||
e.stopped = true
|
||||
e.stoppedMu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
97
pkg/exporters/console_exporter/event_only_formatter.go
Normal file
97
pkg/exporters/console_exporter/event_only_formatter.go
Normal file
@ -0,0 +1,97 @@
|
||||
package console_exporter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
"git.ma-al.com/gora_filip/pkg/attr"
|
||||
"git.ma-al.com/gora_filip/pkg/code_location"
|
||||
"git.ma-al.com/gora_filip/pkg/console_fmt"
|
||||
"git.ma-al.com/gora_filip/pkg/level"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
"go.opentelemetry.io/otel/semconv/v1.25.0"
|
||||
)
|
||||
|
||||
func NewEventsOnlyFormatter() TraceFormatter {
|
||||
return &EventsOnlyFormatter{}
|
||||
}
|
||||
|
||||
// A formatter that will print only events using a multiline format with colors.
|
||||
// It uses attributes from the [attr] and [semconv] packages.
|
||||
type EventsOnlyFormatter struct{}
|
||||
|
||||
func (f *EventsOnlyFormatter) FormatSpans(spans []trace.ReadOnlySpan, removeFields []attribute.Key, verbosityLevel level.SeverityLevel, addTraceId bool, onlyOnError bool) (string, error) {
|
||||
stubs := tracetest.SpanStubsFromReadOnlySpans(spans)
|
||||
|
||||
var formattedSpanString string
|
||||
|
||||
for i := range stubs {
|
||||
stub := &stubs[i]
|
||||
for j := range stub.Events {
|
||||
var attributes map[attribute.Key]string = make(map[attribute.Key]string, 0)
|
||||
var msg string
|
||||
var lvl level.SeverityLevel
|
||||
var isErr bool
|
||||
var location code_location.CodeLocation
|
||||
|
||||
for _, attrKV := range stub.Attributes {
|
||||
if _, exists := attributes[attrKV.Key]; !exists {
|
||||
attributes[attrKV.Key] = attrKV.Value.AsString()
|
||||
}
|
||||
}
|
||||
|
||||
for _, attrKV := range stub.Events[j].Attributes {
|
||||
switch attrKV.Key {
|
||||
case attr.LogMessageLongKey:
|
||||
msg = attrKV.Value.AsString()
|
||||
case attr.LogMessageShortKey:
|
||||
if len(msg) == 0 {
|
||||
msg = attrKV.Value.AsString()
|
||||
}
|
||||
case attr.SeverityLevelKey:
|
||||
lvl = level.FromString(attrKV.Value.AsString())
|
||||
case semconv.CodeFilepathKey:
|
||||
location.FilePath = attrKV.Value.AsString()
|
||||
case semconv.CodeLineNumberKey:
|
||||
location.LineNumber = int(attrKV.Value.AsInt64())
|
||||
case semconv.CodeColumnKey:
|
||||
location.ColumnNumber = int(attrKV.Value.AsInt64())
|
||||
case semconv.ExceptionMessageKey:
|
||||
attributes[attrKV.Key] = attrKV.Value.AsString()
|
||||
isErr = true
|
||||
default:
|
||||
if !slices.Contains(removeFields, attrKV.Key) && len(attrKV.Key) > 0 {
|
||||
attributes[attrKV.Key] = attrKV.Value.AsString()
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(msg) == 0 {
|
||||
msg = stub.Name
|
||||
}
|
||||
if addTraceId {
|
||||
attributes[attribute.Key("trace_id")] = stub.SpanContext.TraceID().String()
|
||||
}
|
||||
if len(location.FilePath) > 0 {
|
||||
attributes["code.location"] = fmt.Sprintf("%s:%d:%d", location.FilePath, location.LineNumber, location.ColumnNumber)
|
||||
}
|
||||
|
||||
if !(!isErr && onlyOnError) && lvl <= verbosityLevel {
|
||||
attrs := ""
|
||||
for k, v := range attributes {
|
||||
attrs += fmt.Sprintf("\t%s%s%s = %s\n", console_fmt.ColorBold, k, console_fmt.ColorReset, v)
|
||||
}
|
||||
|
||||
formattedSpanString += fmt.Sprintf(
|
||||
"%s %s\n%s",
|
||||
fmt.Sprintf("%s[%s]", console_fmt.SeverityLevelToColor(lvl), lvl.String()),
|
||||
fmt.Sprintf("%s%s", msg, console_fmt.ColorReset),
|
||||
attrs,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return formattedSpanString, nil
|
||||
}
|
58
pkg/exporters/exporters.go
Normal file
58
pkg/exporters/exporters.go
Normal file
@ -0,0 +1,58 @@
|
||||
package exporters
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.ma-al.com/gora_filip/pkg/exporters/console_exporter"
|
||||
gelf_exporter "git.ma-al.com/gora_filip/pkg/exporters/gelf_exporter"
|
||||
otlphttp_exporter "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
)
|
||||
|
||||
func NewWithConfig(exporter sdktrace.SpanExporter) ExporterWithConfig {
|
||||
return ExporterWithConfig{
|
||||
exporter: exporter,
|
||||
}
|
||||
}
|
||||
|
||||
// Combined exporter with batch processor config
|
||||
type ExporterWithConfig struct {
|
||||
exporter sdktrace.SpanExporter
|
||||
config []sdktrace.BatchSpanProcessorOption
|
||||
}
|
||||
|
||||
func (ecfg ExporterWithConfig) Add(opt sdktrace.BatchSpanProcessorOption) ExporterWithConfig {
|
||||
ecfg.config = append(ecfg.config, opt)
|
||||
return ecfg
|
||||
}
|
||||
|
||||
func (ecfg ExporterWithConfig) IntoTraceProviderOption() sdktrace.TracerProviderOption {
|
||||
return sdktrace.WithBatcher(ecfg.exporter, ecfg.config...)
|
||||
}
|
||||
|
||||
// An exporter printing to console with very small delay
|
||||
func DevConsoleExporter(opts ...console_exporter.ExporterOptions) ExporterWithConfig {
|
||||
batchTimeout := (time.Millisecond * 250)
|
||||
exportTimeout := (time.Millisecond * 250)
|
||||
var exporter ExporterWithConfig
|
||||
if len(opts) > 0 {
|
||||
exporter = NewWithConfig(console_exporter.NewExporter(opts[0]))
|
||||
} else {
|
||||
exporter = NewWithConfig(console_exporter.DefaultConsoleExporter())
|
||||
}
|
||||
return exporter.Add(sdktrace.WithBatchTimeout(batchTimeout)).Add(sdktrace.WithExportTimeout(exportTimeout))
|
||||
}
|
||||
|
||||
// Default exporter to Graylog.
|
||||
func GelfExporter(opts ...gelf_exporter.Option) (ExporterWithConfig, error) {
|
||||
gelfExp, err := gelf_exporter.New(opts...)
|
||||
return NewWithConfig(gelfExp), err
|
||||
}
|
||||
|
||||
// Exporter for traces over HTTP. Can be used with Jaeger.
|
||||
// See documentation of [otlhttp_exporter] for details.
|
||||
func OtlpHTTPExporter(opts ...otlphttp_exporter.Option) (ExporterWithConfig, error) {
|
||||
otlpExp, err := otlphttp_exporter.New(context.Background(), opts...)
|
||||
return NewWithConfig(otlpExp), err
|
||||
}
|
42
pkg/exporters/gelf_exporter/config.go
Normal file
42
pkg/exporters/gelf_exporter/config.go
Normal file
@ -0,0 +1,42 @@
|
||||
package gelfexporter
|
||||
|
||||
type config struct {
|
||||
GelfUrl string
|
||||
AppName string
|
||||
}
|
||||
|
||||
// newConfig creates a validated Config configured with options.
|
||||
func newConfig(options ...Option) (config, error) {
|
||||
cfg := config{}
|
||||
for _, opt := range options {
|
||||
cfg = opt.apply(cfg)
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// Option sets the value of an option for a Config.
|
||||
type Option interface {
|
||||
apply(config) config
|
||||
}
|
||||
|
||||
func WithGelfUrl(url string) Option {
|
||||
return gelfUrlOption(url)
|
||||
}
|
||||
|
||||
type gelfUrlOption string
|
||||
|
||||
func (o gelfUrlOption) apply(cfg config) config {
|
||||
cfg.GelfUrl = string(o)
|
||||
return cfg
|
||||
}
|
||||
|
||||
func WithAppName(url string) Option {
|
||||
return appName(url)
|
||||
}
|
||||
|
||||
type appName string
|
||||
|
||||
func (o appName) apply(cfg config) config {
|
||||
cfg.AppName = string(o)
|
||||
return cfg
|
||||
}
|
49
pkg/exporters/gelf_exporter/gelf.go
Normal file
49
pkg/exporters/gelf_exporter/gelf.go
Normal file
@ -0,0 +1,49 @@
|
||||
package gelfexporter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"time"
|
||||
|
||||
"git.ma-al.com/gora_filip/pkg/syslog"
|
||||
"gopkg.in/Graylog2/go-gelf.v2/gelf"
|
||||
)
|
||||
|
||||
type GELFMessage struct {
|
||||
// Name of the application
|
||||
Host string `json:"host"`
|
||||
// Short, descriptive message
|
||||
ShortMessage string `json:"short_message"`
|
||||
// Optional long message.
|
||||
LongMessage string `json:"long_message,omitempty"`
|
||||
// Timestamp in Unix
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
// Severity level matching Syslog standard.
|
||||
Level syslog.SyslogLevel `json:"level"`
|
||||
|
||||
// All additional field names must start with an underline.
|
||||
ExtraFields map[string]interface{} `json:"extrafields,omitempty"`
|
||||
}
|
||||
|
||||
func Log(writer *gelf.UDPWriter, msg GELFMessage) {
|
||||
if writer != nil {
|
||||
err := writer.WriteMessage(msg.GELFFormat())
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
fmt.Printf("msg: %v sent\n", msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (g GELFMessage) GELFFormat() *gelf.Message {
|
||||
return &gelf.Message{
|
||||
Version: "1.1",
|
||||
Host: g.Host,
|
||||
Short: g.ShortMessage,
|
||||
Full: g.LongMessage,
|
||||
TimeUnix: float64(g.Timestamp.Unix()),
|
||||
Level: int32(g.Level),
|
||||
Extra: g.ExtraFields,
|
||||
}
|
||||
}
|
144
pkg/exporters/gelf_exporter/trace.go
Normal file
144
pkg/exporters/gelf_exporter/trace.go
Normal file
@ -0,0 +1,144 @@
|
||||
package gelfexporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"git.ma-al.com/gora_filip/pkg/attr"
|
||||
"git.ma-al.com/gora_filip/pkg/level"
|
||||
"git.ma-al.com/gora_filip/pkg/syslog"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
"gopkg.in/Graylog2/go-gelf.v2/gelf"
|
||||
)
|
||||
|
||||
var _ trace.SpanExporter = &Exporter{}
|
||||
|
||||
// New creates an Exporter with the passed options.
|
||||
func New(options ...Option) (*Exporter, error) {
|
||||
cfg, err := newConfig(options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writer, err := gelf.NewUDPWriter(cfg.GelfUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Exporter{
|
||||
gelfWriter: writer,
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
type Exporter struct {
|
||||
gelfWriter *gelf.UDPWriter
|
||||
appName string
|
||||
stoppedMu sync.RWMutex
|
||||
stopped bool
|
||||
}
|
||||
|
||||
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
e.stoppedMu.RLock()
|
||||
stopped := e.stopped
|
||||
e.stoppedMu.RUnlock()
|
||||
if stopped {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(spans) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
stubs := tracetest.SpanStubsFromReadOnlySpans(spans)
|
||||
|
||||
for i := range stubs {
|
||||
stub := &stubs[i]
|
||||
|
||||
var attributes = make(map[string]interface{})
|
||||
for _, attr := range stub.Attributes {
|
||||
attributes[string(attr.Key)] = GetByType(attr.Value)
|
||||
}
|
||||
|
||||
for i := range stub.Events {
|
||||
event := stub.Events[i]
|
||||
|
||||
var gelf GELFMessage = GELFMessage{
|
||||
Host: e.appName,
|
||||
ShortMessage: event.Name,
|
||||
Timestamp: stub.StartTime,
|
||||
// Defaults to ALERT since we consider lack of the level a serious error that should be fixed ASAP.
|
||||
// Otherwise some dangerous unexpected behaviour could go unnoticed.
|
||||
Level: syslog.ALERT,
|
||||
ExtraFields: attributes,
|
||||
}
|
||||
for _, attrKV := range event.Attributes {
|
||||
if attrKV.Key == "long_message" {
|
||||
gelf.LongMessage = attrKV.Value.AsString()
|
||||
continue
|
||||
}
|
||||
|
||||
if attrKV.Key == attr.SeverityLevelKey {
|
||||
gelf.Level = level.FromString(attrKV.Value.AsString()).IntoSyslogLevel()
|
||||
continue
|
||||
}
|
||||
|
||||
attributes[string(attrKV.Key)] = GetByType(attrKV.Value)
|
||||
}
|
||||
|
||||
Log(e.gelfWriter, gelf)
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetByType(val attribute.Value) interface{} {
|
||||
switch val.Type() {
|
||||
case attribute.INVALID:
|
||||
return "invalid value"
|
||||
case attribute.BOOL:
|
||||
return val.AsBool()
|
||||
case attribute.INT64:
|
||||
return val.AsInt64()
|
||||
case attribute.FLOAT64:
|
||||
return val.AsFloat64()
|
||||
case attribute.STRING:
|
||||
return val.AsString()
|
||||
case attribute.BOOLSLICE:
|
||||
return val.AsBoolSlice()
|
||||
case attribute.INT64SLICE:
|
||||
return val.AsInt64Slice()
|
||||
case attribute.FLOAT64SLICE:
|
||||
return val.AsFloat64Slice()
|
||||
case attribute.STRINGSLICE:
|
||||
return val.AsStringSlice()
|
||||
}
|
||||
|
||||
return "invalid value"
|
||||
}
|
||||
|
||||
// Shutdown is called to stop the exporter, it performs no action.
|
||||
func (e *Exporter) Shutdown(ctx context.Context) error {
|
||||
e.stoppedMu.Lock()
|
||||
e.stopped = true
|
||||
e.stoppedMu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// // MarshalLog is the marshaling function used by the logging system to represent this Exporter.
|
||||
// func (e *Exporter) MarshalLog() interface{} {
|
||||
// return struct {
|
||||
// Type string
|
||||
// WithTimestamps bool
|
||||
// }{
|
||||
// Type: "stdout",
|
||||
// WithTimestamps: e.timestamps,
|
||||
// }
|
||||
// }
|
Reference in New Issue
Block a user