Observer.go

package staff

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"saseul/config"
	"saseul/data"
	"saseul/model"
	"saseul/rpc"
	"saseul/util"
)

// Observer 구조체는 네트워크 활동을 관찰하는 역할을 합니다.
type Observer struct {
	TimeWeight  int
	ResetCount  int
	mu          sync.Mutex
}

var instance *Observer
var once sync.Once

// Instance 함수는 Observer의 싱글톤 인스턴스를 반환합니다.
// 인스턴스가 존재하지 않으면 새로운 인스턴스를 생성합니다.
func Instance() *Observer {
	once.Do(func() {
		instance = &Observer{}
	})
	return instance
}

// SeeRounds 함수는 주어진 호스트 리스트를 통해 라운드를 조회합니다.
// 유효한 블록을 확인하고 라운드 정보를 반환합니다.
func (o *Observer) SeeRounds(hosts []string, lastBlock *model.MainBlock) map[string][]RoundInfo {
	if len(hosts) == 0 {
		return nil
	}

	util.RestCallInstance().SetTimeout(config.RoundTimeout + o.TimeWeight)

	data := url.Values{
		"height":     {fmt.Sprint(lastBlock.Height + 1)},
		"chain_type": {"main"},
		"t":          {fmt.Sprint(util.ClockUtime())},
	}

	rounds := make(map[string][]RoundInfo)
	results := util.RestCallInstance().MultiGet(appendQueryToHosts(hosts, data.Encode()))

	for _, item := range results {
		host := item.Host
		result := parseJSONResult(item.Result)

		if result == nil {
			o.addTimeWeight()
			continue
		}
		o.resetTimeWeight()

		block := new(model.MainBlock)
		parseResultData(result, "block", block)

		if block.Height == lastBlock.Height+1 && block.PreviousBlockHash == lastBlock.BlockHash && block.StructureValidity() {
			syncLimit := int(result["data"].(map[string]interface{})["sync_limit"].(float64))
			rounds[block.BlockHash] = append(rounds[block.BlockHash], RoundInfo{Host: host, SyncLimit: syncLimit})
		}
	}

	return rounds
}

// SeeMiningRounds 함수는 주어진 호스트 리스트를 통해 마이닝 라운드를 조회합니다.
// 유효한 리소스 블록을 확인하고 마이닝 라운드 정보를 반환합니다.
func (o *Observer) SeeMiningRounds(hosts []string, fixedHeight int) map[string][]MiningRoundInfo {
	if len(hosts) == 0 {
		return nil
	}

	util.RestCallInstance().SetTimeout(config.RoundTimeout + o.TimeWeight)
	data := url.Values{
		"height":     {fmt.Sprint(fixedHeight)},
		"chain_type": {"resource"},
		"t":          {fmt.Sprint(util.ClockUtime())},
	}

	miningRounds := make(map[string][]MiningRoundInfo)
	results := util.RestCallInstance().MultiGet(appendQueryToHosts(hosts, data.Encode()))

	for _, item := range results {
		host := item.Host
		result := parseJSONResult(item.Result)

		if result == nil {
			o.addTimeWeight()
			continue
		}
		o.resetTimeWeight()

		execTime := item.ExecTime
		block := new(model.ResourceBlock)
		parseResultData(result, "block", block)

		if block.Height >= fixedHeight && block.StructureValidity() {
			syncLimit := int(result["data"].(map[string]interface{})["sync_limit"].(float64))
			miningRounds[block.BlockHash] = append(miningRounds[block.BlockHash], MiningRoundInfo{Host: host, SyncLimit: syncLimit, ExecTime: execTime})
		}
	}

	return miningRounds
}

// SeeBroadcasts 함수는 주어진 호스트 리스트를 통해 브로드캐스트를 조회합니다.
// 유효한 데이터를 처리합니다.
func (o *Observer) SeeBroadcasts(hosts []string, lastBlock *model.MainBlock) {
	if len(hosts) == 0 {
		return
	}

	util.RestCallInstance().SetTimeout(config.DataTimeout + o.TimeWeight)

	now := util.ClockUtime()
	roundKey := lastBlock.BlockHash
	data := url.Values{
		"chain_type": {"main"},
		"round_key":  {roundKey},
		"t":          {fmt.Sprint(now)},
	}

	results := util.RestCallInstance().MultiGet(appendQueryToHosts(hosts, data.Encode()))

	bunchTxs := data.BunchListTx()
	bunchChunks := data.BunchListChunk(roundKey)
	bunchHypotheses := data.BunchListHypothesis(roundKey)

	for _, item := range results {
		result := parseJSONResult(item.Result)

		if result == nil {
			o.addTimeWeight()
			continue
		}
		o.resetTimeWeight()

		processTransactions(result["data"], lastBlock, bunchTxs)
		processChunks(result["data"], lastBlock, now, roundKey, bunchChunks)
		processHypotheses(result["data"], lastBlock, now, roundKey, bunchHypotheses, bunchChunks)
	}
}

// SeeResourceBroadcasts 함수는 주어진 호스트 리스트를 통해 리소스 브로드캐스트를 조회합니다.
// 유효한 리소스 데이터를 처리합니다.
func (o *Observer) SeeResourceBroadcasts(hosts []string, lastBlock *model.ResourceBlock) {
	if len(hosts) == 0 {
		return
	}

	util.RestCallInstance().SetTimeout(config.RoundTimeout + o.TimeWeight)

	roundKey := lastBlock.BlockHash
	data := url.Values{
		"chain_type": {"resource"},
		"round_key":  {roundKey},
		"t":          {fmt.Sprint(util.ClockUtime())},
	}

	results := util.RestCallInstance().MultiGet(appendQueryToHosts(hosts, data.Encode()))
	bunchReceipts := data.BunchListReceipt(roundKey)

	for _, item := range results {
		result := parseJSONResult(item.Result)

		if result == nil {
			o.addTimeWeight()
			continue
		}
		o.resetTimeWeight()

		processReceipts(result["data"], roundKey, bunchReceipts)
	}
}

// SeeBlocks 함수는 주어진 호스트 리스트를 통해 블록을 조회합니다.
// 유효한 블록을 확인하고 결정 정보를 반환합니다.
func (o *Observer) SeeBlocks(hosts []string, height int) []Decision {
	if len(hosts) == 0 {
		return nil
	}

	util.RestCallInstance().SetTimeout(config.DataTimeout + o.TimeWeight)

	data := rpc.FactoryRequest("GetBlocks", map[string]interface{}{
		"target": height,
		"full":   true,
		"t":      util.ClockUtime(),
	}, data.EnvPeer().PrivateKey()).JSON()

	items := makePostItems(hosts, data)
	results := util.RestCallInstance().MultiPost(items, []string{"Content-Type: application/json;"})

	var decisions []Decision
	for _, item := range results {
		host := item.Host
		result := parseJSONResult(item.Result)

		if result == nil {
			o.addTimeWeight()
			continue
		}
		o.resetTimeWeight()

		blocks := result["data"].([]interface{})
		decisions = append(decisions, Decision{Host: host, Blocks: blocks})
	}

	return decisions
}

// SeeResourceBlocks 함수는 주어진 호스트 리스트를 통해 리소스 블록을 조회합니다.
// 유효한 리소스 블록을 확인하고 결정 정보를 반환합니다.
func (o *Observer) SeeResourceBlocks(hosts []string, height int) []Decision {
	if len(hosts) == 0 {
		return nil
	}

	util.RestCallInstance().SetTimeout(config.DataTimeout + o.TimeWeight)

	data := rpc.FactoryRequest("GetResourceBlocks", map[string]interface{}{
		"target": height,
		"full":   true,
		"t":      util.ClockUtime(),
	}, data.EnvPeer().PrivateKey()).JSON()

	items := makePostItems(hosts, data)
	results := util.RestCallInstance().MultiPost(items, []string{"Content-Type: application/json;"})

	var decisions []Decision
	for _, item := range results {
		host := item.Host
		result := parseJSONResult(item.Result)

		if result == nil {
			o.addTimeWeight()
			continue
		}
		o.resetTimeWeight()

		blocks := result["data"].([]interface{})
		decisions = append(decisions, Decision{Host: host, Blocks: blocks})
	}

	return decisions
}

// addTimeWeight 함수는 타임 가중치를 증가시킵니다.
// 최대 값은 3으로 설정됩니다.
func (o *Observer) addTimeWeight() {
	o.mu.Lock()
	defer o.mu.Unlock()

	o.TimeWeight++
	if o.TimeWeight > 3 {
		o.TimeWeight = 3
	}
}

// resetTimeWeight 함수는 타임 가중치를 초기화합니다.
// 특정 조건을 충족하면 가중치를 0으로 설정합니다.
func (o *Observer) resetTimeWeight() {
	o.mu.Lock()
	defer o.mu.Unlock()

	if o.ResetCount > 50 {
		o.ResetCount = 0
		o.TimeWeight = 0
	} else {
		o.ResetCount++
	}
}

// appendQueryToHosts 함수는 호스트 리스트에 쿼리 문자열을 추가합니다.
func appendQueryToHosts(hosts []string, query string) []string {
	for i, host := range hosts {
		hosts[i] = host + "?" + query
	}
	return hosts
}

// parseJSONResult 함수는 JSON 문자열을 파싱하여 결과를 반환합니다.
func parseJSONResult(result string) map[string]interface{} {
	var parsed map[string]interface{}
	if err := json.Unmarshal([]byte(result), &parsed); err != nil {
		return nil
	}
	return parsed
}

// parseResultData 함수는 결과 데이터에서 특정 키의 값을 파싱하여 타겟에 설정합니다.
func parseResultData(result map[string]interface{}, key string, target interface{}) {
	if data, ok := result["data"].(map[string]interface{}); ok {
		if blockData, ok := data[key]; ok {
			bytes, _ := json.Marshal(blockData)
			json.Unmarshal(bytes, target)
		}
	}
}

// processTransactions 함수는 트랜잭션 데이터를 처리합니다.
func processTransactions(data interface{}, lastBlock *model.MainBlock, bunchTxs map[string]interface{}) {
	// 트랜잭션 처리 로직 구현
}

// processChunks 함수는 청크 데이터를 처리합니다.
func processChunks(data interface{}, lastBlock *model.MainBlock, now int, roundKey string, bunchChunks map[string]interface{}) {
	// 청크 처리 로직 구현
}

// processHypotheses 함수는 가설 데이터를 처리합니다.
func processHypotheses(data interface{}, lastBlock *model.MainBlock, now int, roundKey string, bunchHypotheses, bunchChunks map[string]interface{}) {
	// 가설 처리 로직 구현
}

// processReceipts 함수는 영수증 데이터를 처리합니다.
func processReceipts(data interface{}, roundKey string, bunchReceipts map[string]interface{}) {
	// 영수증 처리 로직 구현
}

// makePostItems 함수는 호스트 리스트와 데이터를 사용하여 POST 아이템을 생성합니다.
func makePostItems(hosts []string, data string) []util.PostItem {
	items := make([]util.PostItem, len(hosts))
	for i, host := range hosts {
		items[i] = util.PostItem{
			URL:  fmt.Sprintf("%s/rawrequest", host),
			Data: data,
		}
	}
	return items
}

// 추가적으로 필요한 구조체 정의

// RoundInfo 구조체는 라운드 정보를 담고 있습니다.
type RoundInfo struct {
	Host      string
	SyncLimit int
}

// MiningRoundInfo 구조체는 마이닝 라운드 정보를 담고 있습니다.
type MiningRoundInfo struct {
	Host      string
	SyncLimit int
	ExecTime  int64
}

// Decision 구조체는 결정 정보를 담고 있습니다.
type Decision struct {
	Host   string
	Blocks []interface{}
}

Observer 클래스 설명

Observer 클래스는 네트워크 활동을 관찰하고, 주어진 호스트에서 블록 및 데이터를 수집하여 처리하는 역할을 합니다. 이를 통해 네트워크 상태를 모니터링하고, 필요한 데이터를 동기화합니다.

코드 설명

구조체와 변수

  • Observer

    • TimeWeight: 시간 가중치로, 네트워크 응답 지연에 따른 가중치 값을 저장합니다.

    • ResetCount: 시간 가중치 초기화 카운트로, 일정 횟수 이상 초기화 조건이 충족되면 시간 가중치를 초기화합니다.

    • mu: sync.Mutex 객체로, 동기화를 위해 사용됩니다.

  • Instance

    • instanceonce 변수는 싱글톤 패턴을 구현하기 위해 사용됩니다.

함수 설명

  • Instance()

    • Observer 클래스의 싱글톤 인스턴스를 반환합니다. 인스턴스가 존재하지 않을 경우 새로 생성합니다.

  • *SeeRounds(hosts []string, lastBlock model.MainBlock)

    • 주어진 호스트 리스트에서 블록 라운드를 조회하고 유효한 블록을 확인하여 라운드 정보를 반환합니다.

  • SeeMiningRounds(hosts []string, fixedHeight int)

    • 주어진 호스트 리스트에서 마이닝 라운드를 조회하고 유효한 리소스 블록을 확인하여 마이닝 라운드 정보를 반환합니다.

  • *SeeBroadcasts(hosts []string, lastBlock model.MainBlock)

    • 주어진 호스트 리스트에서 브로드캐스트를 조회하고 유효한 데이터를 처리합니다.

  • *SeeResourceBroadcasts(hosts []string, lastBlock model.ResourceBlock)

    • 주어진 호스트 리스트에서 리소스 브로드캐스트를 조회하고 유효한 리소스 데이터를 처리합니다.

  • SeeBlocks(hosts []string, height int)

    • 주어진 호스트 리스트에서 블록을 조회하고 유효한 블록을 확인하여 결정 정보를 반환합니다.

  • SeeResourceBlocks(hosts []string, height int)

    • 주어진 호스트 리스트에서 리소스 블록을 조회하고 유효한 리소스 블록을 확인하여 결정 정보를 반환합니다.

  • addTimeWeight()

    • 시간 가중치를 증가시킵니다. 최대 값은 3으로 설정됩니다.

  • resetTimeWeight()

    • 시간 가중치를 초기화합니다. 특정 조건을 충족하면 가중치를 0으로 설정합니다.

  • appendQueryToHosts(hosts []string, query string)

    • 호스트 리스트에 쿼리 문자열을 추가합니다.

  • parseJSONResult(result string)

    • JSON 문자열을 파싱하여 결과를 반환합니다.

  • parseResultData(result map[string]interface{}, key string, target interface{})

    • 결과 데이터에서 특정 키의 값을 파싱하여 타겟에 설정합니다.

  • *processTransactions(data interface{}, lastBlock model.MainBlock, bunchTxs map[string]interface{})

    • 트랜잭션 데이터를 처리합니다.

  • *processChunks(data interface{}, lastBlock model.MainBlock, now int, roundKey string, bunchChunks map[string]interface{})

    • 청크 데이터를 처리합니다.

  • *processHypotheses(data interface{}, lastBlock model.MainBlock, now int, roundKey string, bunchHypotheses, bunchChunks map[string]interface{})

    • 가설 데이터를 처리합니다.

  • processReceipts(data interface{}, roundKey string, bunchReceipts map[string]interface{})

    • 영수증 데이터를 처리합니다.

  • makePostItems(hosts []string, data string)

    • 호스트 리스트와 데이터를 사용하여 POST 아이템을 생성합니다.

추가 구조체 설명

  • RoundInfo

    • Host: 호스트 주소

    • SyncLimit: 동기화 제한

  • MiningRoundInfo

    • Host: 호스트 주소

    • SyncLimit: 동기화 제한

    • ExecTime: 실행 시간

  • Decision

    • Host: 호스트 주소

    • Blocks: 블록 리스트

이 코드는 다양한 네트워크 활동을 모니터링하고 데이터를 수집하여 처리하는 데 유용하게 사용될 수 있습니다.

코드 분석

  1. Observer 구조체:

    • Observer 구조체는 피어 네트워크에서 라운드, 블록, 브로드캐스트 등을 관찰하는 역할을 합니다.

    • TimeWeightResetCount는 시간 가중치와 초기화 횟수를 추적합니다.

    • mu는 동시성 제어를 위한 뮤텍스입니다.

  2. 싱글톤 인스턴스:

    • instanceonce를 사용하여 Observer의 싱글톤 인스턴스를 관리합니다.

    • Instance 함수는 Observer의 싱글톤 인스턴스를 반환합니다. 인스턴스가 없으면 새로운 인스턴스를 생성합니다.

  3. SeeRounds 함수:

    • 호스트에서 라운드를 조회하고 라운드 정보를 반환합니다.

    • lastBlock의 다음 블록 높이를 기준으로 라운드를 조회합니다.

    • 조회된 결과를 RoundInfo 구조체에 저장합니다.

  4. SeeMiningRounds 함수:

    • 호스트에서 마이닝 라운드를 조회하고 마이닝 라운드 정보를 반환합니다.

    • fixedHeight를 기준으로 라운드를 조회합니다.

    • 조회된 결과를 MiningRoundInfo 구조체에 저장합니다.

  5. SeeBroadcasts 함수:

    • 호스트에서 브로드캐스트를 조회합니다.

    • lastBlock의 라운드 키를 사용하여 브로드캐스트 데이터를 조회합니다.

    • 조회된 결과를 처리하여 트랜잭션, 청크, 가설을 처리합니다.

  6. SeeResourceBroadcasts 함수:

    • 호스트에서 리소스 브로드캐스트를 조회합니다.

    • lastBlock의 라운드 키를 사용하여 리소스 브로드캐스트 데이터를 조회합니다.

    • 조회된 결과를 처리하여 영수증을 처리합니다.

  7. SeeBlocks 함수:

    • 호스트에서 블록을 조회하고 결정을 반환합니다.

    • height를 기준으로 블록을 조회합니다.

    • 조회된 결과를 Decision 구조체에 저장합니다.

  8. SeeResourceBlocks 함수:

    • 호스트에서 리소스 블록을 조회하고 결정을 반환합니다.

    • height를 기준으로 리소스 블록을 조회합니다.

    • 조회된 결과를 Decision 구조체에 저장합니다.

  9. addTimeWeight 함수:

    • 실패 시 TimeWeight를 증가시킵니다. 최대 3까지 증가합니다.

  10. resetTimeWeight 함수:

    • 성공 시 TimeWeight를 초기화합니다. ResetCount가 50을 초과하면 TimeWeight를 0으로 설정합니다.

  11. appendQueryToHosts 함수:

    • 호스트 URL에 쿼리 문자열을 추가합니다.

  12. parseJSONResult 함수:

    • JSON 문자열을 맵으로 파싱합니다.

  13. parseResultData 함수:

    • 결과 데이터에서 특정 키의 데이터를 파싱합니다.

  14. processTransactions 함수:

    • 트랜잭션을 처리합니다.

  15. processChunks 함수:

    • 청크를 처리합니다.

  16. processHypotheses 함수:

    • 가설을 처리합니다.

  17. processReceipts 함수:

    • 영수증을 처리합니다.

  18. makePostItems 함수:

    • POST 요청 항목을 생성합니다.

Last updated