Collector.go

package service

import (
	"fmt"
	"log"
	"os"
	"os/exec"
	"time"

	"saseul/config"
	"saseul/data/env"
	"saseul/data/mainchain"
	"saseul/data/tracker"
	"saseul/datasource/poolclient"
	"saseul/staff/observer"
	"saseul/staff/processmanager"
)

// Collector ๊ตฌ์กฐ์ฒด๋Š” ์ˆ˜์ง‘ ์„œ๋น„์Šค์˜ ๊ธฐ๋ณธ ๊ตฌ์กฐ์ฒด๋ฅผ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค.
type Collector struct {
	Description string
	Iterate     int
}

// NewCollector ํ•จ์ˆ˜๋Š” Collector์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค.
func NewCollector() *Collector {
	return &Collector{
		Description: "Collector service",
		Iterate:     100000,
	}
}

// Main ํ•จ์ˆ˜๋Š” Collector์˜ ๋ฉ”์ธ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.
func (c *Collector) Main() {
	// ํ™˜๊ฒฝ์ด "process"์ธ ๊ฒฝ์šฐ, ์ปฌ๋ ‰ํ„ฐ ํ”„๋กœ์„ธ์Šค๊ฐ€ ์ด๋ฏธ ์‹คํ–‰ ์ค‘์ธ์ง€ ํ™•์ธํ•˜๊ณ  ์‹คํ–‰ ์ค‘์ด๋ฉด ์ข…๋ฃŒํ•ฉ๋‹ˆ๋‹ค.
	if config.Environment == "process" {
		if processmanager.IsRunning(processmanager.COLLECTOR) {
			log.Println("The collector process is already running.")
			os.Exit(1)
		}
		processmanager.Save(processmanager.COLLECTOR)
	}

	// Pool ํด๋ผ์ด์–ธํŠธ๋ฅผ "rewind" ๋ชจ๋“œ๋กœ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.
	poolclient.Instance().SetMode("rewind")

	// ๋ฉ”์ธ ์ž‘์—…์ด ์™„๋ฃŒ๋˜๋ฉด ํ”„๋กœ์„ธ์Šค๋ฅผ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.
	defer func() {
		if processmanager.PID(processmanager.COLLECTOR) == os.Getpid() {
			log.Println("Collector process has been successfully removed.")
			processmanager.Delete(processmanager.COLLECTOR)
		}
	}()

	// ์ดˆ๊ธฐํ™” ๋ฐ ์ˆ˜์ง‘ ์ž‘์—…์„ ๋ฐ˜๋ณต์ ์œผ๋กœ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.
	c.Init()
	for i := 0; i < c.Iterate; i++ {
		c.Collect()
		time.Sleep(300 * time.Millisecond) // ํ•„์š”ํ•œ ๊ฒฝ์šฐ ์กฐ์ •
	}
}

// Init ํ•จ์ˆ˜๋Š” Collector๋ฅผ ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค.
func (c *Collector) Init() {
	log.Println("The collector process has started.")
}

// Collect ํ•จ์ˆ˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•ฉ๋‹ˆ๋‹ค.
func (c *Collector) Collect() bool {
	// ํ”ผ์–ด ์ •๋ณด๋ฅผ ๊ฐ€์ ธ์™€ ํ˜ธ์ŠคํŠธ ๋งต์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.
	peers := tracker.GetPeers()
	hosts := tracker.HostMap(env.Peer().Address(), peers)

	// ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ๋ฅผ ๊ด€์ฐฐํ•ฉ๋‹ˆ๋‹ค.
	observer.Instance().SeeBroadcasts(hosts, mainchain.Instance().LastBlock())

	return true
}

// main ํ•จ์ˆ˜๋Š” Collector์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๋ฉ”์ธ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.
func main() {
	c := NewCollector()
	c.Main()
}

๊ตฌ์„ฑ ์š”์†Œ์™€ ํ•จ์ˆ˜ ์„ค๋ช…

Collector ๊ตฌ์กฐ์ฒด

  • Description: ์„œ๋น„์Šค์˜ ์„ค๋ช…์„ ๋‹ด๊ณ  ์žˆ๋Š” ๋ฌธ์ž์—ด.

  • Iterate: ๋ฐ˜๋ณต ํšŸ์ˆ˜๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” ์ •์ˆ˜. ์—ฌ๊ธฐ์„œ๋Š” 100,000๋ฒˆ ๋ฐ˜๋ณตํ•˜๋„๋ก ์„ค์ •๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

NewCollector ํ•จ์ˆ˜

์ด ํ•จ์ˆ˜๋Š” ์ƒˆ๋กœ์šด Collector ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค.

Main ํ•จ์ˆ˜

Collector์˜ ์ฃผ์š” ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š” ํ•จ์ˆ˜์ž…๋‹ˆ๋‹ค.

  • ํ™˜๊ฒฝ ๋ณ€์ˆ˜๊ฐ€ "process"๋กœ ์„ค์ •๋˜์–ด ์žˆ์œผ๋ฉด, ์ด๋ฏธ ์‹คํ–‰ ์ค‘์ธ Collector ํ”„๋กœ์„ธ์Šค๊ฐ€ ์žˆ๋Š”์ง€ ํ™•์ธํ•˜๊ณ , ์žˆ์œผ๋ฉด ์ข…๋ฃŒํ•ฉ๋‹ˆ๋‹ค.

  • ํ”„๋กœ์„ธ์Šค๊ฐ€ ์‹คํ–‰ ์ค‘์ด ์•„๋‹ˆ๋ผ๋ฉด, Collector ํ”„๋กœ์„ธ์Šค ์ •๋ณด๋ฅผ ์ €์žฅํ•˜๊ณ , rewind ๋ชจ๋“œ๋กœ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

  • defer๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ž‘์—…์ด ์™„๋ฃŒ๋˜๋ฉด ํ”„๋กœ์„ธ์Šค๋ฅผ ์‚ญ์ œํ•˜๋„๋ก ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

  • ์ดˆ๊ธฐํ™” ๋ฐ ์ˆ˜์ง‘ ์ž‘์—…์„ ๋ฐ˜๋ณต์ ์œผ๋กœ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

Init ํ•จ์ˆ˜

Collector ์„œ๋น„์Šค๋ฅผ ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค.

Collect ํ•จ์ˆ˜

๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๋Š” ํ•จ์ˆ˜์ž…๋‹ˆ๋‹ค.

  • ํ”ผ์–ด ์ •๋ณด๋ฅผ ๊ฐ€์ ธ์™€ ํ˜ธ์ŠคํŠธ ๋งต์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

  • ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ๋ฅผ ๊ด€์ฐฐํ•ฉ๋‹ˆ๋‹ค.

main ํ•จ์ˆ˜

Collector์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ  Main ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

์š”์•ฝ

์ด ์ฝ”๋“œ๋Š” ๋ธ”๋ก์ฒด์ธ ๋„คํŠธ์›Œํฌ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๋Š” Collector ์„œ๋น„์Šค๋ฅผ ์ •์˜ํ•˜๊ณ  ์‹คํ–‰ํ•˜๋Š” ํ”„๋กœ๊ทธ๋žจ์ž…๋‹ˆ๋‹ค. Collector๋Š” ํ”ผ์–ด ์ •๋ณด๋ฅผ ์ˆ˜์ง‘ํ•˜๊ณ , ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ๋ฅผ ๊ด€์ฐฐํ•˜์—ฌ ๋„คํŠธ์›Œํฌ์˜ ์ƒํƒœ๋ฅผ ๋ชจ๋‹ˆํ„ฐ๋งํ•ฉ๋‹ˆ๋‹ค. ๊ฐ ํ•จ์ˆ˜๋Š” ์ดˆ๊ธฐํ™”, ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘ ๋ฐ ํ”„๋กœ์„ธ์Šค ๊ด€๋ฆฌ๋ฅผ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

๊ฐ ํ•จ์ˆ˜์˜ ์—ญํ• 

  1. NewCollector:

    • Collector ๊ตฌ์กฐ์ฒด์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค.

    • Description๊ณผ Iterate ์†์„ฑ์„ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

  2. Main:

    • ํ™˜๊ฒฝ์ด process๋กœ ์„ค์ •๋œ ๊ฒฝ์šฐ, ๊ธฐ์กด ์ˆ˜์ง‘๊ธฐ ํ”„๋กœ์„ธ์Šค๊ฐ€ ์‹คํ–‰ ์ค‘์ธ์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค. ์‹คํ–‰ ์ค‘์ด๋ผ๋ฉด ์ข…๋ฃŒํ•ฉ๋‹ˆ๋‹ค.

    • poolclient์˜ ๋ชจ๋“œ๋ฅผ rewind๋กœ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.

    • ํ”„๋กœ์„ธ์Šค๊ฐ€ ์ข…๋ฃŒ๋  ๋•Œ ํ˜„์žฌ ํ”„๋กœ์„ธ์Šค๊ฐ€ ์ˆ˜์ง‘๊ธฐ ํ”„๋กœ์„ธ์Šค์ธ์ง€ ํ™•์ธํ•˜๊ณ , ๋งž๋‹ค๋ฉด ํ”„๋กœ์„ธ์Šค๋ฅผ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.

    • Init ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ดˆ๊ธฐํ™”ํ•˜๊ณ , Iterate ํšŸ์ˆ˜๋งŒํผ Collect ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•ฉ๋‹ˆ๋‹ค. ๊ฐ ๋ฐ˜๋ณต ํ›„ 300๋ฐ€๋ฆฌ์ดˆ ๋™์•ˆ ๋Œ€๊ธฐํ•ฉ๋‹ˆ๋‹ค.

  3. Init:

    • ์ˆ˜์ง‘๊ธฐ ํ”„๋กœ์„ธ์Šค์˜ ์‹œ์ž‘์„ ๋กœ๊ทธ์— ๊ธฐ๋กํ•ฉ๋‹ˆ๋‹ค.

  4. Collect:

    • ํŠธ๋ž˜์ปค์—์„œ ํ”ผ์–ด ์ •๋ณด๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ , ํ˜„์žฌ ๋…ธ๋“œ์˜ ์ฃผ์†Œ์™€ ํ•จ๊ป˜ ํ˜ธ์ŠคํŠธ ๋งต์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

    • ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ ๋ฉ”์‹œ์ง€๋ฅผ ๊ด€์ฐฐํ•ฉ๋‹ˆ๋‹ค.

  5. main:

    • NewCollector ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ Collector ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•˜๊ณ , Main ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ˆ˜์ง‘ ์ž‘์—…์„ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค.

์ด ์ฝ”๋“œ๋Š” ๋ธ”๋ก์ฒด์ธ ๋„คํŠธ์›Œํฌ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๊ณ , ์ด๋ฅผ ๋ฐ˜๋ณต์ ์œผ๋กœ ๊ด€์ฐฐํ•˜์—ฌ ๋„คํŠธ์›Œํฌ ์ƒํƒœ๋ฅผ ๋ชจ๋‹ˆํ„ฐ๋งํ•˜๋Š” ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค.

Last updated