General project design question when creating a larger Python project that deals with data storage / analysis

As the title states, this is a pretty large project, so I’ll try to condense the ideas and important code snippets down as much as possible.

What I’m Trying to Accomplish: This project scans the stock market and builds a local database of the data (in CSV files). The database is updated every 5 minutes and again at the end of every day. This keeps latency low and lets me run scans repeatedly without having to re-load data from my provider into the cache.

As a successful trader I’m attempting to automate my scanning and to produce analysis from this data that I can send off to other formats (Discord, email, web viewer, etc.).

My problem: I’m stuck on how to make the analysis process fast. I’ve been looking into multiple design patterns and haven’t found a clean way to implement my analysis while leaving room for expansion.

What I currently have:

I have a DataLoader class with methods to fetch data in an easy and fast way; the fetching is scheduled in my main.py to run periodically, as sketched after the class listing below. (I know this is the singleton pattern; I couldn’t think of a better way to do it.)

import asyncio
import concurrent.futures
import datetime
import os

import aiohttp
from aiohttp.client import ClientSession
import pandas as pd
from pandas import DataFrame

import common.constants as constants
class DataLoader:

    __instance = None

    @staticmethod
    def get_instance():
        if DataLoader.__instance is None:
            DataLoader()  # __init__ registers the new instance on the class
        return DataLoader.__instance

    def __init__(self):
        if DataLoader.__instance is not None:
            raise Exception("DataLoader cannot be instantiated more than once")
        self.key = "sa8LofcLVPVDxJGt1kehw1if1jjjVgxz"
        DataLoader.__instance = self

    def threaded_to_csv(self, df_dict: dict[str, DataFrame], intraday=True):
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.export_to_csv, df, ticker, intraday=intraday)
                       for ticker, df in df_dict.items()]

            for f in concurrent.futures.as_completed(futures):
                print(f"Ticker {f.result()} saved to csv")

    def threaded_from_csv(self, ticker_list: list[str], intraday=True) -> dict[str, DataFrame]:
        df_dict = {}

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.import_from_csv, ticker, intraday=intraday)
                       for ticker in ticker_list]

            for f in concurrent.futures.as_completed(futures):
                result = f.result()  # {ticker: DataFrame}
                ticker, df = next(iter(result.items()))
                df_dict[ticker] = df

                print(f"{ticker} loaded \n {df}")

        return df_dict

    def export_to_csv(self, df: DataFrame, ticker, intraday=True):
        path = r"D:\Trading Algo\Python TheStrat\stockdata\3min\{}.csv" if intraday else r"D:\Trading Algo\Python TheStrat\stockdata\1D\{}.csv"
        df.to_csv(path.format(ticker))
        return ticker

    def import_from_csv(self, ticker: str, intraday=True) -> dict[str, DataFrame]:
        path = r"D:\Trading Algo\Python TheStrat\stockdata\3min\{}.csv" if intraday else r"D:\Trading Algo\Python TheStrat\stockdata\1D\{}.csv"
        df = pd.read_csv(path.format(ticker))

        df['Date'] = pd.to_datetime(df['Date'])
        df.set_index('Date', inplace=True)

        return {ticker: df}

    async def load_data_multi(self, tickers: list, intraday=True, returnDataframes=False):
        timeframe = "minute" if intraday else "day"
        aggregation = 5 if intraday else 1
        time_delta = 5 if intraday else 100
        from_ = (datetime.datetime.now() - datetime.timedelta(days=time_delta)).strftime(constants.DATE_FORMAT)
        to_ = datetime.datetime.now().strftime(constants.DATE_FORMAT)

        tasks = []
        async with aiohttp.ClientSession() as session:
            for ticker in tickers:
                url = f'https://api.polygon.io/v2/aggs/ticker/{ticker}/range/{aggregation}/{timeframe}/{from_}/{to_}?apiKey={self.key}'
                tasks.append(self.async_query_url(session, url, ticker))
            json_responses = await asyncio.gather(*tasks)

        if returnDataframes:
            return self.format_datas(json_responses, intraday=intraday)

        return json_responses


    async def update_data(self, intraday=True):
        old_df_dict = {}

        tasks = []
        path = r"D:\Trading Algo\Python TheStrat\stockdata\3min" if intraday else r"D:\Trading Algo\Python TheStrat\stockdata\1D"
        dir_list = os.listdir(path)
        timeframe = "minute" if intraday else "day"
        aggregation = 5 if intraday else 1
        to_ = datetime.datetime.now().strftime(constants.DATE_FORMAT)
        async with aiohttp.ClientSession() as session:
            for file in dir_list:
                ticker = file.split('.')[0]
                old_df = self.import_from_csv(ticker, intraday=intraday)[ticker]
                old_df_dict[ticker] = old_df
                # Only request bars from the last stored timestamp onwards
                from_ = pd.to_datetime(old_df.index[-1]).strftime(constants.DATE_FORMAT)

                url = f'https://api.polygon.io/v2/aggs/ticker/{ticker}/range/{aggregation}/{timeframe}/{from_}/{to_}?apiKey={self.key}'
                tasks.append(self.async_query_url(session, url, ticker))
            json_responses = await asyncio.gather(*tasks)
        new_dataframes = self.format_datas(json_responses, intraday=intraday)

        for ticker in new_dataframes.keys():
            # DataFrame.append is deprecated (removed in pandas 2.x); concat instead
            new_df = pd.concat([old_df_dict[ticker], new_dataframes[ticker]])
            new_df = new_df[~new_df.index.duplicated(keep='first')]
            self.export_to_csv(new_df, ticker, intraday=intraday)

    
    def format_datas(self, json_responses, intraday=True) -> dict[str, DataFrame]:
        """Format from JSON responses to dict[ticker, DataFrame]."""
        dataframe_dict = {}

        for resp in json_responses:
            ticker = resp['ticker']
            df = pd.DataFrame(resp['results'])
            df['t'] = pd.to_datetime(df['t'], unit='ms', origin='unix')

            reformatted_data = {
                'Date': df['t'],
                'Open': df['o'],
                'Close': df['c'],
                'High': df['h'],
                'Low': df['l'],
                'Volume': df['v'],
            }

            reformatted_df = pd.DataFrame.from_dict(reformatted_data)
            reformatted_df.set_index('Date', inplace=True)

            if intraday:
                reformatted_df = self.remove_ah(reformatted_df)
            dataframe_dict[ticker] = reformatted_df

        return dataframe_dict

    def remove_ah(self, df: DataFrame) -> DataFrame:
        """Drop after-hours bars (timestamps are UTC: 14:30-20:59 is the regular session)."""
        return df.between_time('14:30', '20:59')

    async def async_query_url(self, session: ClientSession, url: str, ticker: str):
        async with session.get(url) as response:
            resp_json = await response.json()

        return resp_json

    def updateIntradayData(self):
        asyncio.run(self.update_data(intraday=True))
        print(f"Updated 5min data at {datetime.datetime.now()}")

    def updateDailyData(self):
        asyncio.run(self.update_data(intraday=False))
        print(f"Updated daily data at {datetime.datetime.now()}")

Now that the data is all stored on my local drive, I run into some questions.

  1. Should I be making a class that holds the dataframes (higher timeframe & lower timeframe) and has a function to run my analysis, like this?

import pandas as pd
from pandas import DataFrame
from TheStrat.analysis.analysis import AnalysisStrategy
from TheStrat.data.data_loader import DataLoader
from common.aggregation_enum import Aggregation
from common.strat_enum import StratCandleType, CandleType
class StratAnalysis(AnalysisStrategy):

    def __init__(self, df_lower: str, df_upper: str, ticker: str):
        self.df_lower_path = df_lower
        self.df_upper_path = df_upper
        self.ticker = ticker
        self.analysis = {}
        self.dl = DataLoader.get_instance()
        self.continuity = "DOWN"

    def do_analysis(self) -> dict[Aggregation, DataFrame]:
        df_lower = self.load_lower_df()
        df_upper = self.load_upper_df()

        return self.full_analysis(df_lower, df_upper, self.ticker)


    def load_lower_df(self) -> DataFrame:
        return self.load_df(intraday=True)

    def load_upper_df(self) -> DataFrame:
        return self.load_df(intraday=False)

    def load_df(self, intraday=True) -> DataFrame:
        path = r"D:\Trading Algo\Python TheStrat\stockdata\3min\{}.csv" if intraday else r"D:\Trading Algo\Python TheStrat\stockdata\1D\{}.csv"
        df = pd.read_csv(path.format(self.ticker))

        df['Date'] = pd.to_datetime(df['Date'])
        df.set_index('Date', inplace=True)

        return df

    def addStratAnalysis(self, df: DataFrame, analyzeAmount: int) -> DataFrame:
        if len(df.index) < analyzeAmount:
            raise IndexError(f"Strat analysis needs a longer dataframe (Length: {len(df.index)})")

        prev_high = 0.0
        prev_low = 0.0
        # Pad the rows we don't analyze with 'NaN' so the column matches the frame's length
        strat_candle_types = ['NaN'] * (len(df.index) - analyzeAmount - 1)

        analyzeAmount += 1

        for (index, open, close, high, low, volume, candleType) in df.tail(analyzeAmount).itertuples(name=None):
            if prev_high == 0.0:
                # First candle of the window has no predecessor to compare against
                strat_candle_types.append('NaN')
            elif high > prev_high and low >= prev_low:
                strat_candle_types.append(StratCandleType.two_up.value)
            elif low < prev_low and high <= prev_high:
                strat_candle_types.append(StratCandleType.two_down.value)
            elif high > prev_high and low < prev_low:
                if close >= open:
                    strat_candle_types.append(StratCandleType.three_up.value)
                else:
                    strat_candle_types.append(StratCandleType.three_down.value)
            elif high <= prev_high and low >= prev_low:
                strat_candle_types.append(StratCandleType.one.value)
            else:
                print("[ERROR] Could not identify candle pattern (something went super wrong) in strat.analyze()")
            prev_high = high
            prev_low = low
        df['StratCandleType'] = strat_candle_types

        return df

    def getSetups(self, df: DataFrame, ticker: str, agg):
        analysis_dict = {"Ticker": [ticker]}
        strat_setup = ""
        analyzed_df = self.addStratAnalysis(df, 3)
        for (index, open, close, high, low, volume, candleType, stratCandleType) in analyzed_df.tail(3).itertuples(name=None):
            strat_setup += stratCandleType + "-"

        analysis_dict[agg.value] = strat_setup[:-1]  # remove trailing dash from the setup string
        analysis_dict['CandleType'] = analyzed_df['CandleType'].iloc[-1]  # most recent candle formation
        all_setups = pd.DataFrame(analysis_dict)
        return all_setups

    def get_analysis(self, df: DataFrame, ticker: str, aggregations=Aggregation.lowerTimeframes()):
        dl = DataLoader.get_instance()

        analyzed = []
        for agg in aggregations:
            resampled_df = dl.resample(df, ticker, agg)[ticker]
            continuity = "UP" if resampled_df['Close'].iloc[-1] > resampled_df['Open'].iloc[-1] else "DOWN"
            setups = self.getSetups(resampled_df, ticker, agg)
            analyzed.append({'Aggregation': agg.value, 'CandleType': setups['CandleType'][0], 'StratSetup': setups[agg.value][0], 'Continuity': continuity})

        return pd.DataFrame(analyzed)

    def full_analysis(self, df_lowertf: DataFrame, df_highertf: DataFrame, ticker: str):
        intra_analysis = self.get_analysis(df_lowertf, ticker)
        day_analysis = self.get_analysis(df_highertf, ticker, aggregations=Aggregation.higherTimeframes())

        return pd.concat([intra_analysis, day_analysis])

Or should I make ‘AnalysisStrategy’ a class that takes in a data structure mapping Ticker to (DataFrame_LowerTF, DataFrame_HigherTF) and spits out an analysis of all the stocks in my filesystem?

This would get rid of the need for hundreds of analysis objects, but in my opinion it makes the code less readable.
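
Something like this is what I have in mind, assuming AnalysisStrategy becomes an abstract base (the names here are just illustrative):

from abc import ABC, abstractmethod

from pandas import DataFrame

class AnalysisStrategy(ABC):
    """One strategy instance can analyze any ticker handed to it."""

    @abstractmethod
    def analyze(self, ticker: str, df_lower: DataFrame, df_upper: DataFrame) -> DataFrame:
        ...

def analyze_all(strategy: AnalysisStrategy,
                data: dict[str, tuple[DataFrame, DataFrame]]) -> dict[str, DataFrame]:
    """Run a single strategy object over every ticker instead of one object per ticker."""
    return {ticker: strategy.analyze(ticker, df_lower, df_upper)
            for ticker, (df_lower, df_upper) in data.items()}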

And even if I did figure out this implementation, let’s say I wanted to do a different analysis, like ‘stocks above their 50-day rolling average’.

That analysis is going to return its information in a MUCH different format, which is a case I’d have to hard-code every time I added a new analysis type.
For example, the current analysis spits out a dataframe like this:

  Aggregation CandleType StratSetup Continuity
0       15min     hammer   2u-2d-2d       DOWN
1       30min        NaN   2u-2u-2d       DOWN
2       60min        NaN   2d-2u-2d       DOWN
0          1D        NaN   2d-2d-2d       DOWN
1          1W        NaN    2d-2u-1       DOWN
2          1M     hammer   2d-2u-2d       DOWN

But if I were to do an analysis like the one mentioned earlier, it’d be something like:

  Aggregation  >50MA
0       15min   TRUE
1       30min   TRUE
2       60min  FALSE
0          1D   TRUE
1          1W   TRUE
2          1M   TRUE
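
Computing that particular table is easy enough on its own, e.g. with pandas rolling windows (a sketch, assuming a dict of aggregation label to resampled OHLC frame):

import pandas as pd
from pandas import DataFrame

def above_50ma(df: DataFrame) -> bool:
    """True if the latest close is above the 50-period rolling average."""
    ma50 = df['Close'].rolling(window=50).mean()
    return bool(df['Close'].iloc[-1] > ma50.iloc[-1])

def ma_analysis(resampled: dict[str, DataFrame]) -> DataFrame:
    """Build the '>50MA' table from {aggregation label: OHLC frame}."""
    rows = [{'Aggregation': agg, '>50MA': above_50ma(df)} for agg, df in resampled.items()]
    return pd.DataFrame(rows)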

So what kind of path could I take in structuring my code to make adding new analyses down the road as seamless as possible?

P.S. I didn’t get to touch on a lot of things, as I felt this was getting lengthy. If you read the whole thing, I just want to say thanks in advance; getting help with this would mean the world to me.

I think your answer should stem from this: it’s usually best to avoid hard-coding when feasible. If you already know your analysis methods are going to change in the future, develop data-processing pipelines that can be passed different analysis methods.
You definitely don’t need one analysis object that does everything, either; for instance, your DataLoader class has at least one method you had to recreate in your StratAnalysis class.

While not a prerequisite, I’ve personally found that functional programming concepts offer more flexibility in data processing and analysis, and a lot of libraries take advantage of this. For example:

import datetime
from typing import Callable

from pandas import DataFrame

def pipeline(ticker: str,
             processing_strategy: Callable[[DataFrame], DataFrame],
             analysis_strategy: Callable[[DataFrame], DataFrame],
             **kwargs) -> DataFrame:
    from_dt = kwargs.get('from_dt', datetime.datetime.min)
    to_dt = kwargs.get('to_dt', datetime.datetime.now())
    # load_df_from_csv stands in for however you load a ticker's frame from disk
    df = DataLoader.load_df_from_csv(ticker, from_dt, to_dt)
    processed_data = processing_strategy(df)
    # Optional extra processing steps, applied in order
    for processing_method in kwargs.get('additional_processing', []):
        processed_data = processing_method(processed_data)
    analyzed_data = analysis_strategy(processed_data)
    return analyzed_data

candle_data = pipeline('TSLA', processing_method_a, candle_strategy)
pen_for_data = pipeline('TSLA', processing_method_b,
                        pennant_strategy,
                        additional_processing=[pennant_forex_strategy])

By no means is this the only or best way (there are data science libraries with pipeline tools for exactly these purposes), but the point I’m trying to make is to keep your patterns flexible if you’ll be using them a lot and in modified ways. That’s my take on making things as seamless as possible, anyway.
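
As one small built-in example of the same idea, pandas’ own DataFrame.pipe chains steps in this spirit (reusing the hypothetical step functions from the snippet above):

candle_data = (df
               .pipe(processing_method_a)  # processing step
               .pipe(candle_strategy))     # analysis step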

