Go分布式爬虫笔记(二十)

news2025/1/13 23:21:15

文章目录

  • 20 调度引擎
  • 调度引擎目标
  • 通道
  • 函数选项模式
    • 函数式选项模式的好处
  • 通道底层原理
    • 无缓冲区的通道
    • 带缓冲区的通道
    • Select 机制的底层原理
  • 思考题
    • 在我们的课程中,schedule 函数其实有一个 bug,您能看出来吗?你觉得可以用什么方式找出这样的 Bug?

20 调度引擎

调度引擎目标

  • 创建调度程序,接收任务并将任务存储起来
  • 执行调度任务,通过一定的调度算法将任务调度到合适的 worker 中执行
  • 创建指定数量的 worker,完成实际任务的处理
  • 创建数据处理协程,对爬取到的数据进行进一步处理

scheduler/scheduler.go

package engine

import (
	"github.com/funbinary/go_example/example/crawler/13/collect"
	"go.uber.org/zap"
)

type ScheduleEngine struct {
	requestCh chan *collect.Request //负责接收请求
	workerCh  chan *collect.Request //负责分配任务给 worker
	WorkCount int                   //为执行任务的数量,可以灵活地去配置。
	Fetcher   collect.Fetcher
	Logger    *zap.Logger
	out       chan collect.ParseResult 负责处理爬取后的数据,完成下一步的存储操作。schedule 函数会创建调度程序,负责的是调度的核心逻辑。
	Seeds     []*collect.Request
}

func (s *ScheduleEngine) Run() {
	s.requestCh = make(chan *collect.Request)
	s.workerCh = make(chan *collect.Request)
	s.out = make(chan collect.ParseResult)
	go s.Schedule()
	// 创建指定数量的 worker,完成实际任务的处理
	// 其中
	for i := 0; i < s.WorkCount; i++ {
		go s.CreateWork()
	}
	s.HandleResult()
}

func (s *ScheduleEngine) Schedule() {
	// workerCh
	var reqQueue = s.Seeds
	go func() {
		for {
			var req *collect.Request
			var ch chan *collect.Request

			//如果任务队列 reqQueue 大于 0,意味着有爬虫任务,这时我们获取队列中第一个任务,并将其剔除出队列。

			if len(reqQueue) > 0 {
				req = reqQueue[0]
				reqQueue = reqQueue[1:]
				ch = s.workerCh
			}
			select {
			case r := <-s.requestCh:
				// 接收来自外界的请求,并将请求存储到 reqQueue 队列中
				reqQueue = append(reqQueue, r)

			case ch <- req:
				// ch <- req 会将任务发送到 workerCh 通道中,等待 worker 接收。
			}
		}
	}()

}

func (s *ScheduleEngine) CreateWork() {
	for {
		// 接收到调度器分配的任务;
		r := <-s.workerCh
		// 访问服务器
		body, err := s.Fetcher.Get(r)
		if err != nil {
			s.Logger.Error("can't fetch ",
				zap.Error(err),
			)
			continue
		}
		//解析服务器返回的数据
		result := r.ParseFunc(body, r)
		// 将返回的数据发送到 out 通道中,方便后续的处理。
		s.out <- result
	}
}

func (s *ScheduleEngine) HandleResult() {
	for {
		select {
		// 接收所有 worker 解析后的数据
		case result := <-s.out:
			// 要进一步爬取的 Requests 列表将全部发送回 s.requestCh 通道
			for _, req := range result.Requesrts {
				s.requestCh <- req
			}
			//包含了我们实际希望得到的结果,所以我们先用日志把结果打印出来
			for _, item := range result.Items {
				// todo: store
				s.Logger.Sugar().Info("get result", item)
			}
		}
	}
}

main.go

package main

import (
	"fmt"
	"github.com/funbinary/go_example/example/crawler/13/collect"
	"github.com/funbinary/go_example/example/crawler/13/engine"
	"github.com/funbinary/go_example/example/crawler/13/log"
	"github.com/funbinary/go_example/example/crawler/13/parse/doubangroup"
	"github.com/funbinary/go_example/example/crawler/13/proxy"
	"go.uber.org/zap/zapcore"

	"time"
)

// xpath
func main() {
	plugin := log.NewStdoutPlugin(zapcore.InfoLevel)
	logger := log.NewLogger(plugin)
	logger.Info("log init end")

	proxyURLs := []string{"http://127.0.0.1:10809", "http://127.0.0.1:10809"}
	p, err := proxy.RoundRobinProxySwitcher(proxyURLs...)
	if err != nil {
		logger.Error("RoundRobinProxySwitcher failed")
	}

	// douban cookie
	var seeds []*collect.Request
	for i := 0; i <= 0; i += 25 {
		str := fmt.Sprintf("https://www.douban.com/group/szsh/discussion?start=%d", i)
		seeds = append(seeds, &collect.Request{
			Url:       str,
			WaitTime:  1 * time.Second,
			Cookie:    "bid=-UXUw--yL5g; dbcl2=\"214281202:q0BBm9YC2Yg\"; __yadk_uid=jigAbrEOKiwgbAaLUt0G3yPsvehXcvrs; push_noty_num=0; push_doumail_num=0; __utmz=30149280.1665849857.1.1.utmcsr=accounts.douban.com|utmccn=(referral)|utmcmd=referral|utmcct=/; __utmv=30149280.21428; ck=SAvm; _pk_ref.100001.8cb4=%5B%22%22%2C%22%22%2C1665925405%2C%22https%3A%2F%2Faccounts.douban.com%2F%22%5D; _pk_ses.100001.8cb4=*; __utma=30149280.2072705865.1665849857.1665849857.1665925407.2; __utmc=30149280; __utmt=1; __utmb=30149280.23.5.1665925419338; _pk_id.100001.8cb4=fc1581490bf2b70c.1665849856.2.1665925421.1665849856.",
			ParseFunc: doubangroup.ParseURL,
		})
	}

	var f collect.Fetcher = &collect.BrowserFetch{
		Timeout: 3000 * time.Millisecond,
		Logger:  logger,
		Proxy:   p,
	}

	s := engine.ScheduleEngine{
		WorkCount: 5,
		Logger:    logger,
		Fetcher:   f,
		Seeds:     seeds,
	}
	s.Run()

}

通道

特性

  • 我们往 nil 通道中写入数据会陷入到堵塞的状态。因此,如果 reqQueue 为空,这时 req 和 ch 都是 nil,当前协程就会陷入到堵塞的状态,直到接收到新的请求才会被唤醒。

    package main
    
    import (
    	"fmt"
    )
    
    func main() {
    	var ch chan *int
    	go func() {
    		<-ch
    	}()
    	select {
    	case ch <- nil:
    		fmt.Println("it's time")
    	}
    
    }
    //fatal error: all goroutines are asleep - deadlock!
    
    

函数选项模式

  • 原由: 在实践中函数可能有几十个参数等着我们赋值,不同参数的灵活组合可能会带来不同的调度器类型。在实践中为了方便使用,开发者可能会创建非常多的 API 来满足不同场景的需要,随着参数的不断增多,这种 API 会变得越来越多,这就增加了开发者的心理负担。

    // 基本调度器
    func NewBaseSchedule() *Schedule {
      return &Schedule{
        WorkCount: 1,
        Fetcher:baseFetch,
      }
    }
    // 多worker调度器
    func NewMultiWorkSchedule(workCount int) *Schedule {
      return &Schedule{
        WorkCount: workCount,
        Fetcher:baseFetch,
      }
    }
    
    // 代理调度器
    func NewProxySchedule(proxy string) *Schedule {
      return &Schedule{
        WorkCount: 1,
        Fetcher:proxyFetch(proxy),
      }
    }
    
  • 另一种使用方式就是传递一个统一的 Config 配置结构,如下所示。这种方式只需要创建单个 API,但是需要在内部对所有的变量进行判断,繁琐且不优雅。对于使用者来说,也很难确定自己需要使用哪一个字段。

    type Config struct {
      WorkCount int
      Fetcher   collect.Fetcher
      Logger    *zap.Logger
      Seeds     []*collect.Request
    }
    
    func NewSchedule(c *Config) *Schedule {
      var s = &Schedule{}
      if c.Seeds != nil {
        s.Seeds = c.Seeds
      }
      if c.Fetcher != nil {
        s.Fetcher = c.Fetcher
      }
    
      if c.Logger != nil {
        s.Logger = c.Logger
      }
      ...
      return s
    }
    
  • Rob Pike 在 2014 年的一篇博客中提到了一种优雅的处理方法叫做函数式选项模式 (Functional Options)。这种模式展示了闭包函数的有趣用途,目前在很多开源库中都能看到它的身影,我们项目中使用的日志库 Zap 也使用了这种模式。

  1. 我们要对 schedule 结构进行改造,把可以配置的参数放入到options 结构中:
type Schedule struct {
  requestCh chan *collect.Request
  workerCh  chan *collect.Request
  out       chan collect.ParseResult
  options
}

type options struct {
  WorkCount int
  Fetcher   collect.Fetcher
  Logger    *zap.Logger
  Seeds     []*collect.Request
}
  1. 我们需要书写一系列的闭包函数,这些函数的返回值是一个参数为 options 的函数:
type Option func(opts *options)

func WithLogger(logger *zap.Logger) Option {
  return func(opts *options) {
    opts.Logger = logger
  }
}
func WithFetcher(fetcher collect.Fetcher) Option {
  return func(opts *options) {
    opts.Fetcher = fetcher
  }
}

func WithWorkCount(workCount int) Option {
  return func(opts *options) {
    opts.WorkCount = workCount
  }
}

func WithSeeds(seed []*collect.Request) Option {
  return func(opts *options) {
    opts.Seeds = seed
  }
}
  1. 创建一个生成 schedule 的新函数,函数参数为 Option 的可变参数列表。defaultOptions 为默认的 Option,代表默认的参数列表,然后循环遍历可变函数参数列表并执行。
func NewSchedule(opts ...Option) *Schedule {
  options := defaultOptions
  for _, opt := range opts {
    opt(&options)
  }
  s := &Schedule{}
  s.options = options
  return s
}
  1. 在 main 函数中调用 NewSchedule。让我们来看看函数式选项模式的效果:
func main(){
  s := engine.NewSchedule(
      engine.WithFetcher(f),
      engine.WithLogger(logger),
      engine.WithWorkCount(5),
      engine.WithSeeds(seeds),
    )
  s.Run()
}

函数式选项模式的好处

  • API 具有可扩展性,高度可配置化,新增参数不会破坏现有代码;
  • 参数列表非常简洁,并且可以使用默认的参数;
  • option 函数使参数的含义非常清晰,易于开发者理解和使用;
  • 如果将 options 结构中的参数设置为小写,还可以限制这些参数的权限,防止这些参数在 package 外部使用。

通道底层原理

通道的实现并没有想象中复杂。它利用互斥锁实现了并发安全,只不过 Go 运行时为我们屏蔽了底层的细节。通道包括两种类型,一种是无缓冲的通道,另一种是带缓冲区的通道。通道的结构如下:

image

可以看到,通道中包含了数据的类型、大小、数量,堵塞协程队列,以及用于缓存区的诸多字段。

无缓冲区的通道

通道需要有多个协程分别完成读和写的功能,这样才能保证数据传输是顺畅的。对于无缓冲区的通道来说,如果有一个协程正在将数据写入通道,但是当前没有协程读取数据,那么写入协程将立即陷入到休眠状态。写入协程堵塞之前协程会被封装到 sudog 结构中,并存储到写入的堵塞队列 sendq 中,之后协程陷入休眠。

之前我们介绍过,协程的堵塞是位于用户态的,协程切换时,运行时会保存当前协程的状态、并调用 gopark 函数切换到 g0 完成新一轮的调度。如果之后有协程读取数据,那么读取协程会立即读取 sendq 队列中第一个等待的协程,并将该协程对应的元素拷贝到读取协程中,同时调用 goready 唤醒写入协程,将写入协程放入到可运行队列中等待被调度器调度。

image

带缓冲区的通道

而对于带缓冲区的通道来说,假设缓存队列的数量为 N,那么如果写入的数据量不大于 N,写入协程就不会陷入到休眠状态,所有数据都会存储在缓冲队列中。

缓冲队列可以在一定程度上削峰填谷,加快处理速度。但是如果写入速度始终大于读取数据,那么缓冲区迟早也有写满的时候,到时候仍然会陷入堵塞,只是延迟了问题的暴露并带来内存的浪费。因此缓冲区的容量不可以过大,我们可以根据实际情况给出一个经验值。例如上面的爬虫案例中,我们就可以给接收任务的 requestCh 通道加上缓存区,先将缓存区设置为 500,这样就不会频繁堵塞住调度器了

对于有缓存的通道,存储在 buf 中的数据虽然是线性的数组,但是这些数组和序号 recvx、recvq 模拟了一个环形队列。recvx 可以定位到 buf 是从哪个位置读取的通道中的元素,而 sendx 则能够找到写入时放入 buf 的位置,这样做主要是为了再次利用已经使用过的空间。从 recvx 到 sendx 的距离 qcount 就是通道队列中的元素数量。

image

Select 机制的底层原理

原由: 受到通道特性的限制,如果单个通道被堵塞,协程就无法继续执行了。

那有没有一种机制可以像网络中的多路复用一样,监听多个通道,使后续处理协程能够及时地运行?

其实就和网络中把 select 用于 Socket 的多路复用机制一样,Go 中也可以用 select 语句实现这样的多路复用机制。

select 语句中的每一个 case 都对应着一个待处理的读取或写入通道。

举个简单实用的例子,下面的程序如果 800 毫秒之后也接受不到通道 c 中的数据,定时器 time.After 就会接收到数据,从而打印 timeout。

select {
  case <-c:
    fmt.Println("random 01")
  case <-time.After(800 * time.Millisecond):
    fmt.Println("timeout")
  }

Select 底层调用了 selectgo 函数,它的工作可以分为三个部分:

  • 第一部分涉及到遍历。

    • selectgo 首先循环查找当前准备就绪的通道,如果能够找到,则正常进行后续处理。
      在具体的实现方式上,由于 select 内部的 scases 数组存储了所有需要管理的通道,所以很容易想到循环遍历 scases ,最终找到可用的通道。
    • 不过这可能导致一个问题,那就是如果前面的通道始终有数据,后面的通道将永远得不到执行的机会。
      为了解决这一问题,Go 语言为 select 加入了随机性,会利用洗牌算法随机打散数组顺序,保证了所有通道都有执行的机会。
  • 第二部分涉及到协程的休眠。如果 select 找不到准备就绪的通道,这时和单个协程的堵塞一样,它会将当前协程放入到所有通道的等待队列中,并陷入到休眠状态。

  • 第三部分涉及到协程的唤醒。如果有任意一个通道准备就绪,当前的协程将会被唤醒,并到准备就绪的 case 处继续执行。
    要注意的一点是,最后 selectgo 会将 sudog 结构体从其他通道的等待队列中移出,因为当前协程已经能够正常运行,不再需要被其他通道唤醒了。

思考题

在我们的课程中,schedule 函数其实有一个 bug,您能看出来吗?你觉得可以用什么方式找出这样的 Bug?

会丢失发给worker的任务。 case r := <-s.requestCh:的情况下,如果req不是nil,应该把req再添加到reqQueue头部

「此文章为4月Day4学习笔记,内容来源于极客时间《Go分布式爬虫实战》,强烈推荐该课程!/推荐该课程」

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/415945.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OTA A/B 分区升级 update_engine简介

近期猛然发现公司的项目都已经换成了AB升级&#xff0c;AB升级之前一直有所了解&#xff0c;只是一直都没有去仔细查看过其具体升级流程&#xff0c;这两天抽空捋了捋&#xff0c;简单整理下。 AB升级&#xff08;谷歌官网叫法无缝更新&#xff09;是自android7.0开始新增的一…

头歌(Linux之进程管理一):第2关:进程创建操作-fork

任务描述 在上一关我们学习如何获取进程的pid信息&#xff0c;本关我们将介绍如何编程创建一个新的进程。 本关任务&#xff1a;学会使用C语言在Linux系统中使用fork系统调用创建一个新的进程。 相关知识 在Linux系统中创建进程有很多函数可以使用&#xff0c;其中包括了系…

初识Elasticsearch

文章目录介绍一、什么是elasticsearch&#xff1f;二、基本概念三、安装elasticsearch与kibana四、安装kibana&#xff08;跟ES要在同一个网络中&#xff09;五、IK分词器总结介绍 好处&#xff1a;可以帮助从海量数据中查找需要的内容&#xff1b; 一、什么是elasticsearch&…

ETL工具-pentaho企业实战部署

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

C++二叉搜索树与KV模型

二叉搜索树与KV模型二叉搜索树概念与操作性能分析实现KV模型二叉搜索树 本章是为了C的map和set做铺垫 概念与操作 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不为空&#xff0c;则左子树上所有节点的值都小…

面试题之vue的响应式

文章目录前言一、响应式是什么&#xff1f;二、Object.defineProperty二、简单模拟vue三、深度监听四、监听数组总结前言 为了应对面试而进行的学习记录&#xff0c;可能不够有深度甚至有错误&#xff0c;还请各位谅解&#xff0c;并不吝赐教&#xff0c;共同进步。 一、响应式…

如何做好 IT 项目管理?做好项目管理常用的9大项目管理平台、7大管理方法

一个好的管理&#xff0c;是70%在流程、规范、工具&#xff0c;剩下的30%自由发挥。一个不好的管理&#xff0c;只有地板&#xff0c;每个人都要自己想办法&#xff0c;够到天花板。一个好的工具&#xff0c;就是帮助团队够到天花板的台阶。——刘润 项目管理是一门复杂的艺术&…

统一的文件管理,团队轻松协作

目前IT行业大都采用项目经理制的管理方式&#xff0c;这种管理方式下各个部门间相互独立&#xff0c;同时各部门间也缺乏沟通协作。因此IT行业在文件管理上主要面临以下几个问题&#xff1a; 文档缺乏集中管理&#xff1a;企业在管理过程中产生的大量文件分散在各个部门中&…

Python升级 pip : python -m pip install --upgrade pip,socket.timeout加入超时处理方法

人生苦短&#xff0c;我用python 最近又遇到了一个小的报错问题&#xff0c; 趁现在我还没有忘记&#xff0c; 赶紧来写一写… python 安装包资料报错交流:点击此处跳转文末名片获取 WARNING: You are using pip version 19.3.1; however, version 20.0.2 is available. You…

系统学习Numpy(一)——numpy的安装与基础入门[向量、矩阵]

系列文章目录 numpy的安装与基础入门[向量、矩阵与维度] numpy的安装与基础入门[向量、矩阵与维度]系列文章目录前言numpy安装向量与矩阵生成向量生成矩阵向量类型前言 numpy是科学计算以及机器学习深度学习的基础必备工具&#xff0c;本文将介绍numpy的安装&#xff0c;以及…

C语言课设项目-51单片机-中断系统

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 51单片机的中断系统 一、中断的概念 二、51单片机的中断系统结构 三、中断允许控制 四、中断…

C#,初学琼林(06)——组合数的算法、数据溢出问题的解决方法及相关C#源代码

1 排列permutation 排列&#xff0c;一般地&#xff0c;从n个不同元素中取出m&#xff08;m≤n&#xff09;个元素&#xff0c;按照一定的顺序排成一列&#xff0c;叫做从n个元素中取出m个元素的一个排列(permutation)。特别地&#xff0c;当mn时&#xff0c;这个排列被称作全…

vs code c语言断点调试window版解决方案

序&#xff1a; 1、这一步不懂劝退多少人&#xff0c;博主搜到了多少博文都是mac的&#xff0c;结果发现都对不上&#xff01; 先看最终效果演示 接下去我每个步骤&#xff0c;你都仔细看&#xff0c;漏看一个环境都对不上&#xff01; 正文 1、先去看博主的c/c运行环境配置图…

10-vue3动画

文章目录1.vue的transition动画1.1transition的基本使用1.2transition组件的原理1.3过渡动画的class1.4class的命名规则和添加时机1.5显示的指定过渡时间1.6过渡的模式mode1.7动态组件的切换1.8.appear初次渲染2、animation动画2.1同时设置animation和transition3.结合第三方库…

【Bard】来自谷歌的“吟游诗人”

个人主页&#xff1a;【&#x1f60a;个人主页】 文章目录前言Bard与相关产品的对比Bard VS 弱智吧来自对手的评论ChatGPT文心一言总结&#xff1a;前言 相比较ChatGPT的话题不断&#xff0c;谷歌的“Bard”显然低调了许多&#xff0c;在“画大饼”失败一个多月后&#xff0c…

【Python开发手册】深入剖析Google Python开发规范:规范Python注释写作

&#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是Zeeland&#xff0c;全栈领域优质创作者。&#x1f4dd; CSDN主页&#xff1a;Zeeland&#x1f525;&#x1f4e3; 我的博客&#xff1a;Zeeland&#x1f4da; Github主页: Undertone0809 (Zeeland) (github.com)&…

高通开发系列 - linux kernel内核升级msm-4.9升级至msm-4.19(2)

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 目录 MDSS PLL驱动问题msm-4.19内核适配nand flashMDSS PLL驱动问题 | /home/peeta/sc262R_private_rl/build-msm8909/tmp/work-shared/ms…

第01章_Java语言概述

第01章_Java语言概述 讲师&#xff1a;尚硅谷-宋红康&#xff08;江湖人称&#xff1a;康师傅&#xff09; 官网&#xff1a;http://www.atguigu.com 1. Java知识脉络图 1.1 Java基础全程脉络图 1.2 本章专题与脉络 2. 抽丝剥茧话Java 2.1 当前大学生就业形势 麦可思研究院…

C++之红黑树

文章目录前言一、概念二、性质三、结点的定义四、红黑树的结构五、插入操作1.插入代码2.左单旋3.右单旋4.插入新结点的情况分析与总结第一步、按照搜索二叉树的规则插入新结点第二步、分析插入结点后红黑树的性质是否被破坏动态演示&#xff1a;六、验证红黑树1.检测是否满足二…

口令暴力破解--Telnet协议暴力破解、数据库暴力破解与远程桌面暴力破解

Telnet协议暴力破解 Telnet Telnet协议是TCP/IP协议族中的一员&#xff0c;是Internet远程登陆服务的标准协议和主要方式。它为用户提供了在本地计算机上完成远程主机工作的能力。要开始一个telnet会话&#xff0c;必须输入用户名和密码来登录服务器。而一般服务器不会对用户名…