k8s与日志--采用golang实现Fluent Bit的output插件
s"
	"time"
	"unsafe"

	"github.com/Shopify/sarama"
	"github.com/fluent/fluent-bit-go/output"
	"github.com/ugorji/go/codec"
)

const (
	// defaultConnectTimeout is used when timeout has been left at zero.
	defaultConnectTimeout = 5 * time.Minute
	// connectRetryInterval is the pause between two connection attempts.
	connectRetryInterval = 30 * time.Second
)

// Plugin-wide state. It is filled in by FLBPluginInit and read by
// FLBPluginFlush and FLBPluginExit.
var (
	brokers    []string
	producer   sarama.SyncProducer
	timeout    = 0 * time.Minute
	topic      string
	module     string
	messageKey string
)

// FLBPluginRegister registers this plugin with Fluent Bit under the name
// "out_kafka".
//
//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
	return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}

// FLBPluginInit reads the plugin configuration and connects the Kafka
// producer.
//
// ctx (context) is the pointer to the fluentbit context (state / C code).
//
// The keys "brokers", "topics", "module" and "message_key" are mandatory;
// "required_acks", "compression_codec" and "max_retry" are optional integers.
// It returns output.FLB_OK once a producer is available and output.FLB_ERROR
// when a mandatory key is missing or Kafka cannot be reached before the
// timeout expires.
//
//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
	bs, ok := requiredConfigKey(ctx, "brokers")
	if !ok {
		return output.FLB_ERROR
	}
	bl := parseBrokerList(bs)
	if len(bl) == 0 {
		// The value only held separators/whitespace, e.g. " , ".
		log.Printf("you must set brokers")
		return output.FLB_ERROR
	}
	brokers = bl

	tp, ok := requiredConfigKey(ctx, "topics")
	if !ok {
		return output.FLB_ERROR
	}
	topic = tp

	mo, ok := requiredConfigKey(ctx, "module")
	if !ok {
		return output.FLB_ERROR
	}
	module = mo

	key, ok := requiredConfigKey(ctx, "message_key")
	if !ok {
		return output.FLB_ERROR
	}
	messageKey = key

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	if acks, ok := optionalIntConfigKey(ctx, "required_acks"); ok {
		config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
	}
	if compression, ok := optionalIntConfigKey(ctx, "compression_codec"); ok {
		config.Producer.Compression = sarama.CompressionCodec(compression)
	}
	if maxRetry, ok := optionalIntConfigKey(ctx, "max_retry"); ok {
		config.Producer.Retry.Max = maxRetry
	}

	if timeout == 0 {
		timeout = defaultConnectTimeout
	}

	// If Kafka is not running on init, wait to connect.
	if !connectProducer(config) {
		log.Printf("Kafka failed to respond after %s", timeout)
		return output.FLB_ERROR
	}
	return output.FLB_OK
}

// requiredConfigKey returns the value configured for key. When the value is
// empty it logs "you must set <key>" and reports false.
func requiredConfigKey(ctx unsafe.Pointer, key string) (string, bool) {
	value := output.FLBPluginConfigKey(ctx, key)
	if value == "" {
		log.Printf("you must set %s", key)
		return "", false
	}
	return value, true
}

// optionalIntConfigKey returns the integer configured for key. It reports
// false when the key is unset or is not a valid integer; the latter case is
// logged so that a typo in the configuration does not go unnoticed.
func optionalIntConfigKey(ctx unsafe.Pointer, key string) (int, bool) {
	raw := output.FLBPluginConfigKey(ctx, key)
	if raw == "" {
		return 0, false
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		log.Printf("ignoring invalid %s %q: %v", key, raw, err)
		return 0, false
	}
	return n, true
}

// parseBrokerList splits a comma-separated broker list, trimming surrounding
// whitespace and dropping empty entries, so "a:9092, b:9092" yields two clean
// addresses.
func parseBrokerList(list string) []string {
	parts := strings.Split(list, ",")
	addrs := make([]string, 0, len(parts))
	for _, p := range parts {
		if p = strings.TrimSpace(p); p != "" {
			addrs = append(addrs, p)
		}
	}
	return addrs
}

// connectProducer makes sure the package-level producer is set, retrying
// every connectRetryInterval until timeout has elapsed. It reports whether a
// producer is available.
func connectProducer(config *sarama.Config) bool {
	if producer != nil {
		return true
	}
	deadline := time.Now().Add(timeout)
	for {
		p, err := sarama.NewSyncProducer(brokers, config)
		if err == nil {
			producer = p
			return true
		}
		log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
		// Do not sleep when the next attempt would start after the deadline.
		if !time.Now().Add(connectRetryInterval).Before(deadline) {
			return false
		}
		time.Sleep(connectRetryInterval)
	}
}

// FLBPluginFlush is called from fluent-bit when data need to be sent.
//
// data/length describe a MessagePack buffer holding a sequence of records.
// Every record that carries both a "pod_name" field and the configured
// message key is turned into one Kafka message; records that are malformed or
// lack either field are skipped. It returns output.FLB_ERROR when the buffer
// cannot be decoded or the batch cannot be sent, output.FLB_OK otherwise.
//
//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
	var h codec.MsgpackHandle
	dec := codec.NewDecoderBytes(C.GoBytes(data, length), &h)

	// Iterate the original MessagePack array.
	var msgs []*sarama.ProducerMessage
	for {
		// A fresh value per record so nothing carries over between iterations.
		var record interface{}
		if err := dec.Decode(&record); err != nil {
			if err == io.EOF {
				break
			}
			log.Printf("Failed to decode msgpack data: %v\n", err)
			return output.FLB_ERROR
		}
		if msg, ok := buildProducerMessage(record); ok {
			msgs = append(msgs, msg)
		}
	}

	if len(msgs) == 0 {
		return output.FLB_OK
	}
	if err := producer.SendMessages(msgs); err != nil {
		log.Printf("FAILED to send kafka message: %s\n", err)
		return output.FLB_ERROR
	}
	return output.FLB_OK
}

// buildProducerMessage converts one decoded record into a Kafka message. The
// record is expected to be a two-entry sequence of timestamp and map. It
// reports false, without panicking, when the record has another shape, cannot
// be flattened, or lacks the "pod_name" field or the configured message key.
func buildProducerMessage(record interface{}) (*sarama.ProducerMessage, bool) {
	entry := reflect.ValueOf(record)
	if k := entry.Kind(); (k != reflect.Slice && k != reflect.Array) || entry.Len() < 2 {
		log.Printf("Skipping malformed record: %v\n", record)
		return nil, false
	}

	// Convert the second entry to a real map and flatten it.
	mapData, ok := entry.Index(1).Interface().(map[interface{}]interface{})
	if !ok {
		log.Printf("Skipping record whose payload is not a map: %v\n", record)
		return nil, false
	}
	flattenData, err := Flatten(mapData, "", UnderscoreStyle)
	if err != nil {
		log.Printf("Skipping record that cannot be flattened: %v\n", err)
		return nil, false
	}

	message := ""
	host := ""
	for k, v := range flattenData {
		// Only the two keys of interest need to be converted to strings.
		if k == "pod_name" {
			host = stringifyValue(v)
		}
		if k == messageKey {
			message = stringifyValue(v)
		}
	}
	if message == "" || host == "" {
		return nil, false
	}

	return &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
		Value: sarama.ByteEncoder(message),
	}, true
}

// stringifyValue renders a flattened field value as a string: strings are
// returned unchanged, byte slices are converted, and any other type is
// formatted with %v.
func stringifyValue(v interface{}) string {
	switch t := v.(type) {
	case string:
		return t
	case []byte:
		return string(t)
	default:
		return fmt.Sprintf("%v", v)
	}
}

// FLBPluginExit closes the Kafka producer, if one was created, and always
// returns output.FLB_OK.
//
//export FLBPluginExit
func FLBPluginExit() int {
	// producer is still nil when FLBPluginInit never managed to connect.
	if producer == nil {
		return output.FLB_OK
	}
	if err := producer.Close(); err != nil {
		log.Printf("Failed to close kafka producer: %s\n", err)
	}
	return output.FLB_OK
}

func main() {
}
FLBPluginExit:插件退出时需要执行的一些方法,比如关闭连接。 FLBPluginRegister:注册插件。 FLBPluginInit:插件初始化。 FLBPluginFlush:将数据flush到output。 FLBPluginConfigKey:获取配置文件中的参数。 PS:当然,除了FLBPluginConfigKey之外,也可以通过环境变量来获取配置参数。 ctx相当于一个上下文,负责插件与Fluent Bit之间的数据传递。 编译和执行 编译的时候:go build -buildmode=c-shared -o out_kafka.so . 生成out_kafka.so。 执行的时候:/fluent-bit/bin/fluent-bit -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so 总结 采用类似的编写结构,就可以定制化自己的输出插件了。 本文转自SegmentFault-k8s与日志--采用golang实现Fluent Bit的output插件