// Master.go
package service
import (
"fmt"
"log"
"os"
"time"
"saseul/config"
"saseul/data/chain"
"saseul/data/mainchain"
"saseul/data/resourcechain"
"saseul/data/tracker"
"saseul/datasource/database"
"saseul/datasource/poolclient"
"saseul/staff/processmanager"
"saseul/util/logger"
"saseul/util/timer"
"github.com/gorilla/websocket"
)
// Master defines the master service.
type Master struct {
	// Iterate is the number of one-second sleep cycles Main runs before it
	// returns; NewMaster sets it to 1000.
	Iterate int
	// Socket is the websocket connection that listen reads from; NewMaster
	// fills it with the result of createSocket.
	Socket *websocket.Conn
}
// NewMaster creates and initializes a Master instance.
//
// If a master process is already registered as running, it logs that fact
// and calls stop, which kills the worker processes and exits the program.
// Otherwise it records this process as the master, initializes the tracker,
// and returns a Master with Iterate set to 1000 and Socket connected via
// createSocket.
func NewMaster() *Master {
	if alreadyUp := processmanager.IsRunning(processmanager.MASTER); alreadyUp {
		logger.Log("Master process is already running.")
		// stop ends with os.Exit, so nothing below runs in this case.
		stop()
	}

	processmanager.Save(processmanager.MASTER)
	tracker.Init()

	// The socket is dialed only after the process bookkeeping above.
	conn := createSocket()

	master := new(Master)
	master.Iterate = 1000
	master.Socket = conn
	return master
}
// Init ν¨μλ Master μλΉμ€λ₯Ό μ΄κΈ°ννκ³ νμν μμ
μ μ€μ ν©λλ€.
func (m *Master) Init() {
t := timer.NewTimer()
logger.Log("Master Process Initializing...")
load()
logger.Log(fmt.Sprintf("Master Process: Status Initializing Time: %.2fs", t.Check().Seconds()))
go m.listen()
checkProcess()
checkDB()
logRotation()
}
// createSocket creates the websocket connection.
//
// It dials ws://<config.MasterAddr>:<config.MasterPort> with the default
// gorilla/websocket dialer and returns the connection. If the dial fails the
// process is terminated through log.Fatalf, so the returned connection is
// never nil.
func createSocket() *websocket.Conn {
	u := fmt.Sprintf("ws://%s:%d", config.MasterAddr, config.MasterPort)

	// The handshake response is not needed here.
	// NOTE(review): per the gorilla/websocket docs its body does not have to
	// be closed by the application — confirm against the vendored version.
	conn, _, err := websocket.DefaultDialer.Dial(u, nil)
	if err != nil {
		// log.Fatal("dial:", err) follows fmt.Print rules and would emit
		// "dial:<err>" with no separator; format explicitly and include the
		// target address so the failure is actionable.
		log.Fatalf("dial %s: %v", u, err)
	}
	return conn
}
// listen receives socket messages and processes them.
//
// It reads from m.Socket in a loop and passes each payload to handleMessage.
// The first read error is logged and ends the loop; the socket is closed on
// the way out so the connection is not left open once nothing reads from it.
func (m *Master) listen() {
	defer func() {
		if err := m.Socket.Close(); err != nil {
			log.Println("close:", err)
		}
	}()

	for {
		_, message, err := m.Socket.ReadMessage()
		if err != nil {
			log.Println("read:", err)
			return
		}
		handleMessage(message)
	}
}
// handleMessage processes a received message.
//
// It is currently an empty placeholder: the message is accepted and ignored.
func handleMessage(message []byte) {
	// TODO: add message handling logic.
}
// Main ν¨μλ μ£Όμ μμ
μ μνν©λλ€.
func (m *Master) Main() {
for i := 0; i < m.Iterate; i++ {
time.Sleep(1 * time.Second) // νμν κ²½μ° μ‘°μ
}
}
// load ν¨μλ μ΄κΈ° λ‘λ μμ
μ μνν©λλ€.
func load() {
processmanager.Spawn("SERVICE_BIN", "DataPool")
for !poolclient.Instance().IsRunning() {
time.Sleep(1 * time.Second)
}
// μ μ±
μ€μ
poolclient.Instance().SetPolicy("chain_maker", config.Consensus)
poolclient.Instance().SetPolicy("resource_miner", config.Consensus)
poolclient.Instance().SetPolicy("collector", config.Collect)
poolclient.Instance().SetPolicy("peer_searcher", config.Collect)
poolclient.Instance().SetPolicy("main_chain_waiting", false)
poolclient.Instance().SetPolicy("resource_chain_waiting", false)
}
// stop stops the service.
//
// It kills the resource miner, chain maker, collector, peer searcher and
// data pool processes in that order, logs that the master has stopped, and
// exits the program with status 0. It never returns.
func stop() {
	kill := processmanager.Kill

	kill(processmanager.RESOURCE_MINER)
	kill(processmanager.CHAIN_MAKER)
	kill(processmanager.COLLECTOR)
	kill(processmanager.PEER_SEARCHER)
	kill(processmanager.DATA_POOL)

	logger.Log("The master process has stopped.")
	os.Exit(0)
}
// reload restarts the service.
//
// Steps, in order:
//  1. Read the current "mining" policy and the resource-chain block at
//     chain.FixedHeight(), which is used as the reload point.
//  2. Switch the four worker policies off and kill the workers and the data
//     pool, then poll once per second until none of them is running.
//  3. Call Remove on the resource chain with reloadPoint.Height+1 and on the
//     main chain with reloadPoint.MainHeight+1 (presumably dropping the
//     blocks from those heights onward — confirm against the chain packages).
//  4. Run load again and restore the "mining" policy.
//
// It always returns true.
func reload() bool {
	// A plain .(bool) assertion panics when the policy is missing or holds
	// another type; use the comma-ok form and only restore a value that was
	// actually read.
	// NOTE(review): checkProcess calls GetPolicy() without arguments and
	// indexes the result as a map, while this call passes a key — confirm
	// which form the pool client really offers.
	mining, miningKnown := poolclient.Instance().GetPolicy("mining").(bool)

	fixedHeight := chain.FixedHeight()
	reloadPoint := resourcechain.Instance().Block(fixedHeight)
	logger.Log(fmt.Sprintf("Master: Reload - reload_point: %d", fixedHeight))

	// Turn the policies off before killing the processes.
	poolclient.Instance().SetPolicy("chain_maker", false)
	poolclient.Instance().SetPolicy("resource_miner", false)
	poolclient.Instance().SetPolicy("collector", false)
	poolclient.Instance().SetPolicy("peer_searcher", false)

	processmanager.Kill(processmanager.RESOURCE_MINER)
	processmanager.Kill(processmanager.CHAIN_MAKER)
	processmanager.Kill(processmanager.COLLECTOR)
	processmanager.Kill(processmanager.PEER_SEARCHER)
	processmanager.Kill(processmanager.DATA_POOL)

	// Wait until every killed process has actually gone away.
	for processmanager.IsRunning(processmanager.RESOURCE_MINER) ||
		processmanager.IsRunning(processmanager.CHAIN_MAKER) ||
		processmanager.IsRunning(processmanager.COLLECTOR) ||
		processmanager.IsRunning(processmanager.PEER_SEARCHER) ||
		processmanager.IsRunning(processmanager.DATA_POOL) {
		logger.Log("Master: Reloading processes...")
		time.Sleep(1 * time.Second)
	}

	resourcechain.Instance().Remove(reloadPoint.Height + 1)
	mainchain.Instance().Remove(reloadPoint.MainHeight + 1)

	load()

	if miningKnown {
		poolclient.Instance().SetPolicy("mining", mining)
	}
	return true
}
// checkProcess ν¨μλ νλ‘μΈμ€λ₯Ό νμΈνκ³ μ μ±
μ λ°λΌ μμ λλ μ’
λ£ν©λλ€.
func checkProcess() {
policy := poolclient.Instance().GetPolicy()
if rmPolicy, ok := policy["resource_miner"].(bool); ok {
rmRunning := processmanager.IsRunning(processmanager.RESOURCE_MINER)
if rmPolicy && !rmRunning {
processmanager.Spawn("SERVICE_BIN", "ResourceMiner")
} else if !rmPolicy && rmRunning {
logger.Log("Resource miner process end.")
processmanager.Kill(processmanager.RESOURCE_MINER)
}
}
if cmPolicy, ok := policy["chain_maker"].(bool); ok {
cmRunning := processmanager.IsRunning(processmanager.CHAIN_MAKER)
if cmPolicy && !cmRunning {
processmanager.Spawn("SERVICE_BIN", "ChainMaker")
} else if !cmPolicy && cmRunning {
logger.Log("Chain maker process end.")
processmanager.Kill(processmanager.CHAIN_MAKER)
}
}
if colPolicy, ok := policy["collector"].(bool); ok {
colRunning := processmanager.IsRunning(processmanager.COLLECTOR)
if colPolicy && !colRunning {
processmanager.Spawn("SERVICE_BIN", "Collector")
} else if !colPolicy && colRunning {
logger.Log("Collector process end.")
processmanager.Kill(processmanager.COLLECTOR)
}
}
if psPolicy, ok := policy["peer_searcher"].(bool); ok {
psRunning := processmanager.IsRunning(processmanager.PEER_SEARCHER)
if psPolicy && !psRunning {
processmanager.Spawn("SERVICE_BIN", "PeerSearcher")
} else if !psPolicy && psRunning {
logger.Log("Peer searcher process end.")
processmanager.Kill(processmanager.PEER_SEARCHER)
}
}
}
// checkDB checks the database state.
//
// When config.DatabaseEnabled is set and the database reports that it is not
// connected, it calls stop, which terminates the program. Otherwise it does
// nothing.
func checkDB() {
	if !config.DatabaseEnabled {
		return
	}
	if database.Instance().IsConnected() {
		return
	}
	stop()
}
// logRotation ν¨μλ λ‘κ·Έ λ‘ν
μ΄μ
μ μνν©λλ€.
func logRotation() {
logger.Backup()
logger.CleanOldLogs()
}
// main ν¨μλ Master μΈμ€ν΄μ€λ₯Ό μμ±νκ³ μ΄κΈ°ν ν λ©μΈ μμ
μ μνν©λλ€.
func main() {
master := NewMaster()
master.Init()
master.Main()
}
// Code explanation
// Function descriptions:
// Last updated