[分享] Praefect 源码解析-1

Praefect 源码解析

参考文档:

修订记录:

2021/8/23 rzhu创建文档

[TOC]

Praefect 架构

  • 架构图

  1. 流量经过 HAProxy 到达 Praefect API 后,首先由 Interceptor 对流量进行分类:由 Praefect 自身处理的请求交给 Praefect Server,其余请求则根据转发规则发送给 Gitaly 集群。

  2. 当前使用的 Gitaly 版本为 14.1.3,本文主要针对该版本进行分析。根据官方文档的描述,不同版本采用不同的主节点选举策略。

    1. 从 13.12 开始,选举策略为 per_repository,但 13.12–14.0 与 14.1 及之后版本的选举时机不同(见下方官方文档摘录):

    Primary elections are run:

    • In GitLab 14.1 and later, lazily. This means that Praefect doesn’t immediately elect a new primary node if the current one is unhealthy. A new primary is elected if it is necessary to serve a request while the current primary is unavailable.
    • In GitLab 13.12 to GitLab 14.0 when:
      • Praefect starts up.
      • The cluster’s consensus of a Gitaly node’s health changes.
    2. 13.12 之前使用 sql_elector。
  3. Praefect 根据请求作用的范围将流量分为两种类型,并根据接口的功能进一步细分。

    总共将请求分为 2 个大类、5 种小类型。

    • 请求目标为 node(存储节点)

      • RouteStorageAccessor - 对某一个node 的请求

      • RouteStorageMutator - 对 primary 节点和 secondary 节点的请求

    • 请求目标为Repository

      • RouteRepositoryAccessor - 从数据一致的存储节点(consistentStorages)中随机选择一个;如果 forcePrimary = true,则返回 primary 节点。

      • RouteRepositoryMutator - 返回 primary 节点、secondary 节点以及需要复制同步的节点。

      • RouteRepositoryCreation - 处理创建Repository 的请求。

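  4. PostgreSQL 中维护了一个复制事件队列(replication event queue),Praefect 在后台持续从中取出事件进行复制(见 run 函数末尾的 ProcessBacklog,取不到事件时按指数退避重试)。此外,Praefect 通过 PostgreSQL 的 LISTEN/NOTIFY 异步通知机制监听 repositories_updates 与 storage_repositories_updates 两个通道,用于及时刷新一致性存储(consistent storages)的缓存。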

main 函数

cmd/praefect/main.go

main 函数所在文件中的 run 函数包含了组件初始化和服务启动的主要流程(以下为节选,"..." 表示省略的代码)。

// run builds and starts the Praefect server. This is an excerpt: every "..."
// line marks code elided from this write-up, so the snippet does not compile
// as shown.
//
// Visible flow:
//   - choose the replication event queue, repository store and consistent-
//     storages getter (PostgreSQL-backed unless conf.MemoryQueueEnabled);
//   - under the per_repository election strategy, dial the Gitaly nodes,
//     start the health manager, and create the per-repository elector,
//     assignment store and router;
//   - build the coordinator, replication manager and gRPC server factory;
//   - register one gRPC server starter per entry in cfgs, plus an optional
//     Prometheus listener;
//   - start the bootstrap, start replication-event processing, then block in
//     b.Wait until shutdown.
func run(cfgs []starter.Config, conf config.Config) error {
	...

	var queue datastore.ReplicationEventQueue
	var rs datastore.RepositoryStore
	var csg datastore.ConsistentStoragesGetter
	var metricsCollectors []prometheus.Collector

	if conf.MemoryQueueEnabled {
		// In-memory queue path; body elided in this excerpt.
		...

	} else {
		// Replication events are queued in PostgreSQL.
		queue = datastore.NewPostgresReplicationEventQueue(db)

		// Repository state is managed in PostgreSQL.
		rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames())

		if conf.DB.ToPQString(true) == "" {
			// Branch taken when the connection string is empty; elided in this excerpt.
			...

		} else {
			// Postgres notification channels to subscribe to. storagesCached is
			// passed to NewPostgresListener below, presumably as the handler for
			// these notifications — confirm in datastore.NewPostgresListener.
			listenerOpts := datastore.DefaultPostgresListenerOpts
			listenerOpts.Addr = conf.DB.ToPQString(true)
			listenerOpts.Channels = []string{"repositories_updates", "storage_repositories_updates"}

			// Use a CachingConsistentStoragesGetter; the wrapped rs is what
			// queries the database to refresh the cached data.
			storagesCached, err := datastore.NewCachingConsistentStoragesGetter(logger, rs, conf.VirtualStorageNames())
			if err != nil {
				return fmt.Errorf("caching storage provider: %w", err)
			}

			postgresListener, err := datastore.NewPostgresListener(logger, listenerOpts, storagesCached)
			if err != nil {
				return err
			}

			// Close the notifications listener when run returns.
			defer func() {
				if err := postgresListener.Close(); err != nil {
					logger.WithError(err).Error("error on closing Postgres notifications listener")
				}
			}()

			// Both the cache and the listener are registered as Prometheus collectors.
			metricsCollectors = append(metricsCollectors, storagesCached, postgresListener)
			csg = storagesCached
			logger.Info("reads distribution caching is enabled by configuration")
		}
	}

	var errTracker tracker.ErrorTracker

	...

	transactionManager := transactions.NewManager(conf)
	// The backchannel server wraps transaction.NewServer(transactionManager);
	// presumably this lets Gitaly nodes call back into Praefect's transaction
	// service over the backchannel — confirm in the backchannel package.
	clientHandshaker := backchannel.NewClientHandshaker(logger, praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager)))
	// Disabled assignment store by default; the per_repository branch below
	// replaces it with a database-backed store.
	assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames())
	var (
		nodeManager   nodes.Manager
		healthChecker praefect.HealthChecker
		nodeSet       praefect.NodeSet
		router        praefect.Router
		primaryGetter praefect.PrimaryGetter
	)
	// Per-repository (per_repository) election strategy.
	if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
		// Connect to the Gitaly nodes configured in conf.VirtualStorages.
		nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker)
		if err != nil {
			return fmt.Errorf("dial nodes: %w", err)
		}
		defer nodeSet.Close()

		// Node health monitoring: hm.Run runs in its own goroutine, driven by
		// a one-second ticker, and only logs an error if it exits.
		hm := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
		go func() {
			if err := hm.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
				logger.WithError(err).Error("health manager exited")
			}
		}()
		healthChecker = hm

		// Per-repository elector; it also serves as the PrimaryGetter.
		elector := nodes.NewPerRepositoryElector(db)
		primaryGetter = elector
		// Repository-to-storage assignments, backed by the database.
		assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())

		// Build the per-repository router from the node connections, elector,
		// health manager, a time-seeded random source, the consistent-storages
		// getter, the assignment store and the default replication factors.
		router = praefect.NewPerRepositoryRouter(
			nodeSet.Connections(),
			elector,
			hm,
			praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
			csg,
			assignmentStore,
			conf.DefaultReplicationFactors(),
		)
	} else {
		// Other election strategies; elided in this excerpt.
		if conf.Failover.Enabled {
			...
	}

	logger.Infof("election strategy: %q", conf.Failover.ElectionStrategy)
	logger.Info("background started: gitaly nodes health monitoring")

	var (
		// top level server dependencies
		// Three top-level dependencies are defined here: the coordinator,
		// the replication manager (repl) and the server factory (srvFactory).
		coordinator = praefect.NewCoordinator(
			queue,
			rs,
			router,
			transactionManager,
			conf,
			protoregistry.GitalyProtoPreregistered,
		)

		// Replication manager over the same queue; started via ProcessBacklog
		// and ProcessStale at the end of run.
		repl = praefect.NewReplMgr(
			logger,
			conf.VirtualStorageNames(),
			queue,
			rs,
			healthChecker,
			nodeSet,
			praefect.WithDelayMetric(delayMetric),
			praefect.WithLatencyMetric(latencyMetric),
			praefect.WithDequeueBatchSize(conf.Replication.BatchSize),
		)
		// The coordinator's StreamDirector is handed to the server factory.
		srvFactory = praefect.NewServerFactory(
			conf,
			logger,
			coordinator.StreamDirector,
			nodeManager,
			transactionManager,
			queue,
			rs,
			assignmentStore,
			protoregistry.GitalyProtoPreregistered,
			nodeSet.Connections(),
			primaryGetter,
		)
	)

	...

	b, err := bootstrap.New()
	if err != nil {
		return fmt.Errorf("unable to create a bootstrap: %v", err)
	}

	// srvFactory.GracefulStop is installed as the bootstrap's stop action.
	b.StopAction = srvFactory.GracefulStop
	for _, cfg := range cfgs {

		// Main entry point for creating the service: one gRPC server per
		// listener config in cfgs.
		srv, err := srvFactory.Create(cfg.IsSecure())
		if err != nil {
			return fmt.Errorf("create gRPC server: %w", err)
		}
		b.RegisterStarter(starter.New(cfg, srv))
	}

	// Optional Prometheus metrics endpoint, only when a listen address is configured.
	if conf.PrometheusListenAddr != "" {
		logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener")

		b.RegisterStarter(func(listen bootstrap.ListenFunc, _ chan<- error) error {
			l, err := listen(starter.TCP, conf.PrometheusListenAddr)
			if err != nil {
				return err
			}

			go func() {
				if err := monitoring.Start(
					monitoring.WithListener(l),
					monitoring.WithBuildInformation(praefect.GetVersion(), praefect.GetBuildTime())); err != nil {
					logger.WithError(err).Errorf("Unable to start prometheus listener: %v", conf.PrometheusListenAddr)
				}
			}()

			return nil
		})
	}

	// nodeManager is never assigned in the per_repository branch above, so
	// under that strategy this block is skipped.
	if db != nil && nodeManager != nil {
		...
	}

	if err := b.Start(); err != nil {
		return fmt.Errorf("unable to start the bootstrap: %v", err)
	}

	// Process queued replication events in a background goroutine, using
	// praefect.ExpBackoffFunc(1s, 5s) as the backoff.
	go repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second))
	logger.Info("background started: processing of the replication events")
	// Called without `go`; the log line below suggests ProcessStale starts its
	// own background work — NOTE(review): confirm the meaning of the 30s and
	// 1m arguments in ReplMgr.ProcessStale.
	repl.ProcessStale(ctx, 30*time.Second, time.Minute)
	logger.Info("background started: processing of the stale replication events")

	...

	// Block until the bootstrap stops, presumably waiting up to
	// GracefulStopTimeout for in-flight work during shutdown.
	return b.Wait(conf.GracefulStopTimeout.Duration())
}