Collector.go
package service
import (
"fmt"
"log"
"os"
"os/exec"
"time"
"saseul/config"
"saseul/data/env"
"saseul/data/mainchain"
"saseul/data/tracker"
"saseul/datasource/poolclient"
"saseul/staff/observer"
"saseul/staff/processmanager"
)
// Collector ๊ตฌ์กฐ์ฒด๋ ์์ง ์๋น์ค์ ๊ธฐ๋ณธ ๊ตฌ์กฐ์ฒด๋ฅผ ์ ์ํฉ๋๋ค.
type Collector struct {
Description string
Iterate int
}
// NewCollector ํจ์๋ Collector์ ์ธ์คํด์ค๋ฅผ ์์ฑํ๊ณ ์ด๊ธฐํํฉ๋๋ค.
func NewCollector() *Collector {
return &Collector{
Description: "Collector service",
Iterate: 100000,
}
}
// Main ํจ์๋ Collector์ ๋ฉ์ธ ์์
์ ์ํํฉ๋๋ค.
func (c *Collector) Main() {
// ํ๊ฒฝ์ด "process"์ธ ๊ฒฝ์ฐ, ์ปฌ๋ ํฐ ํ๋ก์ธ์ค๊ฐ ์ด๋ฏธ ์คํ ์ค์ธ์ง ํ์ธํ๊ณ ์คํ ์ค์ด๋ฉด ์ข
๋ฃํฉ๋๋ค.
if config.Environment == "process" {
if processmanager.IsRunning(processmanager.COLLECTOR) {
log.Println("The collector process is already running.")
os.Exit(1)
}
processmanager.Save(processmanager.COLLECTOR)
}
// Pool ํด๋ผ์ด์ธํธ๋ฅผ "rewind" ๋ชจ๋๋ก ์ค์ ํฉ๋๋ค.
poolclient.Instance().SetMode("rewind")
// ๋ฉ์ธ ์์
์ด ์๋ฃ๋๋ฉด ํ๋ก์ธ์ค๋ฅผ ์ญ์ ํฉ๋๋ค.
defer func() {
if processmanager.PID(processmanager.COLLECTOR) == os.Getpid() {
log.Println("Collector process has been successfully removed.")
processmanager.Delete(processmanager.COLLECTOR)
}
}()
// ์ด๊ธฐํ ๋ฐ ์์ง ์์
์ ๋ฐ๋ณต์ ์ผ๋ก ์ํํฉ๋๋ค.
c.Init()
for i := 0; i < c.Iterate; i++ {
c.Collect()
time.Sleep(300 * time.Millisecond) // ํ์ํ ๊ฒฝ์ฐ ์กฐ์
}
}
// Init ํจ์๋ Collector๋ฅผ ์ด๊ธฐํํฉ๋๋ค.
func (c *Collector) Init() {
log.Println("The collector process has started.")
}
// Collect ํจ์๋ ๋ฐ์ดํฐ๋ฅผ ์์งํฉ๋๋ค.
func (c *Collector) Collect() bool {
// ํผ์ด ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ ํธ์คํธ ๋งต์ ์์ฑํฉ๋๋ค.
peers := tracker.GetPeers()
hosts := tracker.HostMap(env.Peer().Address(), peers)
// ๋ธ๋ก๋์บ์คํธ๋ฅผ ๊ด์ฐฐํฉ๋๋ค.
observer.Instance().SeeBroadcasts(hosts, mainchain.Instance().LastBlock())
return true
}
// main ํจ์๋ Collector์ ์ธ์คํด์ค๋ฅผ ์์ฑํ๊ณ ๋ฉ์ธ ์์
์ ์ํํฉ๋๋ค.
func main() {
c := NewCollector()
c.Main()
}๊ตฌ์ฑ ์์์ ํจ์ ์ค๋ช
์์ฝ
๊ฐ ํจ์์ ์ญํ
Last updated