HoneyComb - Processor Update

in #dev · 2 years ago

Brain Melting Code Review

Novemblog 12

Explosion ~ Expansion

HoneyComb - block processor

This is... well. A block processor. It gets blocks from an API and matches any transactions to smart contracts.
There is a fair amount going on here... and I guess it's best to break it into a few pieces.

const fetch = require("node-fetch");
const { TXID } = require("./index");
/**
 * HoneyComb block processor factory.
 *
 * Fetches Hive blocks (single-block calls, block_api.get_block_range calls,
 * or the client's block stream), buffers them, and feeds each block's
 * operations, in order, to the registered contract callbacks.
 *
 * @param client - API client; this module uses client.database,
 *   client.blockchain.getBlockStream() and client.currentAddress
 *   (presumably a dhive Client — confirm against the caller).
 * @param nextBlock - first block number to process (default 1).
 * @param prefix - prepended to custom_json ids registered via `on()`, and
 *   used in the User-Agent header of raw API calls.
 * @param account - account name reported in the User-Agent header.
 * @param vOpsRequired - accepted but not read anywhere in the visible code.
 * @returns an object with registration methods (on, onOperation, onNoPrefix,
 *   onBlock, onStreamingStart) and control methods (start, stop,
 *   getCurrentBlockNumber, isStreaming).
 */
module.exports = function (
  client,
  nextBlock = 1,
  prefix = "dlux_",
  account = "null",
  vOpsRequired = false
) {
  var onCustomJsonOperation = {}; // Stores the function to be run for each operation id.
  // Handlers for non-custom_json operations, keyed by operation name as it
  // appears in op[0] (no "_operation" suffix).
  var onOperation = {};

  // Called once per processed block; replaced via onBlock().
  // NOTE(review): transactional() calls .then() on its return value, so this
  // default (which returns undefined) appears to require an onBlock handler.
  var onNewBlock = function () {};
  // Called each time the live block stream is (re)opened.
  var onStreamingStart = function () {};
  // How many blocks nextBlock trails the last irreversible block.
  var behind = 0;
  // Last irreversible block number seen (despite the name).
  var head_block;
  // True while the live block stream is believed to be active.
  var isStreaming;
  // When true, blocks.manage waits for virtual ops (blocks.v) before
  // processing a block; never set to true in the visible code.
  var vOps = false;
  // Handle to the live block stream, if any.
  var stream;
  /**
   * In-memory block buffer and scheduler.
   *
   * Numeric keys hold fetched blocks awaiting processing, keyed by block
   * number. The named properties below are the buffer's own state and
   * methods. Blocks are processed strictly one at a time, in order.
   */
  var blocks = {
    // Block number currently being processed, or 0 when idle.
    processing: 0,
    // Last block number whose processing finished.
    completed: nextBlock,
    // Drop every buffered block (regular and virtual-op entries).
    stop: function () {
      blocks.clean(1);
    },
    // Watchdog: one second after handling `last`, if nothing is processing
    // and nothing newer has completed, poll for nextBlock. Every third block,
    // also decide between catching up with range calls and streaming.
    ensure: function (last) {
      setTimeout(() => {
        if (!blocks.processing && blocks.completed == last) {
          getBlockNumber(nextBlock);
          if (!(last % 3))
            getHeadOrIrreversibleBlockNumber(function (result) {
              if (nextBlock < result - 5) {
                behind = result - nextBlock;
                beginBlockComputing();
              } else if (!isStreaming) {
                beginBlockStreaming();
              }
            });
        }
      }, 1000);
    },
    // Remove buffered blocks older than nextBlock - 1 (or all of them when
    // `stop` is truthy), together with their virtual-op entries.
    clean: function (stop = false) {
      // Non-numeric keys (the buffer's own properties) parse to NaN and are kept.
      const isStale = (key) => {
        const num = parseInt(key, 10);
        return Boolean(num) && (stop || num < nextBlock - 1);
      };
      for (const key of Object.keys(blocks)) {
        if (isStale(key)) {
          delete blocks[key];
          if (vOps) delete blocks.v[key];
        }
      }
      for (const key of Object.keys(blocks.v)) {
        if (isStale(key)) delete blocks.v[key];
      }
    },
    // Virtual operations per block number (filled by getVops).
    v: {},
    // Bookkeeping that stops duplicate API requests.
    requests: {
      last_range: 0, // end of the latest fresh range request (also advanced by streamed blocks)
      last_block: 0, // last block requested individually or received from the stream
    },
    /**
     * Called whenever block `block_num` may have become available.
     *
     * It first does periodic housekeeping: refreshing the irreversible block
     * number, prefetching ranges, and cleaning the buffer. It then processes
     * nextBlock if that block is buffered and nothing else is running.
     *
     * @param block_num - block number that triggered the call.
     * @param vOp - true when the trigger was the arrival of virtual ops.
     */
    manage: function (block_num, vOp = false) {
      if (!head_block || block_num > head_block || !(block_num % 100))
        getHeadOrIrreversibleBlockNumber(function (result) {
          head_block = result;
          behind = result - nextBlock;
        });
      if (
        !(block_num % 100) &&
        head_block > blocks.requests.last_range + 200 &&
        Object.keys(blocks).length < 1000
      ) {
        gbr(blocks.requests.last_range + 1, 100, 0);
      }
      // NOTE(review): unlike the prefetch above, this one ignores the
      // ~1000-entry buffer cap. When both conditions hold, two consecutive
      // 100-block ranges are requested.
      if (
        !(block_num % 100) &&
        head_block - blocks.requests.last_range + 1 > 100
      ) {
        gbr(blocks.requests.last_range + 1, 100, 0);
      }
      if (!(block_num % 100)) blocks.clean();
      if (blocks.processing) {
        // Busy: retry shortly, keeping the virtual-op flag.
        setTimeout(() => {
          blocks.manage(block_num, vOp);
        }, 100);
        blocks.clean();
      } else if (vOps && !blocks.v[block_num]) return;
      else if (vOp && !blocks[block_num]) return;
      else if (blocks[block_num] && block_num == nextBlock) {
        blocks.processing = nextBlock;
        processBlock(blocks[block_num]).then(() => {
          nextBlock = block_num + 1;
          blocks.completed = blocks.processing;
          blocks.processing = 0;
          delete blocks[block_num];
          if (blocks[nextBlock]) blocks.manage(nextBlock);
        });
      } else if (block_num > nextBlock) {
        if (blocks[nextBlock]) {
          // Mark the buffer busy, so concurrent manage() calls wait their
          // turn and `completed` records this block rather than 0.
          const pending = nextBlock;
          blocks.processing = pending;
          processBlock(blocks[pending]).then(() => {
            delete blocks[pending];
            nextBlock++;
            blocks.completed = blocks.processing;
            blocks.processing = 0;
            if (blocks[nextBlock]) blocks.manage(nextBlock);
          });
        } else {
          getBlock(nextBlock);
        }
        if (!isStreaming || behind < 5) {
          getHeadOrIrreversibleBlockNumber(function (result) {
            head_block = result;
            if (nextBlock < result - 3) {
              behind = result - nextBlock;
              beginBlockComputing();
            } else if (!isStreaming) {
              beginBlockStreaming();
            }
          });
        }
      }
      blocks.ensure(block_num);
    },
  };

I've removed the catch blocks from all of this to make it more compact, but otherwise it is the same as in the file. First, we define some scoped variables. `blocks` is a small block buffer that holds up to ~1000 blocks waiting to be processed. Because all of the processing is single-threaded, one block at a time is processed, and each transaction in the block is processed the same way. clean() removes already-processed blocks from the buffer. ensure() tries to catch back up to live if the block stream drops out for any reason. v{} holds virtual operations, if any need to be looked at. manage() processes a block and, based on the block number, performs a few housekeeping tasks.

  // Set by stop(); once true, getBlock() stops issuing new block requests.
  var stopping = false;

  /**
   * Fetches the node's dynamic global properties and hands the last
   * irreversible block number to `callback`. Despite the name, the head block
   * number is never used.
   */
  function getHeadOrIrreversibleBlockNumber(callback) {
    const pendingProps = client.database.getDynamicGlobalProperties();
    pendingProps.then(({ last_irreversible_block_num: irreversible }) => {
      callback(irreversible);
    });
  }

  /**
   * Fetches the virtual operations of block `bn` via
   * condenser_api.get_ops_in_block (second param true = only virtual ops).
   * It stores them in blocks.v[bn] (an empty array when the node returns no
   * result) and notifies blocks.manage that virtual ops for `bn` arrived.
   *
   * @param bn - block number.
   * @returns a promise for the stored array of virtual operations. It rejects
   *   if the request or JSON decoding fails. The fetch chain is returned
   *   directly instead of being wrapped in a `new Promise` that never settled.
   */
  function getVops(bn) {
    return fetch(client.currentAddress, {
      body: `{"jsonrpc":"2.0", "method":"condenser_api.get_ops_in_block", "params":[${bn},true], "id":1}`,
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "User-Agent": `${prefix}HoneyComb/${account}`,
      },
      method: "POST",
    })
      .then((res) => res.json())
      .then((json) => {
        const vops = json.result ? json.result : [];
        blocks.v[bn] = vops;
        blocks.manage(bn, true);
        return vops;
      });
  }

The lack of a "get virtual ops in range" call really cramps this processor's style for a few things. This may be work that HAF is purpose-built to handle.

  /**
   * Compares nextBlock against the last irreversible block. When caught up,
   * switches to streaming. Otherwise it records how far behind we are and
   * calls `computeBlock` to keep catching up.
   */
  function isAtRealTime(computeBlock) {
    getHeadOrIrreversibleBlockNumber((irreversible) => {
      head_block = irreversible;
      if (nextBlock >= irreversible) {
        beginBlockStreaming();
        return;
      }
      behind = irreversible - nextBlock;
      computeBlock();
    });
  }

This is how the processor shifts between get_block_range calls and dhive's multi-API block stream.

  /**
   * Loads block `bln` with a single-block API call and stores it in the
   * buffer under the number encoded in its block_id. It then hands control
   * to blocks.manage. Empty results (block not available yet) are ignored.
   */
  function getBlockNumber(bln) {
    const store = (block) => {
      if (!block) return;
      const number = parseInt(block.block_id.slice(0, 8), 16);
      blocks[number] = block;
      blocks.manage(bln);
    };
    client.database.getBlock(bln).then(store);
  }

Here we can see how a block gets loaded into the block buffer. This call only comes from blocks.ensure().

  /**
   * Traffic cop for fetching block `bn`.
   *
   * Once stopping, it only drops the stream reference. Otherwise, when
   * behind, it prefetches up to 100 blocks with a range call, and it always
   * requests `bn` itself.
   */
  function getBlock(bn) {
    if (stopping) {
      stream = undefined;
      return;
    }
    if (behind) {
      gbr(bn, Math.min(behind, 100), 0);
    }
    gb(bn, 0);
  }

This call is just a traffic cop to keep things moving depending on conditions.

  /**
   * Fetches a single block (client.database.getBlock) unless it is already
   * buffered or was the last block requested individually.
   *
   * Blocks 50 or more ahead of TXID.saveNumber (imported from ./index) are
   * not requested yet. The call is retried later instead, with an
   * exponentially growing delay.
   *
   * @param bln - block number.
   * @param at - attempt counter (0 for a fresh request); drives the back-off.
   */
  function gb(bln, at) { //getBlock( block number, attempt)
    if (blocks[bln]) {
      blocks.manage(bln);
      return;
    }
    if (blocks.requests.last_block == bln) return; // already requested
    if (bln < TXID.saveNumber + 50) {
      blocks.requests.last_block = bln;
      client.database
        .getBlock(bln)
        .then((result) => {
          // A missing block (null result) throws here and lands in .catch.
          blocks[parseInt(result.block_id.slice(0, 8), 16)] = result;
          blocks.manage(bln);
        })
        .catch((err) => {
          if (at < 3) {
            // Re-attempt as a one-block range request. The non-zero attempt
            // counter makes gbr bypass its duplicate-request guard. The
            // attempt used to be passed in gbr's `count` position.
            setTimeout(() => {
              gbr(bln, 1, at + 1);
            }, Math.pow(10, at + 1));
          } else {
            console.log("Get block attempt:", at, client.currentAddress);
          }
        });
    } else {
      // NOTE(review): `at` is never capped here, so while the processor is
      // far ahead of TXID.saveNumber the delay grows tenfold per retry.
      setTimeout(() => {
        gb(bln, at + 1);
      }, Math.pow(10, at + 1));
    }
  }

This tries to prevent extra API calls. It won't make a single-block API call for a block that is 50 or more blocks ahead of the current processed block (TXID.saveNumber); instead, it waits and retries.

  /**
   * Rewrites a block returned by get_block_range, in place, into the streamed
   * block format.
   *
   * Range blocks carry operations as {type: "x_operation", value} objects and
   * lack per-transaction metadata. After conversion, operations are
   * [type, value] pairs and each transaction has block_num, transaction_id
   * and transaction_num.
   */
  function toStreamFormat(block, bkn) {
    block.transactions.forEach((tx, j) => {
      tx.block_num = bkn;
      tx.transaction_id = block.transaction_ids[j];
      tx.transaction_num = j;
      tx.operations = tx.operations.map((op) => [
        op.type.replace("_operation", ""),
        op.value,
      ]);
    });
  }

  /**
   * Requests `count` blocks starting at `bln` with block_api.get_block_range.
   * It converts them to the streamed format, stores them in the buffer, then
   * calls blocks.manage(bln).
   *
   * Fresh requests (at = 0) are skipped when an earlier range request already
   * reaches past `bln`, and they advance blocks.requests.last_range.
   * Re-attempts (at > 0) bypass both. Failed requests are retried up to three
   * times, with a 10 ms / 100 ms / 1 s back-off.
   *
   * @param bln - first block number.
   * @param count - number of blocks to request.
   * @param at - attempt counter (0 for a fresh request).
   */
  function gbr(bln, count, at) {
    if (!at && blocks.requests.last_range > bln) return; //prevents double API calls, unless it's a reattempt
    console.log({ bln, count, at });
    if (!at) blocks.requests.last_range = bln + count - 1; //doesn't update the buffer get head for reattempts
    // Exponential back-off and retry, at most three re-attempts.
    const retry = () => {
      if (at < 3) {
        setTimeout(() => {
          gbr(bln, count, at + 1);
        }, Math.pow(10, at + 1));
      }
    };
    fetch(client.currentAddress, {
      body: `{"jsonrpc":"2.0", "method":"block_api.get_block_range", "params":{"starting_block_num": ${bln}, "count": ${count}}, "id":1}`,
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "User-Agent": `${prefix}HoneyComb/${account}`, //tattles on a user for heavy API calls, hopefully to prevent DDoS Bans
      },
      method: "POST",
    })
      .then((res) => res.json())
      .then((result) => {
        try {
          for (const block of result.result.blocks) {
            const bkn = parseInt(block.block_id.slice(0, 8), 16);
            toStreamFormat(block, bkn);
            // Store every block, including ones without transactions, so
            // empty blocks are not re-fetched one by one later.
            blocks[bkn] = block;
          }
          blocks.manage(bln);
        } catch (e) {
          // Malformed or partial response (e.g. an error object instead of
          // result.blocks): try again.
          retry();
        }
      })
      .catch(retry);
  }

Range calls are probably the hardest part to understand from the code alone. Roughly 1 in 50 range calls to my node fail on the first attempt, and range blocks have a different structure than streamed blocks. These extra checks mostly deal with those two issues.


  /**
   * Starts (or restarts) catch-up mode from nextBlock. It arms the ensure()
   * watchdog for that block, then requests it.
   */
  function beginBlockComputing() {
    const startAt = nextBlock; // read once so both calls see the same block
    blocks.ensure(startAt);
    getBlock(startAt);
  }

  /**
   * Switches to live mode. It opens the client's block stream, buffers each
   * streamed block and hands it to blocks.manage.
   *
   * If the stream ends or errors, isStreaming is cleared and the processor
   * falls back to catch-up mode via beginBlockComputing(). Clearing the flag
   * lets manage()/ensure() reopen a stream once caught up, and makes
   * isStreaming() report the truth.
   */
  function beginBlockStreaming() {
    isStreaming = true;
    onStreamingStart();
    const current = client.blockchain.getBlockStream();
    stream = current;
    current.on("data", function (Block) {
      var blockNum = parseInt(Block.block_id.slice(0, 8), 16);
      blocks[blockNum] = Block;
      // A streamed block counts as already requested, so the single-block
      // and range fetchers skip it.
      blocks.requests.last_block = blockNum;
      blocks.requests.last_range = blockNum;
      blocks.manage(blockNum);
    });
    const fallBack = () => {
      // Ignore late events from a stream that was already replaced, stopped,
      // or handled (e.g. "end" after "error").
      if (stream !== current) return;
      isStreaming = false;
      stream = undefined;
      beginBlockComputing();
    };
    current.on("end", function () {
      console.error(
        "Block stream ended unexpectedly. Restarting block computing."
      );
      fallBack();
    });
    current.on("error", function (err) {
      console.error("Block stream error:", err);
      fallBack();
    });
  }

Start and stop block streams, and keep the get block variables correct.

  /**
   * Runs a block's parsed operations one at a time, in order, then reports
   * the block through onNewBlock().
   *
   * The next operation starts only after the previous handler settles the
   * promise returned by doOp(), so handlers never run concurrently.
   *
   * @param ops   - parsed operations built by processBlock().
   * @param i     - index of the operation to run next.
   * @param pc    - [resolve, reject] of the processBlock() promise.
   * @param num   - block number being processed.
   * @param block - the raw block (timestamp, block_id, witness_signature).
   * @param vops  - passed along to handlers; not otherwise used in the visible code.
   */
  function transactional(ops, i, pc, num, block, vops) {
    if (ops.length) {
      // The handler receives [resolve, reject, [ops, i, pc, num, block, vops]].
      // `v` is read below as the next call's arguments, so handlers presumably
      // resolve with that same array — confirm in the contracts.
      doOp(ops[i], [ops, i, pc, num, block, vops])
        .then((v) => {
          if (ops.length > i + 1) {
            transactional(v[0], v[1] + 1, v[2], v[3], v[4], v[5]);
          } else {
            // Last operation done: report the block. When called from
            // processBlock, pc has two elements, so pc[2] is undefined.
            // NOTE(review): this onNewBlock promise has no rejection handler.
            // A rejecting onNewBlock never reaches pc[1], so processBlock
            // never settles.
            onNewBlock(num, v, v[4].witness_signature, {
              timestamp: v[4].timestamp,
              block_id: v[4].block_id,
              block_number: num,
            })
              .then((r) => {
                pc[0](pc[2]);
              })
            // }
          }
        })
        .catch((e) => {
          pc[1](e);
        });
    } else if (parseInt(block.block_id.slice(0, 8), 16) != num) {
      // No operations, and the block does not match `num`: resolve without
      // reporting. (processBlock only calls this for matching blocks.)
      pc[0]();
    } else {
      // No operations: report the block directly. onNewBlock is expected to
      // resolve with the `pc` it was given; r[0]() then resolves processBlock.
      // NOTE(review): the default onNewBlock returns undefined, so `.then`
      // would throw here unless an onBlock handler is registered.
      onNewBlock(num, pc, block.witness_signature, {
        timestamp: block.timestamp,
        block_id: block.block_id,
        block_number: num,
      })
        .then((r) => {
          r[0]();
        })
        .catch((e) => {
          pc[1](e);
        });
    }

Attaches header data to each transaction so the processor can be block-agnostic when computing. Block number, witness signature, timestamp... all have important uses in NFT creation, cron jobs, and DEX ordering. transactional() builds a promise chain that effectively gives each transaction a database lock. Transaction IDs give us a way to verify that Hive transactions were processed on the layer 2.

    /**
     * Dispatches one parsed operation to its registered handler. Returns a
     * promise that the handler settles through `settle` = [resolve, reject, pc].
     *
     * - custom_json entries [id, json, from, active] call
     *   onCustomJsonOperation[id](json, from, active, settle).
     * - Other entries [type, op] call onOperation[type](op, settle).
     * - Any other shape is ignored, and the promise stays pending.
     */
    function doOp(op, pc) {
      return new Promise((resolve, reject) => {
        const settle = [resolve, reject, pc];
        switch (op.length) {
          case 4:
            onCustomJsonOperation[op[0]](op[1], op[2], op[3], settle);
            break;
          case 2:
            onOperation[op[0]](op[1], settle);
            break;
          default:
            break;
        }
      });
    }

Calling the appropriate smart contracts by name. These can be triggered from any operation, such as a send, or a vote.

    /**
     * Virtual-operation counterpart of doOp(); not called anywhere in the
     * visible code.
     * NOTE(review): `onVOperation` is not declared in the visible code.
     * Unless a global of that name exists, calling this throws a
     * ReferenceError inside the executor, which rejects the returned promise.
     */
    function doVop(op, pc) {
      return new Promise((resolve, reject) => {
        console.log(op, pc);
        onVOperation[op[0]](op[1], [resolve, reject, pc]);
      });
    }
  } // end of transactional()

Currently unused, but it will do the same for virtual operations like the DHF payout.


  /**
   * Turns a custom_json `json` string into a plain object so block metadata
   * can be attached to it.
   *
   * Invalid JSON and any JSON value that is not a plain object (string,
   * number, boolean, array, null) become {}. Previously, `null` or invalid
   * JSON threw and stalled the processor.
   */
  function parseCustomJsonPayload(text) {
    let parsed;
    try {
      parsed = JSON.parse(text);
    } catch (e) {
      return {};
    }
    const isPlainObject =
      parsed !== null && typeof parsed === "object" && !Array.isArray(parsed);
    return isPlainObject ? parsed : {};
  }

  /**
   * Breaks a block down into the operations that have registered handlers,
   * tags each with transaction/block metadata, and runs them in order via
   * transactional().
   *
   * Collected entries:
   * - custom_json with a registered id: [id, payload, from, active]. `from`
   *   is the first required posting auth or, failing that, the first
   *   required active auth (then `active` is true).
   * - any other operation type with a registered handler: [type, op].
   *
   * @param Block - block in streamed format (transactions carrying
   *   transaction_id / block_num; operations as [type, value] pairs).
   * @param Pvops - passed through to transactional().
   * @returns a promise settled by transactional()/onNewBlock.
   *   NOTE(review): if Block is not nextBlock, the promise never settles.
   */
  function processBlock(Block, Pvops) {
    return new Promise((resolve, reject) => {
      const transactions = Block.transactions;
      const ops = [];
      if (parseInt(Block.block_id.slice(0, 8), 16) === nextBlock) {
        for (const tx of transactions) {
          for (const op of tx.operations) {
            const [type, body] = op;
            if (type === "custom_json") {
              if (typeof onCustomJsonOperation[body.id] === "function") {
                const ip = parseCustomJsonPayload(body.json);
                ip.transaction_id = tx.transaction_id;
                ip.block_num = tx.block_num;
                ip.timestamp = Block.timestamp;
                ip.prand = Block.witness_signature;
                let from = body.required_posting_auths[0];
                let active = false;
                if (!from) {
                  from = body.required_auths[0];
                  active = true;
                }
                ops.push([body.id, ip, from, active]);
              }
            } else if (onOperation[type] !== undefined) {
              body.transaction_id = tx.transaction_id;
              body.block_num = tx.block_num;
              body.timestamp = Block.timestamp;
              body.prand = Block.witness_signature;
              ops.push([type, body]);
            }
          }
        }
        transactional(ops, 0, [resolve, reject], nextBlock, Block, Pvops);
      }
    });
  }

Breaking the block down to transactions.

  // Public API of the processor.
  return {
    /*
      Registers the handler for custom_json operations with id
      prefix + operationId. doOp() calls it as
      callback(json, from, active, [resolve, reject, pc]).
    */
    on: function (operationId, callback) {
      onCustomJsonOperation[prefix + operationId] = callback;
    },

    // Registers the handler for a non-custom_json operation type (e.g.
    // "transfer"); called as callback(op, [resolve, reject, pc]).
    onOperation: function (type, callback) {
      onOperation[type] = callback;
    },

    // Like on(), but uses operationId exactly as given (no prefix).
    onNoPrefix: function (operationId, callback) {
      onCustomJsonOperation[operationId] = callback;
    },

    /*
      Registers the per-block callback. transactional() calls it as
      callback(num, pc, witness_signature, {timestamp, block_id, block_number})
      and chains .then() on the result, so it must return a promise.
    */
    onBlock: function (callback) {
      onNewBlock = callback;
    },

    // Begins catch-up processing from nextBlock. beginBlockComputing() does
    // not open a stream synchronously, so isStreaming starts out false.
    start: function () {
      beginBlockComputing();
      isStreaming = false;
    },

    // Next block number to be processed.
    getCurrentBlockNumber: function () {
      return nextBlock;
    },

    isStreaming: function () {
      return isStreaming;
    },
    // Registers a callback run each time the live stream is (re)opened.
    onStreamingStart: function (callback) {
      onStreamingStart = callback;
    },

    // Stops fetching and empties the block buffer.
    // NOTE(review): setting `stream = undefined` drops the reference only;
    // the visible code does not close the stream or remove its listeners.
    stop: function (callback) {
      if (isStreaming) {
        isStreaming = false;
        stopping = true;
        stream = undefined;
        blocks.stop();
        setTimeout(callback, 1000);
      } else {
        blocks.stop();
        stopping = true;
        // NOTE(review): `stopCallback` is not declared in the visible code
        // (an implicit global), and nothing visible calls it, so `callback`
        // may never run on this path. Confirm against the rest of the project.
        stopCallback = callback;
      }
    },
  };
}; // end of module.exports factory

And finally, allowing the processor to be fed smart contracts and logic outside of the module.

Thanks for attending my TED talk.

Sort:  

🍕 PIZZA !

I gifted $PIZZA slices here:
(1/10) @darkflame tipped @disregardfiat (x1)

Learn more at https://hive.pizza!

@disregardfiat! The Hive.Pizza team manually upvoted your post.

Wow that is some impressive coding !PIZZA

i hope you dont mind.... I skipped to the end.
Im sure its great! lol
keep up the epic work