PeerSearcher.go

package service

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"saseul/config"
	"saseul/data/chain"
	"saseul/data/env"
	"saseul/data/resourcechain"
	"saseul/data/tracker"
	"saseul/datasource/poolclient"
	"saseul/staff/processmanager"
	"saseul/util/clock"
	"saseul/util/filter"
	"saseul/util/hasher"
	"saseul/util/parser"
	"saseul/util/restcall"
	"saseul/util/signer"
)

// PeerSearcher κ΅¬μ‘°μ²΄λŠ” ν”Όμ–΄ 탐색 μ„œλΉ„μŠ€λ₯Ό μ •μ˜ν•©λ‹ˆλ‹€.
type PeerSearcher struct {
	Description string
	Iterate     int
	CheckCount  int
}

// NewPeerSearcher ν•¨μˆ˜λŠ” PeerSearcher μΈμŠ€ν„΄μŠ€λ₯Ό μƒμ„±ν•˜κ³  μ΄ˆκΈ°ν™”ν•©λ‹ˆλ‹€.
func NewPeerSearcher() *PeerSearcher {
	return &PeerSearcher{
		Description: "Peer searcher service",
		Iterate:     1000000,
		CheckCount:  32,
	}
}

// Main ν•¨μˆ˜λŠ” PeerSearcher의 메인 μž‘μ—…μ„ μˆ˜ν–‰ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) Main() {
	if config.Environment == "process" {
		if processmanager.IsRunning(processmanager.PEER_SEARCHER) {
			log.Println("Peer searcher process is already running.")
			os.Exit(1)
		}
		processmanager.Save(processmanager.PEER_SEARCHER)
	}

	poolclient.Instance().SetMode("rewind")
	restcall.Instance().SetTimeout(config.ROUND_TIMEOUT)

	defer func() {
		if processmanager.PID(processmanager.PEER_SEARCHER) == os.Getpid() {
			log.Println("Peer searcher process has been successfully removed.")
			processmanager.Delete(processmanager.PEER_SEARCHER)
		}
	}()

	ps.Init()
	for i := 0; i < ps.Iterate; i++ {
		time.Sleep(1 * time.Second) // ν•„μš”ν•œ 경우 μ‘°μ •
	}
}

// Init ν•¨μˆ˜λŠ” PeerSearcherλ₯Ό μ΄ˆκΈ°ν™”ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) Init() {
	ps.addRoutine(ps.checkRequestedPeers, 5*time.Second)
	ps.addRoutine(ps.checkPeers, 120*time.Second)
	ps.addRoutine(ps.checkKnownHosts, 60*time.Second)
	ps.addRoutine(ps.makeStatusBundle, 60*time.Second)

	ps.checkRequestedPeers()
	ps.checkPeers()
	ps.checkKnownHosts()
	ps.makeStatusBundle()

	log.Println("Peer searcher process has started.")
}

// addRoutine ν•¨μˆ˜λŠ” 주기적인 μž‘μ—…μ„ μˆ˜ν–‰ν•˜λŠ” 고루틴을 μƒμ„±ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) addRoutine(fn func(), interval time.Duration) {
	go func() {
		for range time.Tick(interval) {
			fn()
		}
	}()
}

// checkRequestedPeers ν•¨μˆ˜λŠ” μš”μ²­λœ ν”Όμ–΄λ₯Ό ν™•μΈν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) checkRequestedPeers() {
	requests := poolclient.Instance().DrainPeerRequests()
	peerHosts := tracker.GetPeerHosts()
	knownHosts := tracker.GetKnownHosts()

	bucket := []string{}

	for _, item := range requests {
		if !contains(peerHosts, item) && filter.IsPublicHost(item) {
			bucket = append(bucket, parser.Endpoint(item))
		}
	}

	knownHosts = uniqueStrings(append(knownHosts, bucket...))
	tracker.SetKnownHosts(knownHosts)
}

// checkPeers ν•¨μˆ˜λŠ” ν”Όμ–΄λ₯Ό ν™•μΈν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) checkPeers() {
	log.Println("Check peers...")

	hosts := tracker.GetPeerHosts()
	peers := []map[string]interface{}{}
	knownHosts := []string{}

	for len(hosts) > 0 {
		part := take(&hosts, ps.CheckCount)
		trackers := ps.seeTrackers(part, true)
		peers = append(peers, trackers["peers"].([]map[string]interface{})...)
		knownHosts = uniqueStrings(append(knownHosts, trackers["known_hosts"].([]string)...))
	}

	peerHosts := getHosts(peers)
	bucket := []string{}

	for _, knownHost := range knownHosts {
		if !contains(peerHosts, knownHost) && filter.IsPublicHost(knownHost) {
			bucket = append(bucket, parser.Endpoint(knownHost))
		}
	}

	knownHosts = uniqueStrings(append(bucket, tracker.GetKnownHosts()...))
	tracker.SetPeers(peers)
	tracker.SetKnownHosts(knownHosts)
}

// checkKnownHosts ν•¨μˆ˜λŠ” μ•Œλ €μ§„ 호슀트λ₯Ό ν™•μΈν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) checkKnownHosts() {
	log.Println("Check requested hosts...")

	hosts := tracker.GetKnownHosts()
	peers := []map[string]interface{}{}
	knownHosts := []string{}

	for len(hosts) > 0 {
		part := take(&hosts, ps.CheckCount)
		trackers := ps.seeTrackers(part, false)
		peers = append(peers, trackers["peers"].([]map[string]interface{})...)
		knownHosts = uniqueStrings(append(knownHosts, trackers["known_hosts"].([]string)...))
	}

	newPeers := tracker.GetPeers()
	peerHosts := getHosts(newPeers)

	for _, peer := range peers {
		host := peer["host"].(string)
		address := peer["address"].(string)
		execTime := peer["exec_time"].(int)

		if !contains(peerHosts, host) && filter.IsPublicHost(host) {
			newPeers = append(newPeers, map[string]interface{}{
				"host":      parser.Endpoint(host),
				"address":   address,
				"exec_time": execTime,
			})
		}
	}

	tracker.SetPeers(newPeers)

	peerHosts = tracker.GetPeerHosts()
	bucket := []string{}

	for _, knownHost := range knownHosts {
		if !contains(peerHosts, knownHost) && filter.IsPublicHost(knownHost) {
			bucket = append(bucket, parser.Endpoint(knownHost))
		}
	}

	tracker.SetKnownHosts(bucket)
}

// makeStatusBundle ν•¨μˆ˜λŠ” μƒνƒœ λ²ˆλ“€μ„ μƒμ„±ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) makeStatusBundle() {
	log.Println("Bundling...")
	chain.Bundling()
}

// seeTrackers ν•¨μˆ˜λŠ” 트래컀λ₯Ό ν™•μΈν•˜κ³  ν”Όμ–΄ 및 μ•Œλ €μ§„ 호슀트 정보λ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) seeTrackers(hosts []string, register bool) map[string]interface{} {
	if len(hosts) == 0 {
		return nil
	}

	endpoint := env.Endpoint()
	now := clock.UTime()
	height := max(chain.FixedHeight()-config.RESOURCE_CONFIRM_COUNT, 0)
	items := ps.searchRequestItems(hosts, endpoint, register, height, now)
	items2 := ps.searchVersionItems(hosts, endpoint)

	var phrase string
	if height > 0 {
		phrase = resourcechain.Instance().Block(height).Blockhash
	} else {
		phrase = config.NetworkKey()
	}

	rs := restcall.Instance().MultiPOST(items)
	rs2 := restcall.Instance().MultiPOST(items2)

	peers := []map[string]interface{}{}
	knownHosts := []string{}
	latests := []string{}

	for _, item := range rs2 {
		host := item["host"].(string)
		result := parseJSON(item["result"].(string))
		data := result["data"].(map[string]interface{})
		version := data["version"].(string)

		if version >= "2.1.9.0" {
			latests = append(latests, host)
		}
	}

	for _, item := range rs {
		host := item["host"].(string)
		execTime := item["exec_time"].(int)
		result := parseJSON(item["result"].(string))

		if result == nil {
			continue
		}

		data := result["data"].(map[string]interface{})
		nodeData := data["node"].(map[string]interface{})
		peerAddress := ps.peerAddress(nodeData, phrase, now)

		if peerAddress == "" || !contains(latests, host) {
			continue
		}

		peerData := data["peers"].([]map[string]interface{})
		peerHosts := getHosts(peerData)

		peers = append(peers, map[string]interface{}{
			"host":      host,
			"address":   peerAddress,
			"exec_time": execTime,
		})
		knownHosts = uniqueStrings(append(knownHosts, peerHosts...))
	}

	return map[string]interface{}{
		"peers":       peers,
		"known_hosts": knownHosts,
	}
}

// searchRequestItems ν•¨μˆ˜λŠ” μš”μ²­ μ•„μ΄ν…œμ„ μƒμ„±ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) searchRequestItems(hosts []string, endpoint string, register bool, height int, utime int) []map[string]interface{} {
	items := []map[string]interface{}{}
	for _, host := range hosts {
		item := map[string]interface{}{
			"url": fmt.Sprintf("%s/peer", host),
			"data": map[string]interface{}{
				"register":       register,
				"authentication": true,
				"height":         height,
				"t":              utime,
			},
		}
		if endpoint != "" {
			item["data"].(map[string]interface{})["host"] = endpoint
		}
		items = append(items, item)
	}
	return items
}

// searchVersionItems ν•¨μˆ˜λŠ” 버전 μ•„μ΄ν…œμ„ μƒμ„±ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) searchVersionItems(hosts []string, endpoint string) []map[string]interface{} {
	items := []map[string]interface{}{}
	for _, host := range hosts {
		item := map[string]interface{}{
			"url":  fmt.Sprintf("%s/info", host),
				"data": map[string]interface{}{},
			}
		if endpoint != "" {
			item["data"].(map[string]interface{})["host"] = endpoint
		}
		items = append(items, item)
	}
	return items
}

// peerAddress ν•¨μˆ˜λŠ” ν”Όμ–΄ μ£Όμ†Œλ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€.
func (ps *PeerSearcher) peerAddress(nodeData map[string]interface{}, phrase string, now int) string {
	timestamp := int(nodeData["timestamp"].(float64))
	genesisAddress := nodeData["genesis_address"].(string)
	publicKey := nodeData["public_key"].(string)
	signature := nodeData["signature"].(string)
	stringToHash := hasher.Hash(hasher.HexTime(timestamp) + phrase)

	timeValidity := abs(now-timestamp) < config.TIMESTAMP_ERROR_LIMIT
	signatureValidity := signer.SignatureValidity(stringToHash, publicKey, signature)
	genesisValidity := genesisAddress == config.GenesisAddress

	if timeValidity && signatureValidity && genesisValidity {
		return signer.Address(publicKey)
	}

	return ""
}

// take ν•¨μˆ˜λŠ” μŠ¬λΌμ΄μŠ€μ—μ„œ μ§€μ •λœ 수의 μš”μ†Œλ₯Ό κ°€μ Έμ˜΅λ‹ˆλ‹€.
func take(hosts *[]string, count int) []string {
	n := min(count, len(*hosts))
	part := (*hosts)[:n]
	*hosts = (*hosts)[n:]
	return part
}

// contains ν•¨μˆ˜λŠ” μŠ¬λΌμ΄μŠ€μ— μ§€μ •λœ ν•­λͺ©μ΄ ν¬ν•¨λ˜μ–΄ μžˆλŠ”μ§€ ν™•μΈν•©λ‹ˆλ‹€.
func contains(slice []string, item string) bool {
	for _, s := range slice {
		if s == item {
			return true
		}
	}
	return false
}

// uniqueStrings ν•¨μˆ˜λŠ” 슬라이슀의 고유 λ¬Έμžμ—΄μ„ λ°˜ν™˜ν•©λ‹ˆλ‹€.
func uniqueStrings(input []string) []string {
	unique := make(map[string]struct{}, len(input))
	for _, str := range input {
		unique[str] = struct{}{}
	}

	output := make([]string, 0, len(unique))
	for str := range unique {
		output = append(output, str)
	}

	return output
}

// parseJSON ν•¨μˆ˜λŠ” JSON λ¬Έμžμ—΄μ„ νŒŒμ‹±ν•©λ‹ˆλ‹€.
func parseJSON(jsonStr string) map[string]interface{} {
	var result map[string]interface{}
	if err := json.Unmarshal([]byte(jsonStr), &result); err != nil {
		return nil
	}
	return result
}

// getHosts ν•¨μˆ˜λŠ” ν”Όμ–΄ λͺ©λ‘μ—μ„œ 호슀트λ₯Ό μΆ”μΆœν•©λ‹ˆλ‹€.
func getHosts(peers []map[string]interface{}) []string {
	hosts := []string{}
	for _, peer := range peers {
		hosts = append(hosts, peer["host"].(string))
	}
	return hosts
}

// abs ν•¨μˆ˜λŠ” μ ˆλŒ€κ°’μ„ λ°˜ν™˜ν•©λ‹ˆλ‹€.
func abs(x int) int {
	if x < 0 {
		return -x
	}
	return x
}

// max ν•¨μˆ˜λŠ” 두 κ°’ 쀑 더 큰 값을 λ°˜ν™˜ν•©λ‹ˆλ‹€.
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// min ν•¨μˆ˜λŠ” 두 κ°’ 쀑 더 μž‘μ€ 값을 λ°˜ν™˜ν•©λ‹ˆλ‹€.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// main ν•¨μˆ˜λŠ” PeerSearcher μΈμŠ€ν„΄μŠ€λ₯Ό μƒμ„±ν•˜κ³  메인 μž‘μ—…μ„ μˆ˜ν–‰ν•©λ‹ˆλ‹€.
func main() {
	ps := NewPeerSearcher()
	ps.Main()
}

μ½”λ“œ μ„€λͺ…:

이 μ½”λ“œλŠ” ν”Όμ–΄ 검색 μ„œλΉ„μŠ€(PeerSearcher)λ₯Ό κ΅¬ν˜„ν•©λ‹ˆλ‹€. 이 μ„œλΉ„μŠ€λŠ” λ„€νŠΈμ›Œν¬μ—μ„œ ν”Όμ–΄λ₯Ό μ°Ύκ³ , μ•Œλ €μ§„ 호슀트 정보λ₯Ό μœ μ§€ κ΄€λ¦¬ν•˜λ©°, μƒνƒœ λ²ˆλ“€μ„ μƒμ„±ν•˜λŠ” μž‘μ—…μ„ μˆ˜ν–‰ν•©λ‹ˆλ‹€.

μ£Όμš” ν•¨μˆ˜:

  • NewPeerSearcher: PeerSearcher ꡬ쑰체의 μΈμŠ€ν„΄μŠ€λ₯Ό μƒμ„±ν•˜κ³  μ΄ˆκΈ°ν™”ν•©λ‹ˆλ‹€.

  • Main: PeerSearcher의 메인 μž‘μ—…μ„ μˆ˜ν–‰ν•©λ‹ˆλ‹€. μ„œλΉ„μŠ€κ°€ 이미 μ‹€ν–‰ 쀑인지 ν™•μΈν•˜κ³  μ΄ˆκΈ°ν™”ν•©λ‹ˆλ‹€.

  • Init: PeerSearcherλ₯Ό μ΄ˆκΈ°ν™”ν•©λ‹ˆλ‹€. 주기적인 μž‘μ—…μ„ μ„€μ •ν•˜κ³  초기 μƒνƒœλ₯Ό ν™•μΈν•©λ‹ˆλ‹€.

  • addRoutine: 주기적인 μž‘μ—…μ„ μˆ˜ν–‰ν•˜λŠ” 고루틴을 μƒμ„±ν•©λ‹ˆλ‹€.

  • checkRequestedPeers: μš”μ²­λœ ν”Όμ–΄λ₯Ό ν™•μΈν•˜κ³ , μ•Œλ €μ§„ 호슀트 λͺ©λ‘μ„ μ—…λ°μ΄νŠΈν•©λ‹ˆλ‹€.

  • checkPeers: ν”Όμ–΄λ₯Ό ν™•μΈν•˜κ³ , ν”Όμ–΄ 및 μ•Œλ €μ§„ 호슀트 λͺ©λ‘μ„ μ—…λ°μ΄νŠΈν•©λ‹ˆλ‹€.

  • checkKnownHosts: μ•Œλ €μ§„ 호슀트λ₯Ό ν™•μΈν•˜κ³ , ν”Όμ–΄ 및 μ•Œλ €μ§„ 호슀트 λͺ©λ‘μ„ μ—…λ°μ΄νŠΈν•©λ‹ˆλ‹€.

  • makeStatusBundle: μƒνƒœ λ²ˆλ“€μ„ μƒμ„±ν•©λ‹ˆλ‹€.

  • seeTrackers: 트래컀λ₯Ό ν™•μΈν•˜κ³  ν”Όμ–΄ 및 μ•Œλ €μ§„ 호슀트 정보λ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€.

  • searchRequestItems: μš”μ²­ μ•„μ΄ν…œμ„ μƒμ„±ν•©λ‹ˆλ‹€.

  • searchVersionItems: 버전 μ•„μ΄ν…œμ„ μƒμ„±ν•©λ‹ˆλ‹€.

  • peerAddress: ν”Όμ–΄ μ£Όμ†Œλ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€.

  • take: μŠ¬λΌμ΄μŠ€μ—μ„œ μ§€μ •λœ 수의 μš”μ†Œλ₯Ό κ°€μ Έμ˜΅λ‹ˆλ‹€.

  • contains: μŠ¬λΌμ΄μŠ€μ— μ§€μ •λœ ν•­λͺ©μ΄ ν¬ν•¨λ˜μ–΄ μžˆλŠ”μ§€ ν™•μΈν•©λ‹ˆλ‹€.

  • uniqueStrings: 슬라이슀의 고유 λ¬Έμžμ—΄μ„ λ°˜ν™˜ν•©λ‹ˆλ‹€.

  • parseJSON: JSON λ¬Έμžμ—΄μ„ νŒŒμ‹±ν•©λ‹ˆλ‹€.

  • getHosts: ν”Όμ–΄ λͺ©λ‘μ—μ„œ 호슀트λ₯Ό μΆ”μΆœν•©λ‹ˆλ‹€.

  • abs: μ ˆλŒ€κ°’μ„ λ°˜ν™˜ν•©λ‹ˆλ‹€.

  • max: 두 κ°’ 쀑 더 큰 값을 λ°˜ν™˜ν•©λ‹ˆλ‹€.

  • min: 두 κ°’ 쀑 더 μž‘μ€ 값을 λ°˜ν™˜ν•©λ‹ˆλ‹€.

  • main: PeerSearcher μΈμŠ€ν„΄μŠ€λ₯Ό μƒμ„±ν•˜κ³  메인 μž‘μ—…μ„ μˆ˜ν–‰ν•©λ‹ˆλ‹€.

Last updated