Building a Solana wallet tracker (Telegram bot) with Helius

FESS...JsTo
30 Apr 2023
77

Hey everyone. Recently, I have been quite active on Solana: AMM with Tensor, weekly drops from DRiP, NFT trading, etc.
My biggest pain for the last few weeks was the fact that Phantom notification were unreliable and not always sufficient. So I decided to build a simple Telegram bot to track any transactions happening with my wallets. You can try it here: https://t.me/solana_notify_bot
Main features:

  • Up to 5 wallets: Add, delete & display wallets
  • Instant transaction notifications with concise summaries
  • Compressed NFTs support

Here I want to briefly describe the process of building the bot and explain some of the architecture choices that I had to make.
First, I used Python with Flask and MongoDB. I deployed everything on a free-tier AWS EC2.
The whole application has two main parts:

  1. Telegram bot
  2. a server listening to Helius webhook events and sending messages to Telegram.

! Disclaimer:

  • I am not a developer. Coding is my hobby. My code can be simplistic and not well organized.
  • I use ChatGPT a lot to help me with basic functionality


Telegram bot development:

Initially, the bot functionality was quite simple: Adding wallets by sending your wallet address, Deleting wallets, Showing already added wallets. Having that in mind, I asked ChatGPT to help me design the bot.
Here I met the first issue - ChatGPT did not know about updates to Telegram APIs after 2021, so I had to install an older version of Telegram library to ensure that I can continue using it.


Adding wallet:

My design requirement were:

  • Check if input looks like a valid Solana address
def is_solana_wallet_address(address):
    base58_chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
    address_length = [32, 44]

    if len(address) < address_length[0]:
        return False

    if len(address) > address_length[1]:
        return False

    for char in address:
        if char not in base58_chars:
            return False

    return True
  • Check how many transactions this wallet is doing per day (I didn’t want to support wallets doing 1,000s of transactions). Currently, I get the last 100 transactions from Helius. Then, I check the difference between now and the oldest transaction. Then I calculate how many transaction per day this wallet sends based on this data. (Example: right now is 2:00pm Sunday. The oldest of 100 transactions was send on 2:00pm Wednesday. This would mean that this wallet is doing 25 tx per day).
def check_wallet_transactions(wallet):
    try:
        url = f'<https://api.helius.xyz/v0/addresses/{wallet}/raw-transactions?api-key={HELIUS_KEY}>'
        r = requests.get(url)
        j = r.json()
        if len(j) < 10:
            return True, 0
        first_date = datetime.utcfromtimestamp(j[-1]['blockTime'])
        current_date = datetime.now()
        num_txs = len(j)
        delta = (current_date - first_date).total_seconds()
        av_per_day = num_txs / delta * 86400
        if av_per_day > 50:
            return False, av_per_day
        else:
            return True, av_per_day
    except:
        logging.info('ERROR checking wallet txs')
        return True, 0
  • Check how many wallets this user has already added (to prevent spammers):
def wallet_count_for_user(user_id: int) -> int:
    print('start wallet count')
    wallet_count = wallets_collection.count_documents({"user_id": str(user_id), "status": "active"})
    return wallet_count


Then, to add new wallets, I needed to use Helius API for editing webhooks. I would first check if someone has already added this wallet before, and if not I would send a put request to update addresses (you can learn more about Helius webhooks here):

def add_webhook(user_id, user_wallet, webhook_id, addresses):
    url = f"<https://api.helius.xyz/v0/webhooks/{webhook_id}?api-key={HELIUS_KEY}>"
    if user_wallet in addresses:
        logging.info('existing wallet, returning true')
        return True
    addresses.append(user_wallet)
    data = {
        "webhookURL": HELIUS_WEBHOOK,
        "accountAddresses": addresses,
        "transactionTypes":["Any"],
        "webhookType": "enhanced",
    }
    r = requests.put(url, json=data)
    if r.status_code == 200:
        return True
    else:
        return False

Here is the final version of the main function:

def add_wallet_finish(update: Update, context: CallbackContext) -> int:
    reply_markup = back_button(update, context)
    wallet_address = update.message.text
    user_id = update.effective_user.id

    if not wallet_address:
        update.message.reply_text("Oops! Looks like you forgot the wallet address. Send it over so we can get things rolling! 📨", reply_markup=reply_markup)
        return

    # Validate the wallet address
    if not is_solana_wallet_address(wallet_address):
        update.message.reply_text("Uh-oh! That Solana wallet address seems a bit fishy. Double-check it and send a valid one, please! 🕵️‍♂️", reply_markup=reply_markup)
        return
    
    # Check # of transactions for the last day for this wallet
    check_res, check_num_tx = check_wallet_transactions(wallet_address)
    if not check_res:
        update.message.reply_text(f"Whoa, slow down Speedy Gonzales! 🏎️ We can only handle wallets with under 50 transactions per day. Your wallet's at {round(check_num_tx, 1)}. Let's pick another, shall we? 😉", reply_markup=reply_markup)
        return

    # Check how many wallets a user has. Limit to 5
    if wallet_count_for_user(user_id) >= 5:
        update.message.reply_text("Oops! You've reached the wallet limit! It seems you're quite the collector, but we can only handle up to 5 wallets per user. Time to make some tough choices! 😄", reply_markup=reply_markup)
        return

    # Load existing wallet data
    existing_wallet = wallets_collection.find_one(
        {
            "user_id": str(user_id),
            "address": wallet_address,
            "status": "active"
        })

    if existing_wallet:
        update.message.reply_text("Hey there, déjà vu! You've already added this wallet. Time for a different action, perhaps? 🔄", reply_markup=reply_markup)
    else:
        reply_markup = next(update, context)
        success, webhook_id, addresses = get_webhook()
        r_success = add_webhook(user_id, wallet_address, webhook_id, addresses)
        
        if (success) and (r_success):
            # insert main wallet
            main = {
                "user_id": str(user_id),
                "address": wallet_address,
                "datetime": datetime.now(),
                "status": 'active',
            }
            # insert all token addresses
            wallets_collection.insert_one(main)
                
            update.message.reply_text("Huzzah! Your wallet has been added with a flourish! 🎉 Now you can sit back, relax, and enjoy your Solana experience as I keep an eye on your transactions. What's your next grand plan?", reply_markup=reply_markup)
        else:
            update.message.reply_text("Bummer! We hit a snag while saving your wallet. Let's give it another whirl, shall we? 🔄", reply_markup=reply_markup)

    return ConversationHandler.END


Deleting wallets

Here, the design requirement were simpler:

  • Check if this wallet is added by multiple users. If so, then we shouldn’t delete this wallet from the webhook. We only should delete the database entry connecting current user to this wallet

Here is the final code for the main delete function:

def delete_wallet_finish(update: Update, context: CallbackContext) -> int:
    reply_markup = next(update, context)
    wallet_address = update.message.text
    user_id = update.effective_user.id

    # check if other users have this wallet
    wallets_exist = wallets_collection.find(
        {
            "address": wallet_address,
            "status": "active"
        })
    r_success = True
    if len(list(wallets_exist)) == 1:
        logging.info('deleting unique address')
        success, webhook_id, addresses = get_webhook()
        r_success = delete_webhook(user_id, wallet_address, webhook_id, addresses)
    else:
        logging.info('address not unique, not deleting')

    # delete from DB
    reply_markup = back_button(update, context)
    if r_success:
        result = wallets_collection.delete_one({"user_id": str(user_id), "address": wallet_address})
        if result.deleted_count == 0:
            update.message.reply_text("Hmm, that wallet's either missing or not yours. Let's try something else, okay? 🕵️‍♀️", reply_markup=reply_markup)
        else:
            update.message.reply_text("Poof! Your wallet has vanished into thin air! Now, what other adventures await? ✨", reply_markup=reply_markup)
    else:
        update.message.reply_text("Yikes, we couldn't delete the wallet. Don't worry, we'll get it next time! Try again, please. 🔄", reply_markup=reply_markup)

        return ConversationHandler.END


Showing wallets:

Nothing complicated here, I only needed to get wallets for this user from the db:

def show_wallets(update: Update, context: CallbackContext) -> None:
    reply_markup = next(update, context)
    user_id = update.effective_user.id

    user_wallets = list(wallets_collection.find(
        {
            "user_id": str(user_id),
            "status": "active"
        }))
    print(len(user_wallets))
    if len(user_wallets) == 0:
        update.callback_query.answer()
        update.callback_query.edit_message_text("Whoa, no wallets here! Let's add some, or pick another action to make things exciting! 🎢", reply_markup=reply_markup)
    else:
        wallet_list = "\\n".join([wallet["address"] for wallet in user_wallets])
        update.callback_query.answer()
        update.callback_query.edit_message_text(f"Feast your eyes upon your wallet collection! 🎩\\n\\n{wallet_list}\\n\\nNow, what's your next move, my friend? 🤔", reply_markup=reply_markup)


Server to listen to Helius webhook events

My design requirement were relatively simple:

  • Listen to all webhook events
  • For each event identify relevant users
  • Create simple message describing the event by using Helius description
  • Check if this transaction involved NFTs or compressed NFTs. If yes - get an image to send together with the message.

My final code is a bit messy, but it is working for now.

Main function:

def handle_webhook():
    # Extract data from incoming request
    data = request.json
    # Create the message
    messages = create_message(data)

    for message in messages:
		# log the message into DB for debugging
        db_entry = {
            "user": message['user'],
            "message": message['text'],
            "datetime": datetime.now()
        }
        db.messages.insert_one(db_entry)

        logging.info(message)
		# Check if this message has an image. If yes - use a different function
        if len(message['image']) > 0:
            try:
                send_image_to_user(BOT_TOKEN, message['user'], message['text'], message['image'])
            except Exception as e:
                logging.info(e)
                send_message_to_user(BOT_TOKEN, message['user'], message['text'])    
        else:
            send_message_to_user(BOT_TOKEN, message['user'], message['text'])

    logging.info('ok event')
    return 'OK'


Telegram functions to send messages and images

def send_message_to_user(bot_token, user_id, message):
    request = Request(con_pool_size=8)
    bot = Bot(bot_token, request=request)
    bot.send_message(
        chat_id=user_id,
        text=message,
        parse_mode="Markdown",
        disable_web_page_preview=True)

def send_image_to_user(bot_token, user_id, message, image_url):
    request = Request(con_pool_size=8)
    bot = Bot(bot_token, request=request)
    image_bytes = get_image(image_url)
    bot.send_photo(
        chat_id=user_id,
        photo=image_bytes,
        caption=message,
        parse_mode="Markdown")


Create message to send

def create_message(data):
    tx_type = data[0]['type'].replace("_", " ")
    tx = data[0]['signature']
    source = data[0]['source']
    description = data[0]['description']
		
	# get all accounts involved into this tx
    accounts = []
    for inst in data[0]["instructions"]:
        accounts = accounts + inst["accounts"]
	# add token owner accounts for token transfers
    if len(data[0]['tokenTransfers']) > 0:
        for token in data[0]['tokenTransfers']:
            accounts.append(token['fromUserAccount'])
            accounts.append(token['toUserAccount'])
        accounts = list(set(accounts))

    # check if image exists
    image = check_image(data)
    
    # find all users with relevant wallets
    found_docs = list(wallets_collection.find(
        {
            "address": {"$in": accounts},
            "status": "active"
        }
    ))
    found_users = [i['user_id'] for i in found_docs]
    found_users = set(found_users)
    logging.info(found_users)
    messages = []
		
	# for each user: create a message and do some formatting for readability
    for user in found_users:
        if source != "SYSTEM_PROGRAM":
            message = f'*{tx_type}* on {source}'
        else:
            message = f'*{tx_type}*'
        if len(description) > 0:
            message = message + '\\n\\n' + data[0]['description']

            user_wallets = [i['address'] for i in found_docs if i['user_id']==user]
            for user_wallet in user_wallets:
                if user_wallet not in message:
                    continue
                formatted_user_wallet = user_wallet[:4] + '...' + user_wallet[-4:]
                message = message.replace(user_wallet, f'*YOUR WALLET* ({formatted_user_wallet})')

        formatted_text = re.sub(r'[A-Za-z0-9]{32,44}', format_wallet_address, message)
        formatted_text = formatted_text + f'\\n[Link to XRAY](<https://xray.helius.xyz/tx/{tx}>)'
        formatted_text = formatted_text.replace("#", "").replace("_", " ")
        messages.append({'user': user, 'text': formatted_text, 'image': image})
    return messages

def format_wallet_address(match_obj):
    wallet_address = match_obj.group(0)
    return wallet_address[:4] + "..." + wallet_address[-4:]


Check if this is an NFT related tx

def check_image(data):
    # show nft image
    token_mint = ''
    for token in data[0]['tokenTransfers']:
        if 'NonFungible' in token['tokenStandard']:
            token_mint = token['mint']
    # NFTs and pNFTs
    if len(token_mint) > 0:
        url = f"<https://api.helius.xyz/v0/token-metadata?api-key={HELIUS_KEY}>"
        nft_addresses = [token_mint]
        r_data = {
            "mintAccounts": nft_addresses,
            "includeOffChain": True,
            "disableCache": False,
        }

        r = requests.post(url=url, json=r_data)
        j = r.json()
        if 'metadata' not in j[0]['offChainMetadata']:
            return ''
        if 'image' not in j[0]['offChainMetadata']['metadata']:
            return ''
        image = j[0]['offChainMetadata']['metadata']['image']
        return image
    else:
        # check for compressed
        if 'compressed' in data[0]['events']:
            if 'assetId' in data[0]['events']['compressed'][0]:
                asset_id = data[0]['events']['compressed'][0]['assetId']
                try:
                    image = get_compressed_image(asset_id)
                    return image
                except:
                    return ''
        return ''

Handle compressed NFTs

def get_compressed_image(asset_id):
    # compressed
    url = f'<https://rpc.helius.xyz/?api-key={HELIUS_KEY}>'
    r_data = {
        "jsonrpc": "2.0",
        "id": "my-id",
        "method": "getAsset",
        "params": [
            asset_id
        ]
    }
    r = requests.post(url, json=r_data)
    url_meta = r.json()['result']['content']['json_uri']
    r = requests.get(url=url_meta)
    return r.json()['image']

Helper function to handle different image formats

def get_image(url):
    response = requests.get(url).content
    # Open the downloaded image using Pillow
    image = Image.open(BytesIO(response))
    image = image.convert('RGB')
    # Resize the image while maintaining its aspect ratio
    max_size = (800, 800)  # You can adjust the max_size to your desired dimensions
    image.thumbnail(max_size, Image.ANTIALIAS)
    image_bytes = BytesIO()
    image.save(image_bytes, 'JPEG', quality=85)
    image_bytes.seek(0)
    return image_bytes


Conclusion

In this blog post, we walked through the process of building a Solana wallet tracker using a Telegram bot and Helius, which aimed to solve the problem of unreliable notifications from the Phantom wallet.
I wanted to write this post to highlight how easy it is to build useful tools on Solana: Helius and many other teams on Solana build amazing infrastructure to lower the entry barrier.

I would like to encourage everyone to start BUILDING and sharing their tools with Solana community.

Write & Read to Earn with BULB

Learn More

Enjoy this blog? Subscribe to kiryl.sol

7 Comments

B
No comments yet.
Most relevant comments are displayed, so some may have been filtered out.