Repository
https://github.com/Juless89/steem-dashboard
Website
What is SteemChain?
SteemChain is an open source application for analysing transactions and operations from the STEEM blockchain. It stores these in a MySQL database and visualises them with charts and tables on the web. The goal is to bring data analytics to STEEM the way websites like blockchain.com do for Bitcoin.
Bug Fixes
Issue
The blockchain scraper would randomly halt when using more than 64 threads. The exact cause was never pinpointed, as block-gathering threads would simply halt at random, sometimes after hours, without raising any exceptions, and the bug therefore took a while to fix.
Solution
Replaced the http libraries and related code, swapping

import urllib.request
import urllib.parse

for

import urllib3
This solved the issue and also increased performance by 10-20%.
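As a sketch of what the swap looks like in practice (the node URL and RPC method here are assumptions, not taken from the project), block requests can go through a single urllib3 PoolManager, which reuses connections across requests:

```python
import json

import urllib3


def make_rpc_payload(block_num):
    # JSON-RPC body for fetching a single block; the method name follows
    # the public Steem API (condenser_api.get_block).
    return json.dumps({
        "jsonrpc": "2.0",
        "method": "condenser_api.get_block",
        "params": [block_num],
        "id": block_num,
    }).encode("utf-8")


# A single PoolManager reuses TCP connections across requests, which is
# where most of the speed-up over per-request urllib calls comes from.
http = urllib3.PoolManager()


def get_block(block_num, url="https://api.steemit.com"):
    # hypothetical node URL; the project may use a different endpoint
    response = http.request(
        "POST", url,
        body=make_rpc_payload(block_num),
        headers={"Content-Type": "application/json"},
    )
    return json.loads(response.data.decode("utf-8"))
```

Reusing pooled connections avoids the per-request connection setup that urllib.request incurs, which is consistent with the speed-up mentioned above.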
New Features
Upload large batches of queries via .csv files
While scraping the blockchain, inserting the large number of operations per day (on average 1 million daily over the last couple of months) put a lot of stress on the MySQL server. After some extended research it appeared that the fastest way to do this was to write all the rows into a .csv file and upload that file directly into the database.
All operations are stored inside their respective buffer, and after every hour of data the buffer gets converted to a .csv file and uploaded into the MySQL db. The files are written to /tmp and loaded directly from there.
# add vote operation
def add_vote(self, voter, author, permlink, weight, timestamp, value=0):
    query = {
        "id": 'NULL',
        "voter": voter,
        "author": author,
        "permlink": permlink,
        "weight": weight,
        "value": value,
        "timestamp": timestamp,
    }
    self.buffer.append(query)
All rows are extracted from the buffer and stored inside a pandas DataFrame. The column order is not preserved, so the columns have to be put into the right order before inserting into the database.
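A minimal, standalone illustration of the reordering, using the votes-table column order; the row values here are made up:

```python
import pandas as pd

# buffered rows arrive as dicts, so the DataFrame's column order is not
# guaranteed to match the table's column order
buffer = [
    {"weight": 10000, "id": "NULL", "voter": "alice",
     "author": "bob", "permlink": "post", "value": 0,
     "timestamp": "2019-06-01 12:00:00"},
]
columns = ['id', 'voter', 'author', 'permlink', 'weight', 'value', 'timestamp']

df = pd.DataFrame(buffer)
# selecting with an explicit column list both reorders the columns and
# fails loudly if one is missing
df = df[columns]
```

On Python 3.7+ dict insertion order is actually preserved, but selecting with an explicit column list enforces the table's order regardless of how the buffer was built.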
# Execute all stored sql queries at once
def dump(self, table):
    # location of the file inside /tmp
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    path = base_dir + "/back-end/temp/" + table + ".csv"

    # Order of values for each operation type
    if table == 'api_votes':
        columns = ['id', 'voter', 'author', 'permlink', 'weight', 'value', 'timestamp']
    elif table == 'api_transfers':
        columns = ['id', 'sender', 'receiver', 'amount', 'precision', 'nai', 'timestamp']
    elif table == 'api_claim_rewards':
        columns = ['id', 'account', 'reward_steem', 'reward_sbd', 'reward_vests', 'timestamp']

    # create dataframe from buffered values
    df = pd.DataFrame(self.buffer)

    try:
        # reorder columns in dataframe
        df = df[columns]

        # write data to csv file
        df.to_csv(
            path,
            encoding='utf-8',
            header=True,
            doublequote=True,
            sep=',',
            index=False,
        )

        # upload csv file into db
        self.insert_file_into_db(path, table)
    except Exception:
        pass
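The insert_file_into_db() method itself is not shown above. A plausible sketch, assuming it uses MySQL's LOAD DATA LOCAL INFILE statement (the usual way to bulk-load a CSV); the helper names here are hypothetical:

```python
def build_load_query(path, table):
    # LOAD DATA LOCAL INFILE is MySQL's bulk CSV loader; IGNORE 1 LINES
    # skips the header row that df.to_csv(header=True) writes
    return (
        f"LOAD DATA LOCAL INFILE '{path}' "
        f"INTO TABLE `{table}` "
        "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' "
        "LINES TERMINATED BY '\\n' "
        "IGNORE 1 LINES;"
    )


def insert_file_into_db(cursor, path, table):
    # hypothetical free-standing version; the real method lives on the
    # db wrapper class and uses its own connection/cursor
    cursor.execute(build_load_query(path, table))
```

Loading a whole file in one statement lets the server parse and insert rows in bulk, which is why it outperforms issuing a million individual INSERTs.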
New parent class for all operation types
In order to easily scale to all operation types in the next update, it was necessary to build a parent class from which each operation type could inherit. In addition, it must be possible to track more than just the count of each operation, depending on which operation it is. All shared code was put into a new class, Operation. Each operation inherits from Operation and has to implement the process_operation() function, which must fill the data dict with at least the variable count and then call self.counter.set_resolutions(hour, minute, **data).
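The Operation parent class itself is not shown in this post, so the following is only a minimal sketch of the contract described above; everything beyond the names given in the text is an assumption:

```python
class Operation:
    # minimal sketch of the shared parent; the real class also wires up
    # the db connection, the counter and the thread lock
    def __init__(self, table, storage, lock, scraping=False):
        self.table = table
        self.storage = storage
        self.lock = lock
        self.scraping = scraping

    def process_operation(self, operation):
        # every subclass must build a data dict with at least `count`
        # and forward it via counter.set_resolutions(hour, minute, **data)
        raise NotImplementedError


class Votes(Operation):
    def process_operation(self, operation):
        # the real subclass would also store the vote and call
        # self.counter.set_resolutions(hour, minute, **data)
        data = {"count": 1}
        return data
```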
import operation


class Transfers(operation.Operation):
    def __init__(self, table, storage, lock, scraping=False):
        operation.Operation.__init__(self, table, storage, lock, scraping)

    def process_operation(self, operation):
        sender = operation['value']['from']
        receiver = operation['value']['to']
        amount = operation['value']['amount']['amount']
        precision = operation['value']['amount']['precision']
        nai = operation['value']['amount']['nai']

        self.db.add_transfer(
            sender, receiver, amount, precision,
            nai, self.timestamp)

        # Allow for multiple resolutions
        hour = self.timestamp.hour
        minute = self.timestamp.minute

        # collect data to store
        steem = 0
        sbd = 0
        if nai == "@@000000021":
            steem += float(amount)
        elif nai == "@@000000013":
            sbd += float(amount)

        # data to be inserted into the db
        data = {
            "count": 1,
            "steem": steem,
            "sbd": sbd,
        }
        self.counter.set_resolutions(hour, minute, **data)
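The "@@…" strings are Steem's numeric asset identifiers (NAIs). Amounts in appbase-style API responses arrive as integer strings paired with a precision field; a small standalone helper (not part of the project) to interpret them:

```python
# Steem asset "NAI" identifiers as used in appbase API responses
NAI_SYMBOLS = {
    "@@000000013": "SBD",
    "@@000000021": "STEEM",
    "@@000000037": "VESTS",
}


def to_amount(amount, precision, nai):
    # precision gives the number of decimal places, so e.g.
    # ('1000', 3, '@@000000021') means 1.0 STEEM
    symbol = NAI_SYMBOLS.get(nai, nai)
    return float(amount) / (10 ** precision), symbol
```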
Dynamic SQL queries
The data dict allows for keeping track of multiple variables, but it also means the SQL query is unique for each operation type. To allow for easy scaling, the SQL query gets generated from the keys and values inside the data dict.
# Insert date, amount into table 'table'. Look if the record already
# exists, update if needed else add.
def insert_selection(self, timestamp, data, table):
    # sql query used to insert data into the mysql database
    # for 1 value
    if len(data) == 1:
        query = f"INSERT INTO `{table}` (`count`, `timestamp`)" \
                f" VALUES ('{data['count']}', '{timestamp}');"
    # for multiple values
    else:
        first = f"INSERT INTO `{table}` "
        second = ""

        # sort through dict and construct sql query
        count = 0
        for key, value in data.items():
            if count == 0:
                first += f"(`{key}`"
                second += f", `timestamp`) VALUES ('{value}'"
                count += 1
            else:
                first += f", `{key}`"
                second += f", '{value}'"

        # query
        query = first + second + f", '{timestamp}');"
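A standalone, simplified version of the same dict-driven builder (the function and table names here are illustrative, not taken from the project), with the query it produces for a sample data dict:

```python
def build_insert(table, timestamp, data):
    # generate the INSERT statement from the data dict's keys and values,
    # always appending the timestamp column last
    keys = list(data)
    cols = ", ".join(f"`{k}`" for k in keys) + ", `timestamp`"
    vals = ", ".join(f"'{data[k]}'" for k in keys) + f", '{timestamp}'"
    return f"INSERT INTO `{table}` ({cols}) VALUES ({vals});"


query = build_insert(
    "api_transfers_hour",  # hypothetical table name
    "2019-06-01 12:00:00",
    {"count": 1, "steem": 150.5, "sbd": 12.0},
)
```

In production code, parameterized queries through the MySQL driver would sidestep quoting and escaping issues, but this mirrors the string-building approach used here.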
Simpler worker thread synchronisation
There is now a global variable current_block that keeps track of the current block. Each worker thread compares its own progress against this global variable to stay in sync with the overall progress. The workers only read the variable, while the sorter is the only thread that alters it.
global current_block
if self.num <= current_block + self.n * 10:
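Read as a predicate, the check keeps each worker at most n * 10 blocks ahead of the sorter's position. A small standalone sketch of that condition (names are illustrative):

```python
def may_request(block_num, current_block, n_threads, window=10):
    # a worker only fetches blocks within n_threads * window of the
    # sorter's current_block, mirroring:
    #   if self.num <= current_block + self.n * 10
    return block_num <= current_block + n_threads * window
```

Because only the sorter writes current_block and the workers only read it, no lock is needed around the check itself.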
Small things and bug fixes
Some other small changes include an ETA for pulling in large amounts of blocks and a neater start-up message.
Next update
For the next update the idea is to expand to all operation types, decide which variables to keep track of, make all the required changes to the database, and make the front-end more dynamic so it can display all the different operation types.