From 729e7ac04f135f3b81c27ae61e69f58908bad01f Mon Sep 17 00:00:00 2001 From: *** <***@example.com> Date: Wed, 21 Apr 2021 09:06:03 +0000 Subject: [PATCH 2/2] Use run number, Unmarshal block json to actual slices --- btc_manager.go | 65 ++++++---- mine.go | 20 ++- miner.go | 339 ++++++++++++++++++++++++++++--------------------- 3 files changed, 242 insertions(+), 182 deletions(-) diff --git a/btc_manager.go b/btc_manager.go index 25b74ea..5e9e599 100644 --- a/btc_manager.go +++ b/btc_manager.go @@ -1,46 +1,73 @@ package main - import ( "encoding/json" + "errors" "fmt" "io/ioutil" "log" "net/http" "strings" "time" - "errors" ) - var btc_is_connected = false - func set_connected(onoff bool) { if btc_is_connected != onoff { btc_is_connected = onoff switch onoff { case true: fmt.Println("Connected to BTC") - case false : + case false: fmt.Println("Disconnecting from BTC...") miner_command_channel <- "shutdown" - resp := <- miner_output_channel + resp := <-miner_output_channel if resp != "confirmed" { log.Println("Unexpected response from BTC disconnect miner shutdown: ", resp) } fmt.Println("Disconnected from BTC") } - } + } +} + +type BlockResult struct { + Result Block `json:"result"` +} + +func make_bitcoin_block_call(client *http.Client, method, params string) (*Block, error) { + var bytes, err = make_bitcoin_internal_call(client, method, params) + + var result BlockResult + + err = json.Unmarshal(bytes, &result) + if err != nil { + return nil, errors.New("bad_json") + } + + return &result.Result, nil + } -func make_bitcoin_call(client *http.Client, method, params string) (interface{}, error) { +func make_bitcoin_other_call(client *http.Client, method, params string) (interface{}, error) { + var bytes, err = make_bitcoin_internal_call(client, method, params) + + var result map[string]interface{} + + err = json.Unmarshal(bytes, &result) + if err != nil { + return nil, errors.New("bad_json") + } + return result["result"], nil +} + +func make_bitcoin_internal_call(client *http.Client, method, params string) ([]byte, error) { port := "8332" - if u_config.regtest{ + if u_config.regtest { port = "18443" } - body := strings.NewReader("{\"jsonrpc\":\"1.0\",\"id\":\"curltext\",\"method\":\""+method+"\",\"params\":["+params+"]}") + body := strings.NewReader("{\"jsonrpc\":\"1.0\",\"id\":\"curltext\",\"method\":\"" + method + "\",\"params\":[" + params + "]}") req, err := http.NewRequest("POST", "http://"+u_config.username+":"+u_config.password+"@127.0.0.1:"+port, body) if err != nil { @@ -55,7 +82,7 @@ func make_bitcoin_call(client *http.Client, method, params string) (interface{}, } defer resp.Body.Close() - resp_bytes, err := ioutil.ReadAll(resp.Body) + resp_bytes, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal("phone btc ERROR 3", err) @@ -67,30 +94,22 @@ func make_bitcoin_call(client *http.Client, method, params string) (interface{}, return nil, errors.New("bad_log_info") } - var result map[string]interface{} - - err = json.Unmarshal(resp_bytes, &result) - if err != nil { - return nil, errors.New("bad_json") - } - set_connected(true) - return result["result"], nil + return resp_bytes, nil } - func ping_btc_loop() { // This pings BTC every X seconds while not connected. Once receiving a successful response, it sets connected from within the make_bitcoin_call func. for { if !btc_is_connected { - _, err := make_bitcoin_call(http.DefaultClient, "getbestblockhash", "") + _, err := make_bitcoin_other_call(http.DefaultClient, "getbestblockhash", "") if err != nil && err.Error() == "bad_log_info" { return } } - time.Sleep(1*time.Second) + time.Sleep(1 * time.Second) } -} \ No newline at end of file +} diff --git a/miner.go b/miner.go index 4bdbbb1..5ecbf54 100644 --- a/miner.go +++ b/miner.go @@ -1,4 +1,4 @@ -package main +package main import ( //"bytes" @@ -7,18 +7,18 @@ import ( "net/http" //"os" //"strconv" + "encoding/hex" + "encoding/json" + "errors" "strings" "sync" "time" - "errors" ) - var active_miner int var miner_command_channel chan string var miner_output_channel chan string - // ~~~DATA TYPES~~~ // Mining run config @@ -29,7 +29,7 @@ type MiningConfig struct { password string // Start and finish - start_height int + start_height int target_height int // Mine/Unmine @@ -40,21 +40,20 @@ type MiningConfig struct { // DB Path mined_db_path string - } - // Output by callers type Call_Output struct { - height int - hash string - content interface{} + height int + hash string + content interface{} + block_content *Block } // Read_Block -type Read_Block struct{ - height int - hash string +type Read_Block struct { + height int + hash string content [][4]string } @@ -71,7 +70,7 @@ func make_client() *http.Client { } func check_reorg(client *http.Client, hc_height int, u_config UserConfig) (bool, error) { - + hc_hash, err := commitsdb.Get([]byte("99999999"+fmt.Sprintf("%08d", hc_height)), nil) if err != nil { // If key can't be found, return true. Gross. @@ -81,8 +80,7 @@ func check_reorg(client *http.Client, hc_height int, u_config UserConfig) (bool, log.Fatal("DB GET ERROR", err) } - - result, err := make_bitcoin_call(client, "getblockhash", fmt.Sprint(hc_height)) + result, err := make_bitcoin_other_call(client, "getblockhash", fmt.Sprint(hc_height)) if err != nil { return false, err } @@ -92,7 +90,7 @@ func check_reorg(client *http.Client, hc_height int, u_config UserConfig) (bool, if string(hc_hash) == btc_hash { fmt.Println("NO REORG AT HEIGHT", fmt.Sprint(hc_height), ":", string(hc_hash), btc_hash) return false, nil - } + } fmt.Println("REORG AT HEIGHT", fmt.Sprint(hc_height), ":", string(hc_hash), btc_hash) return true, nil @@ -118,7 +116,7 @@ func find_reorg(client *http.Client, hc_height int, u_config UserConfig) (int, e log.Fatal("DB GET ERROR", err) } - result, err := make_bitcoin_call(client, "getblockhash", fmt.Sprint(hc_height)) + result, err := make_bitcoin_other_call(client, "getblockhash", fmt.Sprint(hc_height)) if err != nil { return 0, err } @@ -131,14 +129,13 @@ func find_reorg(client *http.Client, hc_height int, u_config UserConfig) (int, e return hc_height, nil } - // ~~~COUNTER~~~ type Counter struct { sync.Mutex - s int // start - h int // height - t int // target + s int // start + h int // height + t int // target dir int // direction (mine: 1, unmine: -1) } @@ -162,15 +159,16 @@ func (c *Counter) check() int { x := c.h c.Unlock() return x -} - +} // ~~~INDEX~~~ type Index struct { - // StopStart Marker. Routines check this; if false, stop. Only main routine modifies - run bool + // StopStart Marker. Routines check this; if changed, stop. Only main routine modifies + // To prevent races, every reader / writer needs to hold mutex to change the integer + run_counter_mutex sync.RWMutex + run_counter int // Channel to the miner, reffed by readers to_miner chan Read_Block @@ -186,10 +184,20 @@ type Index struct { direction int } +func (i *Index) ForceExit() { + i.run_counter_mutex.Lock() + i.run_counter++ + i.run_counter_mutex.Unlock() +} + // ~~~CALLER~~~ type Caller struct { + // run number. This number must be equal to the one in index + // if the one in index increased, that means all Callers needs to exit + run_counter int + index *Index // Log info @@ -213,9 +221,18 @@ type Caller struct { // Miner Emergency Channel e_chan chan int - } +func (c *Caller) NeedsExit() bool { + // If main_run_counter is changed, stop running + c.index.run_counter_mutex.RLock() + main_run_counter := c.index.run_counter + c.index.run_counter_mutex.RUnlock() + if c.run_counter != main_run_counter { + return true + } + return false +} func make_caller(in_user, in_pass string, e_chan chan int, index *Index) *Caller { in_http_client := &http.Client{ @@ -225,31 +242,29 @@ func make_caller(in_user, in_pass string, e_chan chan int, index *Index) *Caller Timeout: 300 * time.Second, } caller := &Caller{ - index: index, - user: in_user, - pass: in_pass, + index: index, + user: in_user, + pass: in_pass, http_client: in_http_client, - e_chan: e_chan, + e_chan: e_chan, } return caller } - - -func (c Caller) get_block_info_for_height(height int) (map[string]interface{}, string, error) { +func (c Caller) get_block_info_for_height(height int) (*Block, string, error) { var hash string for { - // Only return an error when loop starts and run is false - if !c.index.run { + // Only return an error when loop starts and run is false + if c.NeedsExit() { return nil, "", errors.New("run is false") } if c.index.direction == 1 { // If MINE // Get hash and remove \n - result, err := make_bitcoin_call(c.http_client, "getblockhash", fmt.Sprint(height)) + result, err := make_bitcoin_other_call(c.http_client, "getblockhash", fmt.Sprint(height)) if err != nil { continue } @@ -280,40 +295,38 @@ func (c Caller) get_block_info_for_height(height int) (map[string]interface{}, s hash = string(hc_hash) } - // Get Block - block_info, err := make_bitcoin_call(c.http_client, "getblock", "\""+hash+"\", "+"2") + // Get Block + block_info, err := make_bitcoin_block_call(c.http_client, "getblock", "\""+hash+"\", "+"2") if err != nil { continue } - - return block_info.(map[string]interface{}), hash, nil + + return block_info, hash, nil } } - -func (c Caller) run() { +func (c *Caller) run() { // Outer loop runs forever, inner loop only runs if set active // Cycle counter var x int for { - // If run is off, stop running - if !c.index.run { + if c.NeedsExit() { return } // Check for emergency calls from the miner select { - case height := <- c.e_chan: + case height := <-c.e_chan: //Run the emergency call out, hash, err := c.get_block_info_for_height(height) if err != nil { return } - co := &Call_Output{height: height, hash: hash, content: out} - go read_block(c.index, co) + co := &Call_Output{height: height, hash: hash, block_content: out} + go c.read_block(c.index, co) default: } @@ -321,21 +334,21 @@ func (c Caller) run() { // Every 200 cycles check if ahead by 400, if so, wait until caught up to 200 if x >= 200 { x = 0 - if c.index.call_counter.check() > c.index.mine_counter.check() + 400*c.index.direction { - for c.index.call_counter.check() > c.index.mine_counter.check() + 200*c.index.direction{ + if c.index.call_counter.check() > c.index.mine_counter.check()+400*c.index.direction { + for c.index.call_counter.check() > c.index.mine_counter.check()+200*c.index.direction { // Check again for emergency calls select { - case height := <- c.e_chan: + case height := <-c.e_chan: //Run the emergency call out, hash, err := c.get_block_info_for_height(height) if err != nil { return } - co := &Call_Output{height: height, hash: hash, content: out} - go read_block(c.index, co) - + co := &Call_Output{height: height, hash: hash, block_content: out} + go c.read_block(c.index, co) + default: } @@ -358,8 +371,8 @@ func (c Caller) run() { if err != nil { return } - co := &Call_Output{height: c.current_pull, hash: hash, content: out} - go read_block(c.index, co) + co := &Call_Output{height: c.current_pull, hash: hash, block_content: out} + go c.read_block(c.index, co) x++ } @@ -367,26 +380,25 @@ func (c Caller) run() { // ~~~READER~~~ -func read_block(index *Index, input *Call_Output) { +func (c *Caller) read_block(index *Index, input *Call_Output) { fmt.Println("READ FOR ", input.height) block_output := Read_Block{ height: input.height, } - + // Read the block // This should probably be changed so that it doesn't return a completely new Read_Block, and just modifies the current. - if input != nil { - block_output = p_get_all_P2WSH(input.height, input.content.(map[string]interface{}), index) + if input != nil && input.block_content != nil { + block_output = p_get_all_P2WSH(input.height, input.block_content, index) } - block_output.hash = input.hash - + // Wait your turn, then mine for { // If run is off, stop running - if !index.run { + if c.NeedsExit() { return } @@ -398,7 +410,7 @@ func read_block(index *Index, input *Call_Output) { switch { case ch*dir < block_output.height*dir: // Wait - time.Sleep(5*time.Millisecond) + time.Sleep(5 * time.Millisecond) continue case ch*dir == block_output.height*dir: // My turn @@ -412,78 +424,114 @@ func read_block(index *Index, input *Call_Output) { } } -func p_get_all_P2WSH(height int, block_json map[string]interface{}, index *Index) Read_Block { +type HexKey [32]byte + +// MarshalJSON serializes ByteArray to hex +func (s *HexKey) UnmarshalJSON(data []byte) error { + var x string + err := json.Unmarshal(data, &x) + if err == nil { + str, e := hex.DecodeString(x) + copy(s[:], []byte(str)) + err = e + } + return err +} + +type SPubKey struct { + Type string `json:"type"` + Hex HexKey `json:"hex"` +} +type Output struct { + ScriptPubKey SPubKey `json:"scriptPubKey"` +} +type Transaction struct { + Vout []Output `json:"vout"` +} + +type Block struct { + Tx []Transaction `json:"tx"` +} +func p_get_all_P2WSH(height int, block_json *Block, index *Index) Read_Block { add_array := [][4]string{} - txes := block_json["tx"].([]interface{}) + txes := block_json.Tx // For each TX... for x := range txes { // ...Check all outputs for P2WSH - tx_info := txes[x].(map[string]interface{}) - vout := tx_info["vout"].([]interface{}) + vout := txes[x].Vout for i := range vout { - this_out := vout[i].(map[string]interface{}) - - // If it has a scriptPubKey - if this_out["scriptPubKey"] != nil { - scriptPubKey := this_out["scriptPubKey"].(map[string]interface{}) - - // If it has type - if scriptPubKey["type"] != nil { - my_type := scriptPubKey["type"].(string) - - // If type is "witness_v0_scripthash" - if my_type == "witness_v0_scripthash" { - - // Pull the hex - if scriptPubKey["hex"] != nil { - - // Setup the storage height - s_height := height - if index.direction == -1 { - s_height += 50000000 - } - hex := fmt.Sprintf("%v", scriptPubKey["hex"]) - ro := [4]string{ - strings.ToUpper(hex[4:]), - fmt.Sprintf("%08d", s_height), - fmt.Sprintf("%04d", x), - fmt.Sprintf("%04d", i), - } - add_array = append(add_array, ro) - } + this_out := vout[i] + + scriptPubKey := this_out.ScriptPubKey + + // If it has type + if scriptPubKey.Type != "" { + my_type := scriptPubKey.Type + + // If type is "witness_v0_scripthash" + if my_type == "witness_v0_scripthash" { + + // Setup the storage height + s_height := height + if index.direction == -1 { + s_height += 50000000 } + hex := fmt.Sprintf("%X", scriptPubKey.Hex) + ro := [4]string{ + hex, + fmt.Sprintf("%08d", s_height), + fmt.Sprintf("%04d", x), + fmt.Sprintf("%04d", i), + } + add_array = append(add_array, ro) } } + } - } + } output := Read_Block{ - height: height, + height: height, content: add_array, } + return output } - // ~~~MINER~~~ type Miner struct { - index *Index - in_chan chan Read_Block - e_chan chan int + // run number. This number must be equal to the one in index + // if the one in index increased, that means all Callers needs to exit + run_counter int + + index *Index + in_chan chan Read_Block + e_chan chan int out_chan chan string } -func (m Miner) push(input Read_Block) { +func (m *Miner) NeedsExit() bool { + // If main_run_counter is changed, stop running + m.index.run_counter_mutex.RLock() + main_run_counter := m.index.run_counter + m.index.run_counter_mutex.RUnlock() + if m.run_counter != main_run_counter { + return true + } + return false +} + +func (m *Miner) push(input Read_Block) { content := input.content // Format hash key - hash_key := "99999999"+fmt.Sprintf("%08d", input.height) + hash_key := "99999999" + fmt.Sprintf("%08d", input.height) // If unmining, erase the hash first, just in case. if m.index.direction == -1 { @@ -495,7 +543,6 @@ func (m Miner) push(input Read_Block) { } } - // Mine the commits for i := range content { miner_mine_commit_pulled(content[i][0], content[i][1]+content[i][2]+content[i][3]) @@ -505,7 +552,6 @@ func (m Miner) push(input Read_Block) { // Flush miner_mine_commit_pulled("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", "9999999999999999") - // If mining, store the hash last, just in case. if m.index.direction == 1 { err := commitsdb.Put([]byte(hash_key), []byte(input.hash), nil) @@ -514,35 +560,34 @@ func (m Miner) push(input Read_Block) { } } - fmt.Println("MINED:", m.index.mine_counter.h) // Increment counter m.index.mine_counter.tick() - + } -func (m Miner) run() { +func (m *Miner) run() { // Emergency counter x := time.Now() for { // If run is off, stop running - if !m.index.run { + if m.NeedsExit() { m.out_chan <- "Miner has shutdown" return } - + select { case inc := <-m.in_chan: // Mine the block m.push(inc) - + // Reset emergency counter x = time.Now() default: - if time.Since(x) > 30*time.Second{ + if time.Since(x) > 30*time.Second { fmt.Println("EMERGENCY PULL", m.index.mine_counter.check()) // Make an emergency pull request for the current height m.e_chan <- m.index.mine_counter.check() @@ -552,21 +597,20 @@ func (m Miner) run() { } } - func mine(config MiningConfig) string { fmt.Println("START MINING from/to/dir: ", config.start_height, config.target_height, config.direction) // Make the counters call_counter := &Counter{ - s: config.start_height, - h: config.start_height, - t: config.target_height, + s: config.start_height, + h: config.start_height, + t: config.target_height, dir: config.direction, } mine_counter := &Counter{ - s: config.start_height, - h: config.start_height, - t: config.target_height, + s: config.start_height, + h: config.start_height, + t: config.target_height, dir: config.direction, } @@ -574,17 +618,16 @@ func mine(config MiningConfig) string { index := &Index{ call_counter: call_counter, mine_counter: mine_counter, - to_miner: make(chan Read_Block), - run: true, - regtest: config.regtest, - direction: config.direction, + to_miner: make(chan Read_Block), + regtest: config.regtest, + direction: config.direction, } // Make the miner miner := &Miner{ - index: index, - in_chan: index.to_miner, - e_chan: make(chan int), + index: index, + in_chan: index.to_miner, + e_chan: make(chan int), out_chan: make(chan string), } go miner.run() @@ -592,7 +635,7 @@ func mine(config MiningConfig) string { // Make callers // 6 is just what worked beest for me, in the future it can be built to determine what's fastest I think. callers := []*Caller{} - for x:= 1; x <= 6; x++ { + for x := 1; x <= 6; x++ { callers = append(callers, make_caller(config.username, config.password, miner.e_chan, index)) } @@ -604,9 +647,11 @@ func mine(config MiningConfig) string { time_start := time.Now().UnixNano() for { index.mine_counter.Lock() - if index.mine_counter.h == index.mine_counter.t+index.mine_counter.dir{ + if index.mine_counter.h == index.mine_counter.t+index.mine_counter.dir { index.mine_counter.Unlock() - index.run = false + + index.ForceExit() + break } index.mine_counter.Unlock() @@ -617,7 +662,9 @@ func mine(config MiningConfig) string { log.Println("miner command: ", cmd) switch cmd { case "shutdown": - index.run = false + + index.ForceExit() + log.Println("Miner shutdown initiated") resp := <-miner.out_chan log.Println(resp) @@ -625,16 +672,14 @@ func mine(config MiningConfig) string { return "interrupt" } default: - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) } } fmt.Println("DONE MINING to: ", index.mine_counter.t) - log.Println("DONE MINING; TIME:", float64(time.Now().UnixNano() - time_start)/float64(1000000000), "seconds") + log.Println("DONE MINING; TIME:", float64(time.Now().UnixNano()-time_start)/float64(1000000000), "seconds") return "completed" -} - - +} //-------------------------------------------------------- @@ -691,7 +736,7 @@ func miner_start() { } // Pull the current BTC height - result, err := make_bitcoin_call(http_client, "getblockcount", "") + result, err := make_bitcoin_other_call(http_client, "getblockcount", "") // If error, go to next cycle if err != nil || result == nil { @@ -699,7 +744,7 @@ func miner_start() { } target_height := int(result.(float64)) - + // If caught up, skip this cycle // This should really also be comparing the block hashes, but this works for now. // !!! - Reorgs that reorg to the same height won't be caught until the next block comes in. - !!! @@ -708,7 +753,7 @@ func miner_start() { } fmt.Println("CURR: ", curr_height, ", TARGET: ", target_height) - + // Set reorg check lowest lowest := 481824 if u_config.regtest { @@ -743,12 +788,12 @@ func miner_start() { } mConfig := MiningConfig{ - username: u_config.username, - password: u_config.password, - start_height: curr_height, + username: u_config.username, + password: u_config.password, + start_height: curr_height, target_height: target_height, - direction: mine_dir, - regtest: u_config.regtest, + direction: mine_dir, + regtest: u_config.regtest, } result = mine(mConfig) @@ -760,4 +805,4 @@ func miner_start() { } -} \ No newline at end of file +} -- 2.25.1