Go语言学习13-常见软件架构的实现
架构模式
An architectural pattern is a general, reusable solution to a commonly occurring problem in software architecture within a given context. ——Wikipedia
Pipe-Filter 架构
Pipe-Filter 模式
- 非常适合于数据处理及数据分析系统
- Filter 封装数据处理的功能
- 松耦合: Filter 只跟数据(格式)耦合
- Pipe 用于连接 Filter 传递数据, 或者在异步处理过程中缓冲数据流
  - 进程内同步调用时, Pipe 演变为数据在方法调用间传递
Filter和组合模式
示例
// filter.go
// Package pipefilter is to define the interfaces
// and the structures for pipe-filter style implementation
package pipefilter
// Request is the input handed to a Filter. It is an empty interface, so every
// filter in this package checks the dynamic type it is able to handle.
type Request interface{}
// Response is the output produced by a Filter. It is an empty interface so
// that one filter's output can be passed on as the next filter's Request.
type Response interface{}
// Filter is the data-processing component of the Pipe-Filter structure.
// Implementations take one Request and return either a Response or an error.
type Filter interface {
// Process handles data and returns the result, or an error when the data
// cannot be processed.
Process(data Request) (Response, error)
}
// split_filter.go
package pipefilter
import (
"errors"
"strings"
)
// SplitFilterWrongFormatError is returned by SplitFilter.Process when the
// input is not a string.
var SplitFilterWrongFormatError = errors.New("input data should be string")
// SplitFilter is a Filter that splits a string input around a fixed delimiter.
type SplitFilter struct {
// delimiter is the separator passed to strings.Split by Process.
delimiter string
}
// NewSplitFilter returns a SplitFilter that splits its input around delimiter.
func NewSplitFilter(delimiter string) *SplitFilter {
	filter := SplitFilter{delimiter: delimiter}
	return &filter
}
// Process splits a string input around the filter's delimiter and returns the
// pieces as a []string. Any input that is not a string is rejected with
// SplitFilterWrongFormatError.
func (sf *SplitFilter) Process(data Request) (Response, error) {
	// Check the dynamic type first: only string input can be handled.
	switch text := data.(type) {
	case string:
		return strings.Split(text, sf.delimiter), nil
	default:
		return nil, SplitFilterWrongFormatError
	}
}
// to_int_filter.go
package pipefilter
import (
"errors"
"strconv"
)
// ToIntFilterWrongFormatError is returned by ToIntFilter.Process when the
// input is not a []string.
var ToIntFilterWrongFormatError = errors.New("input data should be []string")
// ToIntFilter is a Filter that converts a []string into a []int. It holds no
// state.
type ToIntFilter struct {
}
// NewToIntFilter returns a ready-to-use ToIntFilter.
func NewToIntFilter() *ToIntFilter {
	return new(ToIntFilter)
}
// Process converts every element of a []string input with strconv.Atoi and
// returns the results as a []int in the same order.
//
// Input that is not a []string is rejected with ToIntFilterWrongFormatError.
// The first element that fails to parse aborts the conversion and its
// strconv error is returned as is.
func (tif *ToIntFilter) Process(data Request) (Response, error) {
	fields, isStrings := data.([]string)
	if !isStrings {
		return nil, ToIntFilterWrongFormatError
	}
	numbers := make([]int, len(fields))
	for i, field := range fields {
		n, err := strconv.Atoi(field)
		if err != nil {
			return nil, err
		}
		numbers[i] = n
	}
	return numbers, nil
}
// sum_filter.go
package pipefilter
import "errors"
// SumFilterWrongFormatError is returned by SumFilter.Process when the input
// is not a []int.
var SumFilterWrongFormatError = errors.New("input data should be []int")
// SumFilter is a Filter that adds up the elements of a []int. It holds no
// state.
type SumFilter struct {
}
// NewSumFilter returns a ready-to-use SumFilter.
func NewSumFilter() *SumFilter {
	return new(SumFilter)
}
// Process adds up the elements of a []int input and returns the total as an
// int; an empty slice sums to 0. Input that is not a []int is rejected with
// SumFilterWrongFormatError.
func (sf *SumFilter) Process(data Request) (Response, error) {
	numbers, isInts := data.([]int)
	if !isInts {
		return nil, SumFilterWrongFormatError
	}
	total := 0
	for i := 0; i < len(numbers); i++ {
		total += numbers[i]
	}
	return total, nil
}
// straight_pipeline.go
package pipefilter
// StraightPipeline is a pipeline whose filters are arranged in a straight
// line: Process runs them one after another, in slice order.
type StraightPipeline struct {
// Name is the label given to the pipeline when it is constructed.
Name string
// Filters points to the ordered list of filters that Process runs.
Filters *[]Filter
}
// NewStraightPipeline creates a StraightPipeline called name that runs the
// given filters in the order in which they are passed.
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {
	pipeline := new(StraightPipeline)
	pipeline.Name = name
	pipeline.Filters = &filters
	return pipeline
}
// Process runs data through every filter of the pipeline in order, feeding
// each filter's response to the next filter as its request.
//
// It returns the response of the last filter. When a filter fails, processing
// stops and that filter's response and error are returned unchanged. A
// pipeline without filters, including one whose Filters pointer is nil
// because it was not built with NewStraightPipeline, returns (nil, nil).
func (f *StraightPipeline) Process(data Request) (Response, error) {
	// A StraightPipeline created as a literal has a nil Filters pointer;
	// dereferencing it would panic, so treat it as an empty pipeline.
	if f.Filters == nil {
		return nil, nil
	}
	var ret Response
	for _, filter := range *f.Filters {
		var err error
		ret, err = filter.Process(data)
		if err != nil {
			return ret, err
		}
		// The output of this stage is the input of the next one.
		data = ret
	}
	return ret, nil
}
Micro Kernel架构
- 特点
- 易于扩展
- 错误隔离
- 保持架构一致性
- 要点
- 内核包含公共流程或通用逻辑
- 将可变或可扩展部分规划为扩展点
- 抽象扩展点行为, 定义接口
- 利用插件进行扩展
示例
package microkernel
import (
"context"
"errors"
"fmt"
"strings"
"sync"
)
// Agent states, stored in Agent.state.
const (
// Waiting is the state of a new Agent and of an Agent that has been stopped.
Waiting = iota
// Running is the state of an Agent between Start and Stop.
Running
)
// WrongStateError is returned when an Agent operation is called in a state
// that does not allow it.
var WrongStateError = errors.New("can not take the operation in the current state")
// CollectorsError bundles the errors reported by several collectors into a
// single error value.
type CollectorsError struct {
	CollectorsErrors []error
}

// Error returns the messages of all bundled errors joined by ";", in slice
// order. It returns the empty string when no error is bundled.
func (ce CollectorsError) Error() string {
	messages := make([]string, len(ce.CollectorsErrors))
	for i := range ce.CollectorsErrors {
		messages[i] = ce.CollectorsErrors[i].Error()
	}
	return strings.Join(messages, ";")
}
// Event is the unit of data that is handed to an EventReceiver.
type Event struct {
// Source names where the event comes from (presumably the reporting
// collector; the code in this file does not set it).
Source string
// Content is the payload of the event.
Content string
}
// EventReceiver is the sink that events are delivered to. Agent implements it,
// and passes itself to Collector.Init as the receiver.
type EventReceiver interface {
// OnEvent hands one event to the receiver.
OnEvent(evt Event)
}
// Collector is the extension point of the micro kernel: a plug-in that the
// Agent registers and then drives through its life cycle.
type Collector interface {
// Init is called by Agent.RegisterCollector with the agent as the event
// receiver.
Init(evtReceiver EventReceiver) error
// Start is called in its own goroutine when the agent starts; agtCtx is the
// agent's context, which is cancelled when the agent is stopped.
Start(agtCtx context.Context) error
// Stop is called when the agent is stopped, after its context has been
// cancelled.
Stop() error
// Destroy is called by Agent.Destroy.
Destroy() error
}
// Agent is the kernel: it owns the registered collectors, buffers the events
// it receives through OnEvent, and drives the collectors' life cycle.
type Agent struct {
// collectors holds the registered collectors, keyed by name.
collectors map[string]Collector
// evtBuf buffers events between OnEvent and EventProcessGroutine.
evtBuf chan Event
// cancel cancels ctx; it is set by Start and called by Stop.
cancel context.CancelFunc
// ctx is created by Start and handed to every collector's Start.
ctx context.Context
// state is Waiting or Running.
state int
}
// EventProcessGroutine consumes buffered events in batches of ten and prints
// every full batch with fmt.Println. It loops until the agent's context is
// cancelled; events of a batch that is not yet full at that moment are not
// printed.
func (agt *Agent) EventProcessGroutine() {
	const batchSize = 10
	var batch [batchSize]Event
	for {
		for filled := 0; filled < batchSize; filled++ {
			select {
			case <-agt.ctx.Done():
				return
			case evt := <-agt.evtBuf:
				batch[filled] = evt
			}
		}
		fmt.Println(batch)
	}
}
// NewAgent returns an Agent in the Waiting state with no collectors and an
// event buffer that can hold sizeEvtBuf events.
func NewAgent(sizeEvtBuf int) *Agent {
	return &Agent{
		state:      Waiting,
		evtBuf:     make(chan Event, sizeEvtBuf),
		collectors: make(map[string]Collector),
	}
}
// RegisterCollector initialises collector with the agent as its event
// receiver and, when that succeeds, registers it under name. Registering a
// second collector under the same name replaces the first.
//
// It returns WrongStateError unless the agent is in the Waiting state, or the
// error reported by the collector's Init. A collector whose Init fails is not
// registered, so it is never started, stopped or destroyed by the agent.
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
	if agt.state != Waiting {
		return WrongStateError
	}
	// Initialise before registering: a collector that failed to initialise
	// must not end up in the set that Start later runs.
	if err := collector.Init(agt); err != nil {
		return err
	}
	agt.collectors[name] = collector
	return nil
}
// startCollectors launches every registered collector's Start in its own
// goroutine, passing the agent's context, and returns without waiting for
// those calls to finish.
//
// It returns a CollectorsError holding the failures recorded up to the moment
// all goroutines have been launched, or nil when none has been recorded yet.
// A failure reported by a Start call that returns later is still recorded,
// but is not visible to the caller.
func (agt *Agent) startCollectors() error {
	var (
		mutex sync.Mutex // guards errs
		errs  CollectorsError
	)
	for name, collector := range agt.collectors {
		go func(name string, collector Collector, ctx context.Context) {
			// err is local to this goroutine: one variable shared by all
			// goroutines would be a data race and could attribute a failure
			// to the wrong collector.
			err := collector.Start(ctx)
			if err == nil {
				return
			}
			// Lock first and only then defer the Unlock, so the mutex is
			// never unlocked while it is not held.
			mutex.Lock()
			defer mutex.Unlock()
			errs.CollectorsErrors = append(errs.CollectorsErrors,
				errors.New(name+":"+err.Error()))
		}(name, collector, agt.ctx)
	}
	// The goroutines may be appending concurrently, so read under the lock.
	mutex.Lock()
	defer mutex.Unlock()
	if len(errs.CollectorsErrors) == 0 {
		return nil
	}
	return errs
}
// stopCollectors calls Stop on every registered collector, one after another.
// A failing collector does not prevent the remaining ones from being stopped.
// It returns nil when all of them succeed, otherwise a CollectorsError with
// one "name:message" entry per failed collector.
func (agt *Agent) stopCollectors() error {
	var failures []error
	for name, collector := range agt.collectors {
		err := collector.Stop()
		if err == nil {
			continue
		}
		failures = append(failures, errors.New(name+":"+err.Error()))
	}
	if len(failures) > 0 {
		return CollectorsError{CollectorsErrors: failures}
	}
	return nil
}
// destroyCollectors calls Destroy on every registered collector, one after
// another. A failing collector does not prevent the remaining ones from being
// destroyed. It returns nil when all of them succeed, otherwise a
// CollectorsError with one "name:message" entry per failed collector.
func (agt *Agent) destroyCollectors() error {
	var failed CollectorsError
	for name, collector := range agt.collectors {
		if destroyErr := collector.Destroy(); destroyErr != nil {
			entry := errors.New(name + ":" + destroyErr.Error())
			failed.CollectorsErrors = append(failed.CollectorsErrors, entry)
		}
	}
	if len(failed.CollectorsErrors) != 0 {
		return failed
	}
	return nil
}
// Start moves the agent from Waiting to Running: it creates a fresh
// cancellable context, launches the event-processing goroutine and starts
// the collectors. It returns WrongStateError when the agent is not Waiting,
// otherwise whatever startCollectors returns.
func (agt *Agent) Start() error {
	if agt.state == Waiting {
		ctx, cancel := context.WithCancel(context.Background())
		agt.ctx, agt.cancel = ctx, cancel
		agt.state = Running
		go agt.EventProcessGroutine()
		return agt.startCollectors()
	}
	return WrongStateError
}
// Stop moves the agent from Running back to Waiting: it cancels the agent's
// context and then stops the collectors. It returns WrongStateError when the
// agent is not Running, otherwise whatever stopCollectors returns.
func (agt *Agent) Stop() error {
	if agt.state == Running {
		agt.state = Waiting
		agt.cancel()
		return agt.stopCollectors()
	}
	return WrongStateError
}
// Destroy destroys all registered collectors. It is only allowed in the
// Waiting state and returns WrongStateError otherwise; in the Waiting state it
// returns whatever destroyCollectors returns.
func (agt *Agent) Destroy() error {
	switch agt.state {
	case Waiting:
		return agt.destroyCollectors()
	default:
		return WrongStateError
	}
}
// OnEvent implements EventReceiver by putting evt into the agent's event
// buffer. The send blocks while the buffer is full.
func (agt *Agent) OnEvent(evt Event) {
agt.evtBuf <- evt
}