// PeerSearcher.go
package service
import (
"encoding/json"
"fmt"
"log"
"os"
"time"
"saseul/config"
"saseul/data/chain"
"saseul/data/env"
"saseul/data/resourcechain"
"saseul/data/tracker"
"saseul/datasource/poolclient"
"saseul/staff/processmanager"
"saseul/util/clock"
"saseul/util/filter"
"saseul/util/hasher"
"saseul/util/parser"
"saseul/util/restcall"
"saseul/util/signer"
)
// PeerSearcher holds the settings of the peer search service.
type PeerSearcher struct {
	// Description is a human-readable label for the service.
	Description string
	// Iterate bounds the keep-alive loop in Main (one sleep per iteration);
	// CheckCount is the batch size handed to take when hosts are queried.
	Iterate, CheckCount int
}
// NewPeerSearcher allocates a PeerSearcher and fills in its default settings:
// a fixed description, 1000000 iterations and a check batch size of 32.
func NewPeerSearcher() *PeerSearcher {
	searcher := new(PeerSearcher)
	searcher.Description = "Peer searcher service"
	searcher.Iterate = 1000000
	searcher.CheckCount = 32
	return searcher
}
// Main ν¨μλ PeerSearcherμ λ©μΈ μμ
μ μνν©λλ€.
func (ps *PeerSearcher) Main() {
if config.Environment == "process" {
if processmanager.IsRunning(processmanager.PEER_SEARCHER) {
log.Println("Peer searcher process is already running.")
os.Exit(1)
}
processmanager.Save(processmanager.PEER_SEARCHER)
}
poolclient.Instance().SetMode("rewind")
restcall.Instance().SetTimeout(config.ROUND_TIMEOUT)
defer func() {
if processmanager.PID(processmanager.PEER_SEARCHER) == os.Getpid() {
log.Println("Peer searcher process has been successfully removed.")
processmanager.Delete(processmanager.PEER_SEARCHER)
}
}()
ps.Init()
for i := 0; i < ps.Iterate; i++ {
time.Sleep(1 * time.Second) // νμν κ²½μ° μ‘°μ
}
}
// Init schedules the four periodic jobs and then runs each of them once
// synchronously before logging that the service has started.
func (ps *PeerSearcher) Init() {
	jobs := []struct {
		run   func()
		every time.Duration
	}{
		{ps.checkRequestedPeers, 5 * time.Second},
		{ps.checkPeers, 120 * time.Second},
		{ps.checkKnownHosts, 60 * time.Second},
		{ps.makeStatusBundle, 60 * time.Second},
	}

	// Register every ticker goroutine first, exactly as before...
	for _, job := range jobs {
		ps.addRoutine(job.run, job.every)
	}
	// ...then perform the initial pass of each job in the same order.
	for _, job := range jobs {
		job.run()
	}

	log.Println("Peer searcher process has started.")
}
// addRoutine ν¨μλ μ£ΌκΈ°μ μΈ μμ
μ μννλ κ³ λ£¨ν΄μ μμ±ν©λλ€.
func (ps *PeerSearcher) addRoutine(fn func(), interval time.Duration) {
go func() {
for range time.Tick(interval) {
fn()
}
}()
}
// checkRequestedPeers drains the pending peer requests from the pool client
// and merges every requested host that is public and not already a peer host
// into the tracker's known-host list (normalised through parser.Endpoint).
func (ps *PeerSearcher) checkRequestedPeers() {
	requested := poolclient.Instance().DrainPeerRequests()
	currentPeerHosts := tracker.GetPeerHosts()
	merged := tracker.GetKnownHosts()

	for _, candidate := range requested {
		if contains(currentPeerHosts, candidate) || !filter.IsPublicHost(candidate) {
			continue
		}
		merged = append(merged, parser.Endpoint(candidate))
	}

	tracker.SetKnownHosts(uniqueStrings(merged))
}
// checkPeers re-queries every current peer host in batches and replaces the
// tracker's peer list with the peers that answered and verified.
//
// Hosts advertised by those peers that are public and not peers themselves
// are merged into the tracker's known-host list.
func (ps *PeerSearcher) checkPeers() {
	log.Println("Check peers...")

	hosts := tracker.GetPeerHosts()

	// A non-positive CheckCount would never consume any host; fall back to a
	// single batch so the loop below always makes progress.
	batch := ps.CheckCount
	if batch < 1 {
		batch = len(hosts)
	}

	peers := []map[string]interface{}{}
	knownHosts := []string{}
	for len(hosts) > 0 {
		part := take(&hosts, batch)
		trackers := ps.seeTrackers(part, true)
		// Checked assertions: a nil or incomplete result contributes nothing
		// instead of panicking.
		if found, ok := trackers["peers"].([]map[string]interface{}); ok {
			peers = append(peers, found...)
		}
		if found, ok := trackers["known_hosts"].([]string); ok {
			knownHosts = uniqueStrings(append(knownHosts, found...))
		}
	}

	peerHosts := getHosts(peers)
	bucket := []string{}
	for _, knownHost := range knownHosts {
		if !contains(peerHosts, knownHost) && filter.IsPublicHost(knownHost) {
			bucket = append(bucket, parser.Endpoint(knownHost))
		}
	}
	knownHosts = uniqueStrings(append(bucket, tracker.GetKnownHosts()...))

	tracker.SetPeers(peers)
	tracker.SetKnownHosts(knownHosts)
}
// checkKnownHosts queries every known host in batches, promotes the ones that
// answered and verified to peers, and replaces the known-host list with the
// public hosts they advertised that are not peers.
func (ps *PeerSearcher) checkKnownHosts() {
	log.Println("Check requested hosts...")

	hosts := tracker.GetKnownHosts()

	// A non-positive CheckCount would never consume any host; fall back to a
	// single batch so the loop below always makes progress.
	batch := ps.CheckCount
	if batch < 1 {
		batch = len(hosts)
	}

	peers := []map[string]interface{}{}
	knownHosts := []string{}
	for len(hosts) > 0 {
		part := take(&hosts, batch)
		trackers := ps.seeTrackers(part, false)
		// Checked assertions: a nil or incomplete result contributes nothing
		// instead of panicking.
		if found, ok := trackers["peers"].([]map[string]interface{}); ok {
			peers = append(peers, found...)
		}
		if found, ok := trackers["known_hosts"].([]string); ok {
			knownHosts = uniqueStrings(append(knownHosts, found...))
		}
	}

	newPeers := tracker.GetPeers()
	peerHosts := getHosts(newPeers)
	for _, peer := range peers {
		host, hostOK := peer["host"].(string)
		address, addressOK := peer["address"].(string)
		execTime, execOK := peer["exec_time"].(int)
		if !hostOK || !addressOK || !execOK {
			// Skip entries that do not have the expected shape.
			continue
		}
		if !contains(peerHosts, host) && filter.IsPublicHost(host) {
			newPeers = append(newPeers, map[string]interface{}{
				"host":      parser.Endpoint(host),
				"address":   address,
				"exec_time": execTime,
			})
		}
	}
	tracker.SetPeers(newPeers)

	// Re-read the peer hosts so the freshly promoted peers are excluded from
	// the known-host list.
	peerHosts = tracker.GetPeerHosts()
	bucket := []string{}
	for _, knownHost := range knownHosts {
		if !contains(peerHosts, knownHost) && filter.IsPublicHost(knownHost) {
			bucket = append(bucket, parser.Endpoint(knownHost))
		}
	}
	tracker.SetKnownHosts(bucket)
}
// makeStatusBundle logs "Bundling..." and then calls chain.Bundling.
func (ps *PeerSearcher) makeStatusBundle() {
log.Println("Bundling...")
chain.Bundling()
}
// seeTrackers queries the given hosts and returns the peers that verified
// together with the hosts those peers advertise.
//
// Every host receives two POST requests: "<host>/peer" (see
// searchRequestItems) and "<host>/info" (see searchVersionItems). A host is
// reported as a peer only when its /info version is at least 2.1.9.0 and
// peerAddress accepts the node data of its /peer response.
//
// The result always has two keys: "peers" ([]map[string]interface{} with
// "host", "address" and "exec_time") and "known_hosts" ([]string). Both are
// empty, never nil, when hosts is empty or nothing verified. Malformed
// responses are skipped.
func (ps *PeerSearcher) seeTrackers(hosts []string, register bool) map[string]interface{} {
	const minVersion = "2.1.9.0"

	peers := []map[string]interface{}{}
	knownHosts := []string{}
	if len(hosts) == 0 {
		return map[string]interface{}{
			"peers":       peers,
			"known_hosts": knownHosts,
		}
	}

	endpoint := env.Endpoint()
	now := clock.UTime()
	height := max(chain.FixedHeight()-config.RESOURCE_CONFIRM_COUNT, 0)
	items := ps.searchRequestItems(hosts, endpoint, register, height, now)
	items2 := ps.searchVersionItems(hosts, endpoint)

	// The signed phrase is the confirmed resource block hash, or the network
	// key while no such block exists yet.
	var phrase string
	if height > 0 {
		phrase = resourcechain.Instance().Block(height).Blockhash
	} else {
		phrase = config.NetworkKey()
	}

	rs := restcall.Instance().MultiPOST(items)
	rs2 := restcall.Instance().MultiPOST(items2)

	// Hosts whose reported version is recent enough.
	latest := make(map[string]struct{}, len(rs2))
	for _, item := range rs2 {
		host, ok := item["host"].(string)
		if !ok {
			continue
		}
		raw, _ := item["result"].(string)
		data, _ := parseJSON(raw)["data"].(map[string]interface{})
		version, ok := data["version"].(string)
		if ok && trackerVersionAtLeast(version, minVersion) {
			latest[host] = struct{}{}
		}
	}

	for _, item := range rs {
		host, ok := item["host"].(string)
		if !ok {
			continue
		}
		if _, recent := latest[host]; !recent {
			continue
		}
		execTime, ok := trackerExecTime(item["exec_time"])
		if !ok {
			continue
		}
		raw, _ := item["result"].(string)
		data, _ := parseJSON(raw)["data"].(map[string]interface{})
		nodeData, ok := data["node"].(map[string]interface{})
		if !ok {
			continue
		}
		peerAddress := ps.peerAddress(nodeData, phrase, now)
		if peerAddress == "" {
			continue
		}

		peers = append(peers, map[string]interface{}{
			"host":      host,
			"address":   peerAddress,
			"exec_time": execTime,
		})
		knownHosts = uniqueStrings(append(knownHosts, trackerAdvertisedHosts(data["peers"])...))
	}

	return map[string]interface{}{
		"peers":       peers,
		"known_hosts": knownHosts,
	}
}

// trackerVersionAtLeast reports whether version is numerically greater than or
// equal to minimum. Both are dotted versions of up to four integer components;
// missing components count as zero and an unparsable version is never accepted.
func trackerVersionAtLeast(version, minimum string) bool {
	var have, want [4]int
	// Sscanf returns how many components it parsed; an error after the first
	// component just means the version has fewer than four parts.
	if n, _ := fmt.Sscanf(version, "%d.%d.%d.%d", &have[0], &have[1], &have[2], &have[3]); n == 0 {
		return false
	}
	if n, _ := fmt.Sscanf(minimum, "%d.%d.%d.%d", &want[0], &want[1], &want[2], &want[3]); n == 0 {
		return false
	}
	for i := range have {
		if have[i] != want[i] {
			return have[i] > want[i]
		}
	}
	return true
}

// trackerExecTime converts an "exec_time" value to int, accepting the integer
// and floating-point types such a value may be stored as.
func trackerExecTime(v interface{}) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int64:
		return int(n), true
	case float64:
		return int(n), true
	}
	return 0, false
}

// trackerAdvertisedHosts extracts the string "host" of every peer object in v.
// It accepts both []map[string]interface{} and the []interface{} that
// encoding/json produces for arrays; anything else yields no hosts.
func trackerAdvertisedHosts(v interface{}) []string {
	var hosts []string
	switch list := v.(type) {
	case []map[string]interface{}:
		for _, peer := range list {
			if host, ok := peer["host"].(string); ok {
				hosts = append(hosts, host)
			}
		}
	case []interface{}:
		for _, entry := range list {
			peer, ok := entry.(map[string]interface{})
			if !ok {
				continue
			}
			if host, ok := peer["host"].(string); ok {
				hosts = append(hosts, host)
			}
		}
	}
	return hosts
}
// searchRequestItems ν¨μλ μμ² μμ΄ν
μ μμ±ν©λλ€.
func (ps *PeerSearcher) searchRequestItems(hosts []string, endpoint string, register bool, height int, utime int) []map[string]interface{} {
items := []map[string]interface{}{}
for _, host := range hosts {
item := map[string]interface{}{
"url": fmt.Sprintf("%s/peer", host),
"data": map[string]interface{}{
"register": register,
"authentication": true,
"height": height,
"t": utime,
},
}
if endpoint != "" {
item["data"].(map[string]interface{})["host"] = endpoint
}
items = append(items, item)
}
return items
}
// searchVersionItems ν¨μλ λ²μ μμ΄ν
μ μμ±ν©λλ€.
func (ps *PeerSearcher) searchVersionItems(hosts []string, endpoint string) []map[string]interface{} {
items := []map[string]interface{}{}
for _, host := range hosts {
item := map[string]interface{}{
"url": fmt.Sprintf("%s/info", host),
"data": map[string]interface{}{},
}
if endpoint != "" {
item["data"].(map[string]interface{})["host"] = endpoint
}
items = append(items, item)
}
return items
}
// peerAddress validates the node data a remote host returned and, when it is
// acceptable, returns the address derived from the node's public key.
//
// nodeData must carry "timestamp" (number) and "genesis_address",
// "public_key" and "signature" (strings). The empty string is returned when a
// field is missing or mistyped, when the genesis address differs from
// config.GenesisAddress, when the timestamp is not within
// config.TIMESTAMP_ERROR_LIMIT of now, or when the signature over
// hash(hexTime(timestamp) + phrase) does not verify.
func (ps *PeerSearcher) peerAddress(nodeData map[string]interface{}, phrase string, now int) string {
	// nodeData comes from a remote host, so every field is checked rather than
	// asserted.
	rawTimestamp, ok := nodeData["timestamp"].(float64)
	if !ok {
		return ""
	}
	genesisAddress, ok := nodeData["genesis_address"].(string)
	if !ok {
		return ""
	}
	publicKey, ok := nodeData["public_key"].(string)
	if !ok {
		return ""
	}
	signature, ok := nodeData["signature"].(string)
	if !ok {
		return ""
	}
	timestamp := int(rawTimestamp)

	// Cheap checks first; the signature is only verified when they pass.
	if genesisAddress != config.GenesisAddress {
		return ""
	}
	if abs(now-timestamp) >= config.TIMESTAMP_ERROR_LIMIT {
		return ""
	}
	stringToHash := hasher.Hash(hasher.HexTime(timestamp) + phrase)
	if !signer.SignatureValidity(stringToHash, publicKey, signature) {
		return ""
	}
	return signer.Address(publicKey)
}
// take removes up to count elements from the front of *hosts and returns them.
//
// A count below zero is treated as zero, and a count larger than the slice
// takes everything. The returned batch has its capacity capped, so appending
// to it cannot overwrite the hosts that remain in *hosts.
func take(hosts *[]string, count int) []string {
	n := count
	if n < 0 {
		n = 0
	}
	if n > len(*hosts) {
		n = len(*hosts)
	}
	part := (*hosts)[:n:n]
	*hosts = (*hosts)[n:]
	return part
}
// contains reports whether item occurs anywhere in slice.
func contains(slice []string, item string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] != item {
			continue
		}
		return true
	}
	return false
}
// uniqueStrings returns the distinct strings of input in order of first
// occurrence. The result is a new, non-nil slice; input is not modified.
func uniqueStrings(input []string) []string {
	seen := make(map[string]struct{}, len(input))
	output := make([]string, 0, len(input))
	for _, str := range input {
		if _, dup := seen[str]; dup {
			continue
		}
		seen[str] = struct{}{}
		output = append(output, str)
	}
	return output
}
// parseJSON decodes jsonStr as a JSON object. It returns nil when the text is
// not valid JSON or does not decode into an object.
func parseJSON(jsonStr string) map[string]interface{} {
	decoded := map[string]interface{}(nil)
	err := json.Unmarshal([]byte(jsonStr), &decoded)
	if err == nil {
		return decoded
	}
	return nil
}
// getHosts returns the "host" value of every peer, in order. Peers whose
// "host" is missing or not a string are skipped. The result is never nil.
func getHosts(peers []map[string]interface{}) []string {
	hosts := make([]string, 0, len(peers))
	for _, peer := range peers {
		// Checked assertion: peer maps may come from remote responses.
		if host, ok := peer["host"].(string); ok {
			hosts = append(hosts, host)
		}
	}
	return hosts
}
// abs returns the absolute value of x.
func abs(x int) int {
	if x >= 0 {
		return x
	}
	return -x
}
// max returns the larger of a and b.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
// main ν¨μλ PeerSearcher μΈμ€ν΄μ€λ₯Ό μμ±νκ³ λ©μΈ μμ
μ μνν©λλ€.
func main() {
ps := NewPeerSearcher()
ps.Main()
}μ½λ μ€λͺ
// Code description:
// Key functions:
// Last updated