跳转至

核心模块 API 参考

QKA 的核心功能模块,包含数据管理、回测引擎、策略基类和虚拟经纪商。

qka.Data

数据管理类,负责股票数据的获取、缓存和管理。

数据管理类

负责股票数据的获取、缓存和管理,支持多数据源、并发下载和自定义因子计算。

属性:

名称 类型 描述
symbols List[str]

股票代码列表

period str

数据周期,如 '1d'、'1m' 等

adjust str

复权方式,如 'qfq'、'hfq'、'bfq'

factor Callable

因子计算函数

source str

数据源,如 'akshare'、'qmt'

pool_size int

并发下载线程数

datadir Path

数据缓存目录

target_dir Path

目标存储目录

Source code in qka/core/data.py
class Data():
    """
    数据管理类

    负责股票数据的获取、缓存和管理,支持多数据源、并发下载和自定义因子计算。

    Attributes:
        symbols (List[str]): 股票代码列表
        period (str): 数据周期,如 '1d'、'1m' 等
        adjust (str): 复权方式,如 'qfq'、'hfq'、'bfq'
        factor (Callable): 因子计算函数
        source (str): 数据源,如 'akshare'、'qmt'
        pool_size (int): 并发下载线程数
        datadir (Path): 数据缓存目录
        target_dir (Path): 目标存储目录
    """

    def __init__(
        self, 
        symbols: Optional[List[str]] = None,
        period: str = '1d',
        adjust: str = 'qfq',
        factor: Callable[[pd.DataFrame], pd.DataFrame] = lambda df: df,
        source: str = 'akshare',
        pool_size: int = 10,
        datadir: Optional[Path] = None
    ):
        """
        初始化数据对象

        Args:
            symbols: [维度1] 标的,如 ['000001.SZ', '600000.SH']
            period: [维度2] 周期,如 '1m', '5m', '1d' 等
            factor: [维度3] 因子字典,key为因子名,value为因子函数
            source: [维度4] 数据来源 ('qmt', 'akshare')
            pool_size: 并发池大小
            datadir: 缓存根目录,默认为项目根目录下的 datadir
        """
        self.symbols = symbols or []
        self.period = period
        self.adjust = adjust
        self.factor = factor
        self.source = source
        self.pool_size = pool_size

        # 初始化缓存目录
        if datadir is None:
            # 默认使用当前工作目录下的 datadir
            self.datadir = Path.cwd() / "datadir"
        else:
            self.datadir = Path(datadir)

        self.datadir.mkdir(parents=True, exist_ok=True)

        self.target_dir = self.datadir / self.source / self.period / (self.adjust or "bfq")
        self.target_dir.mkdir(parents=True, exist_ok=True)

    def _download(self, symbol: str) -> Path:
        """
        下载单个股票的数据

        Args:
            symbol (str): 股票代码

        Returns:
            Path: 数据文件路径
        """
        path = self.target_dir / f"{symbol}.parquet"

        if path.exists():
             return path

        if self.source == 'akshare':
            df = self._get_from_akshare(symbol)
        else:
            df = pd.DataFrame()

        if len(df) == 0:
            return path

        table = pa.Table.from_pandas(df)
        pq.write_table(table, path)

        return path

    def get(self) -> dd.DataFrame:
        """
        获取历史数据

        并发下载所有股票数据,应用因子计算,并返回合并后的Dask DataFrame。

        Returns:
            dd.DataFrame: 合并后的股票数据,每只股票的列名格式为 {symbol}_{column}
        """
        # 准备缓存目录

        with ThreadPoolExecutor(max_workers=self.pool_size) as executor:
            # 提交下载任务
            futures = {
                executor.submit(self._download, symbol): symbol
                for symbol in self.symbols
            }

            # 添加tqdm进度条
            with tqdm(total=len(self.symbols), desc="下载数据") as pbar:
                for future in as_completed(futures):
                    symbol = futures[future]
                    pbar.update(1)
                    pbar.set_postfix_str(f"当前: {symbol}")

        dfs = []
        for symbol in self.symbols:
            df = dd.read_parquet(str(self.target_dir / f"{symbol}.parquet"))
            df = self.factor(df)
            column_mapping = {col: f'{symbol}_{col}' for col in df.columns}
            dfs.append(df.rename(columns=column_mapping))

        df = dd.concat(dfs, axis=1, join='outer')

        return df

    def _get_from_akshare(self, symbol: str) -> pd.DataFrame:
        """
        从 akshare 获取单个股票的数据。

        Args:
            symbol (str): 股票代码,支持带后缀如 000001.SZ 或不带后缀的 000001

        Returns:
            pd.DataFrame: 股票数据,以 date 为索引,包含 open, high, low, close, volume, amount 列
        """
        column_mapping = {
            "日期": "date",
            "开盘": "open",
            "收盘": "close",
            "最高": "high",
            "最低": "low",
            "成交量": "volume",
            "成交额": "amount",
        }

        # 下载数据
        df = ak.stock_zh_a_hist(symbol=symbol, period='daily', adjust=self.adjust)

        # 数据标准化处理
        # 1. 标准化列名
        df = df.rename(columns=column_mapping)
        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])

        # 2. 确保数值列为数值类型
        numeric_cols = [c for c in ("open", "high", "low", "close", "volume", "amount") if c in df.columns]
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors="coerce")

        # 3. 只保留需要的列
        mapped_columns = list(column_mapping.values())
        available_columns = [col for col in mapped_columns if col in df.columns]
        df = df[available_columns]

        df = df.set_index('date')
        # 设置索引
        return df

__init__(symbols=None, period='1d', adjust='qfq', factor=lambda df: df, source='akshare', pool_size=10, datadir=None)

初始化数据对象

参数:

名称 类型 描述 默认
symbols Optional[List[str]]

[维度1] 标的,如 ['000001.SZ', '600000.SH']

None
period str

[维度2] 周期,如 '1m', '5m', '1d' 等

'1d'
factor Callable[[DataFrame], DataFrame]

[维度3] 因子字典,key为因子名,value为因子函数

lambda df: df
source str

[维度4] 数据来源 ('qmt', 'akshare')

'akshare'
pool_size int

并发池大小

10
datadir Optional[Path]

缓存根目录,默认为项目根目录下的 datadir

None
源代码位于: qka/core/data.py
def __init__(
    self, 
    symbols: Optional[List[str]] = None,
    period: str = '1d',
    adjust: str = 'qfq',
    factor: Callable[[pd.DataFrame], pd.DataFrame] = lambda df: df,
    source: str = 'akshare',
    pool_size: int = 10,
    datadir: Optional[Path] = None
):
    """
    初始化数据对象

    Args:
        symbols: [维度1] 标的,如 ['000001.SZ', '600000.SH']
        period: [维度2] 周期,如 '1m', '5m', '1d' 等
        factor: [维度3] 因子字典,key为因子名,value为因子函数
        source: [维度4] 数据来源 ('qmt', 'akshare')
        pool_size: 并发池大小
        datadir: 缓存根目录,默认为项目根目录下的 datadir
    """
    self.symbols = symbols or []
    self.period = period
    self.adjust = adjust
    self.factor = factor
    self.source = source
    self.pool_size = pool_size

    # 初始化缓存目录
    if datadir is None:
        # 默认使用当前工作目录下的 datadir
        self.datadir = Path.cwd() / "datadir"
    else:
        self.datadir = Path(datadir)

    self.datadir.mkdir(parents=True, exist_ok=True)

    self.target_dir = self.datadir / self.source / self.period / (self.adjust or "bfq")
    self.target_dir.mkdir(parents=True, exist_ok=True)

get()

获取历史数据

并发下载所有股票数据,应用因子计算,并返回合并后的Dask DataFrame。

返回:

类型 描述
DataFrame

dd.DataFrame: 合并后的股票数据,每只股票的列名格式为 {symbol}_{column}

源代码位于: qka/core/data.py
def get(self) -> dd.DataFrame:
    """
    获取历史数据

    并发下载所有股票数据,应用因子计算,并返回合并后的Dask DataFrame。

    Returns:
        dd.DataFrame: 合并后的股票数据,每只股票的列名格式为 {symbol}_{column}
    """
    # 准备缓存目录

    with ThreadPoolExecutor(max_workers=self.pool_size) as executor:
        # 提交下载任务
        futures = {
            executor.submit(self._download, symbol): symbol
            for symbol in self.symbols
        }

        # 添加tqdm进度条
        with tqdm(total=len(self.symbols), desc="下载数据") as pbar:
            for future in as_completed(futures):
                symbol = futures[future]
                pbar.update(1)
                pbar.set_postfix_str(f"当前: {symbol}")

    dfs = []
    for symbol in self.symbols:
        df = dd.read_parquet(str(self.target_dir / f"{symbol}.parquet"))
        df = self.factor(df)
        column_mapping = {col: f'{symbol}_{col}' for col in df.columns}
        dfs.append(df.rename(columns=column_mapping))

    df = dd.concat(dfs, axis=1, join='outer')

    return df

使用示例

import qka

# 创建数据对象
data = qka.Data(
    symbols=['000001.SZ', '600000.SH'],
    period='1d',
    adjust='qfq'
)

# 获取数据
df = data.get()
print(df.head())

qka.Backtest

回测引擎类,提供基于时间序列的回测功能。

QKA回测引擎类

提供基于时间序列的回测功能,支持多股票横截面数据处理。

属性:

名称 类型 描述
data Data

数据对象实例

strategy Strategy

策略对象实例

Source code in qka/core/backtest.py
class Backtest:
    """
    QKA回测引擎类

    提供基于时间序列的回测功能,支持多股票横截面数据处理。

    Attributes:
        data (Data): 数据对象实例
        strategy (Strategy): 策略对象实例
    """

    def __init__(self, data, strategy):
        """
        初始化回测引擎

        Args:
            data (Data): Data类的实例,包含股票数据
            strategy (Strategy): 策略对象,必须包含on_bar方法
        """
        self.data = data
        self.strategy = strategy

    def run(self):
        """
        执行回测

        遍历所有时间点,在每个时间点调用策略的on_bar方法进行交易决策,
        并记录交易后的状态。
        """
        # 获取所有股票数据(dask DataFrame)
        df = self.data.get()

        for date, row in df.iterrows():
            def get(factor):
                """
                获取指定因子的数据

                Args:
                    factor (str): 因子名称,如 'close', 'volume' 等

                Returns:
                    pd.Series: 该因子在所有股票上的值
                """
                s = row[row.index.str.endswith(factor)]
                s.index = s.index.str.replace(f'_{factor}$', '', regex=True)
                return s

            # 先调用策略的on_bar(可能包含交易操作)
            self.strategy.on_bar(date, get)

            # 再调用broker的on_bar记录状态(包含交易后的状态)
            self.strategy.broker.on_bar(date, get)

    def plot(self, title="回测收益曲线"):
        """
        绘制回测收益曲线图

        Args:
            title (str): 图表标题,默认为"回测收益曲线"

        Returns:
            plotly.graph_objects.Figure: 交互式图表对象,如果无数据则返回None
        """

        # 获取交易数据
        trades_df = self.strategy.broker.trades

        # 检查是否有数据
        if trades_df.empty or 'total' not in trades_df.columns:
            print("错误:没有可用的回测数据或缺少total列")
            return None

        # 提取总资产数据
        total_assets = trades_df['total']

        # 创建交互式图表
        fig = go.Figure()

        fig.add_trace(go.Scatter(
            x=total_assets.index,
            y=total_assets.values,
            mode='lines',
            name='总资产',
            line=dict(color='#2E86AB', width=3),
            hovertemplate='日期: %{x}<br>总资产: ¥%{y:,.2f}<extra></extra>'
        ))

        # 更新布局
        fig.update_layout(
            title=dict(
                text=title,
                x=0.5,
                font=dict(size=16)
            ),
            xaxis_title="日期",
            yaxis_title="总资产 (¥)",
            height=600,
            showlegend=True,
            template='plotly_white',
            hovermode='x unified'
        )

        # 添加网格线
        fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='LightGrey')

        # 显示图表
        fig.show()

        return fig

__init__(data, strategy)

初始化回测引擎

参数:

名称 类型 描述 默认
data Data

Data类的实例,包含股票数据

必需
strategy Strategy

策略对象,必须包含on_bar方法

必需
源代码位于: qka/core/backtest.py
def __init__(self, data, strategy):
    """
    初始化回测引擎

    Args:
        data (Data): Data类的实例,包含股票数据
        strategy (Strategy): 策略对象,必须包含on_bar方法
    """
    self.data = data
    self.strategy = strategy

run()

执行回测

遍历所有时间点,在每个时间点调用策略的on_bar方法进行交易决策, 并记录交易后的状态。

源代码位于: qka/core/backtest.py
def run(self):
    """
    执行回测

    遍历所有时间点,在每个时间点调用策略的on_bar方法进行交易决策,
    并记录交易后的状态。
    """
    # 获取所有股票数据(dask DataFrame)
    df = self.data.get()

    for date, row in df.iterrows():
        def get(factor):
            """
            获取指定因子的数据

            Args:
                factor (str): 因子名称,如 'close', 'volume' 等

            Returns:
                pd.Series: 该因子在所有股票上的值
            """
            s = row[row.index.str.endswith(factor)]
            s.index = s.index.str.replace(f'_{factor}$', '', regex=True)
            return s

        # 先调用策略的on_bar(可能包含交易操作)
        self.strategy.on_bar(date, get)

        # 再调用broker的on_bar记录状态(包含交易后的状态)
        self.strategy.broker.on_bar(date, get)

plot(title='回测收益曲线')

绘制回测收益曲线图

参数:

名称 类型 描述 默认
title str

图表标题,默认为"回测收益曲线"

'回测收益曲线'

返回:

类型 描述

plotly.graph_objects.Figure: 交互式图表对象,如果无数据则返回None

源代码位于: qka/core/backtest.py
def plot(self, title="回测收益曲线"):
    """
    绘制回测收益曲线图

    Args:
        title (str): 图表标题,默认为"回测收益曲线"

    Returns:
        plotly.graph_objects.Figure: 交互式图表对象,如果无数据则返回None
    """

    # 获取交易数据
    trades_df = self.strategy.broker.trades

    # 检查是否有数据
    if trades_df.empty or 'total' not in trades_df.columns:
        print("错误:没有可用的回测数据或缺少total列")
        return None

    # 提取总资产数据
    total_assets = trades_df['total']

    # 创建交互式图表
    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=total_assets.index,
        y=total_assets.values,
        mode='lines',
        name='总资产',
        line=dict(color='#2E86AB', width=3),
        hovertemplate='日期: %{x}<br>总资产: ¥%{y:,.2f}<extra></extra>'
    ))

    # 更新布局
    fig.update_layout(
        title=dict(
            text=title,
            x=0.5,
            font=dict(size=16)
        ),
        xaxis_title="日期",
        yaxis_title="总资产 (¥)",
        height=600,
        showlegend=True,
        template='plotly_white',
        hovermode='x unified'
    )

    # 添加网格线
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='LightGrey')

    # 显示图表
    fig.show()

    return fig

使用示例

# 运行回测
strategy = MyStrategy()
backtest = qka.Backtest(data, strategy)
backtest.run()

# 可视化结果
backtest.plot("我的策略回测结果")

qka.Strategy

策略抽象基类,所有自定义策略都应该继承此类。

Bases: ABC

策略抽象基类

所有自定义策略都应该继承此类,并实现on_bar方法。

属性:

名称 类型 描述
broker Broker

交易经纪商实例,用于执行交易操作

Source code in qka/core/strategy.py
class Strategy(ABC):
    """
    策略抽象基类

    所有自定义策略都应该继承此类,并实现on_bar方法。

    Attributes:
        broker (Broker): 交易经纪商实例,用于执行交易操作
    """

    def __init__(self):
        """初始化策略"""
        self.broker = Broker()

    @abstractmethod
    def on_bar(self, date, get):
        """
        每个bar的处理逻辑,必须由子类实现

        Args:
            date: 当前时间戳
            get: 获取因子数据的函数,格式为 get(factor_name) -> pd.Series
        """
        pass

__init__()

初始化策略

源代码位于: qka/core/strategy.py
def __init__(self):
    """初始化策略"""
    self.broker = Broker()

on_bar(date, get) abstractmethod

每个bar的处理逻辑,必须由子类实现

参数:

名称 类型 描述 默认
date

当前时间戳

必需
get

获取因子数据的函数,格式为 get(factor_name) -> pd.Series

必需
源代码位于: qka/core/strategy.py
@abstractmethod
def on_bar(self, date, get):
    """
    每个bar的处理逻辑,必须由子类实现

    Args:
        date: 当前时间戳
        get: 获取因子数据的函数,格式为 get(factor_name) -> pd.Series
    """
    pass

使用示例

class MyStrategy(qka.Strategy):
    def __init__(self):
        super().__init__()
        self.ma_short = 5
        self.ma_long = 20

    def on_bar(self, date, get):
        close_prices = get('close')
        # 策略逻辑...

qka.Broker

虚拟交易经纪商类,管理资金、持仓和交易记录。

虚拟交易经纪商类

管理资金、持仓和交易记录,提供买入卖出操作接口。

属性:

名称 类型 描述
cash float

可用现金

positions Dict

持仓记录,格式: {symbol: {'size': 数量, 'avg_price': 平均成本}}

trade_history List

交易历史记录

timestamp

当前时间戳

trades DataFrame

交易记录DataFrame

Source code in qka/core/broker.py
class Broker:
    """
    虚拟交易经纪商类

    管理资金、持仓和交易记录,提供买入卖出操作接口。

    Attributes:
        cash (float): 可用现金
        positions (Dict): 持仓记录,格式: {symbol: {'size': 数量, 'avg_price': 平均成本}}
        trade_history (List): 交易历史记录
        timestamp: 当前时间戳
        trades (pd.DataFrame): 交易记录DataFrame
    """

    def __init__(self, initial_cash=100000.0):
        """
        初始化Broker类

        Args:
            initial_cash (float): 初始资金,默认10万元
        """
        self.cash = initial_cash  # 可用现金
        self.positions = {}       # 持仓记录,格式: {symbol: {'size': 数量, 'avg_price': 平均成本}}
        self.trade_history = []   # 交易历史记录
        self.timestamp = None     # 当前时间戳

        # 交易记录
        self.trades = pd.DataFrame(columns=[
            'cash', 'value', 'total',
            'positions', 'trades'
        ])

    def on_bar(self, date, get):
        """
        处理每个bar的数据,记录状态

        Args:
            date: 当前时间戳
            get: 获取因子数据的函数,格式为 get(factor_name) -> pd.Series
        """
        self.timestamp = date

        # 获取当前市场价格
        close_prices = get('close')
        market_prices = close_prices.to_dict() if hasattr(close_prices, 'to_dict') else {}

        # 记录状态到trades DataFrame
        if self.timestamp is None:
            return

        # 获取当日交易记录
        daily_trades = []
        for trade in self.trade_history:
            if trade['timestamp'] == self.timestamp:
                daily_trades.append(trade)

        # 计算持仓市值
        position_value = 0.0
        for symbol, position in self.positions.items():
            if market_prices and symbol in market_prices:
                # 使用市场价格计算市值
                position_value += position['size'] * market_prices[symbol]
            else:
                # 使用平均成本估算市值
                position_value += position['size'] * position['avg_price']

        # 计算总资产
        total_assets = self.cash + position_value

        # 记录状态
        state_data = {
            'cash': self.cash,
            'value': position_value,
            'total': total_assets,
            'positions': self.positions.copy(),
            'trades': daily_trades.copy()
        }

        # 添加到trades
        self.trades.loc[self.timestamp] = state_data

    def buy(self, symbol: str, price: float, size: int) -> bool:
        """
        买入操作

        Args:
            symbol (str): 交易标的代码
            price (float): 买入价格
            size (int): 买入数量

        Returns:
            bool: 交易是否成功
        """
        # 计算买入所需金额
        required_cash = price * size

        # 检查资金是否足够
        if self.cash < required_cash:
            print(f"资金不足!需要 {required_cash:.2f},当前可用 {self.cash:.2f}")
            return False

        # 执行买入操作
        self.cash -= required_cash

        # 更新持仓
        if symbol in self.positions:
            # 已有持仓,计算新的平均成本
            old_position = self.positions[symbol]
            old_size = old_position['size']
            old_avg_price = old_position['avg_price']
            old_total_value = old_size * old_avg_price
            new_total_value = old_total_value + required_cash
            new_size = old_size + size
            new_avg_price = new_total_value / new_size

            self.positions[symbol] = {
                'size': new_size,
                'avg_price': new_avg_price
            }
        else:
            # 新建持仓
            self.positions[symbol] = {
                'size': size,
                'avg_price': price
            }

        # 记录交易历史
        self.trade_history.append({
            'action': 'buy',
            'symbol': symbol,
            'price': price,
            'size': size,
            'timestamp': self.timestamp
        })

        print(f"买入成功: {symbol} {size}股 @ {price:.2f},花费 {required_cash:.2f}")
        return True

    def sell(self, symbol: str, price: float, size: int) -> bool:
        """
        卖出操作

        Args:
            symbol (str): 交易标的代码
            price (float): 卖出价格
            size (int): 卖出数量

        Returns:
            bool: 交易是否成功
        """
        # 检查是否有足够的持仓
        if symbol not in self.positions:
            print(f"没有 {symbol} 的持仓!")
            return False

        position = self.positions[symbol]
        if position['size'] < size:
            print(f"持仓不足!当前持有 {position['size']},尝试卖出 {size}")
            return False

        # 计算卖出所得金额
        sale_proceeds = price * size

        # 执行卖出操作
        self.cash += sale_proceeds

        # 更新持仓
        if position['size'] == size:
            # 全部卖出,删除持仓记录
            del self.positions[symbol]
        else:
            # 部分卖出,更新持仓数量
            self.positions[symbol]['size'] -= size

        # 记录交易历史
        self.trade_history.append({
            'action': 'sell',
            'symbol': symbol,
            'price': price,
            'size': size,
            'timestamp': self.timestamp
        })

        print(f"卖出成功: {symbol} {size}股 @ {price:.2f},获得 {sale_proceeds:.2f}")
        return True

    def get(self, factor: str, timestamp=None) -> Any:
        """
        从trades DataFrame中获取数据

        Args:
            factor (str): 列名,可选 'cash', 'value', 'total', 'positions', 'trades'
            timestamp: 时间戳,如果为None则使用当前时间戳

        Returns:
            Any: 对应列的数据,如果不存在则返回None
        """
        if timestamp is None:
            timestamp = self.timestamp

        if timestamp is None or timestamp not in self.trades.index:
            return None

        return self.trades.at[timestamp, factor]

__init__(initial_cash=100000.0)

初始化Broker类

参数:

名称 类型 描述 默认
initial_cash float

初始资金,默认10万元

100000.0
源代码位于: qka/core/broker.py
def __init__(self, initial_cash=100000.0):
    """
    初始化Broker类

    Args:
        initial_cash (float): 初始资金,默认10万元
    """
    self.cash = initial_cash  # 可用现金
    self.positions = {}       # 持仓记录,格式: {symbol: {'size': 数量, 'avg_price': 平均成本}}
    self.trade_history = []   # 交易历史记录
    self.timestamp = None     # 当前时间戳

    # 交易记录
    self.trades = pd.DataFrame(columns=[
        'cash', 'value', 'total',
        'positions', 'trades'
    ])

on_bar(date, get)

处理每个bar的数据,记录状态

参数:

名称 类型 描述 默认
date

当前时间戳

必需
get

获取因子数据的函数,格式为 get(factor_name) -> pd.Series

必需
源代码位于: qka/core/broker.py
def on_bar(self, date, get):
    """
    处理每个bar的数据,记录状态

    Args:
        date: 当前时间戳
        get: 获取因子数据的函数,格式为 get(factor_name) -> pd.Series
    """
    self.timestamp = date

    # 获取当前市场价格
    close_prices = get('close')
    market_prices = close_prices.to_dict() if hasattr(close_prices, 'to_dict') else {}

    # 记录状态到trades DataFrame
    if self.timestamp is None:
        return

    # 获取当日交易记录
    daily_trades = []
    for trade in self.trade_history:
        if trade['timestamp'] == self.timestamp:
            daily_trades.append(trade)

    # 计算持仓市值
    position_value = 0.0
    for symbol, position in self.positions.items():
        if market_prices and symbol in market_prices:
            # 使用市场价格计算市值
            position_value += position['size'] * market_prices[symbol]
        else:
            # 使用平均成本估算市值
            position_value += position['size'] * position['avg_price']

    # 计算总资产
    total_assets = self.cash + position_value

    # 记录状态
    state_data = {
        'cash': self.cash,
        'value': position_value,
        'total': total_assets,
        'positions': self.positions.copy(),
        'trades': daily_trades.copy()
    }

    # 添加到trades
    self.trades.loc[self.timestamp] = state_data

buy(symbol, price, size)

买入操作

参数:

名称 类型 描述 默认
symbol str

交易标的代码

必需
price float

买入价格

必需
size int

买入数量

必需

返回:

名称 类型 描述
bool bool

交易是否成功

源代码位于: qka/core/broker.py
def buy(self, symbol: str, price: float, size: int) -> bool:
    """
    买入操作

    Args:
        symbol (str): 交易标的代码
        price (float): 买入价格
        size (int): 买入数量

    Returns:
        bool: 交易是否成功
    """
    # 计算买入所需金额
    required_cash = price * size

    # 检查资金是否足够
    if self.cash < required_cash:
        print(f"资金不足!需要 {required_cash:.2f},当前可用 {self.cash:.2f}")
        return False

    # 执行买入操作
    self.cash -= required_cash

    # 更新持仓
    if symbol in self.positions:
        # 已有持仓,计算新的平均成本
        old_position = self.positions[symbol]
        old_size = old_position['size']
        old_avg_price = old_position['avg_price']
        old_total_value = old_size * old_avg_price
        new_total_value = old_total_value + required_cash
        new_size = old_size + size
        new_avg_price = new_total_value / new_size

        self.positions[symbol] = {
            'size': new_size,
            'avg_price': new_avg_price
        }
    else:
        # 新建持仓
        self.positions[symbol] = {
            'size': size,
            'avg_price': price
        }

    # 记录交易历史
    self.trade_history.append({
        'action': 'buy',
        'symbol': symbol,
        'price': price,
        'size': size,
        'timestamp': self.timestamp
    })

    print(f"买入成功: {symbol} {size}股 @ {price:.2f},花费 {required_cash:.2f}")
    return True

sell(symbol, price, size)

卖出操作

参数:

名称 类型 描述 默认
symbol str

交易标的代码

必需
price float

卖出价格

必需
size int

卖出数量

必需

返回:

名称 类型 描述
bool bool

交易是否成功

源代码位于: qka/core/broker.py
def sell(self, symbol: str, price: float, size: int) -> bool:
    """
    卖出操作

    Args:
        symbol (str): 交易标的代码
        price (float): 卖出价格
        size (int): 卖出数量

    Returns:
        bool: 交易是否成功
    """
    # 检查是否有足够的持仓
    if symbol not in self.positions:
        print(f"没有 {symbol} 的持仓!")
        return False

    position = self.positions[symbol]
    if position['size'] < size:
        print(f"持仓不足!当前持有 {position['size']},尝试卖出 {size}")
        return False

    # 计算卖出所得金额
    sale_proceeds = price * size

    # 执行卖出操作
    self.cash += sale_proceeds

    # 更新持仓
    if position['size'] == size:
        # 全部卖出,删除持仓记录
        del self.positions[symbol]
    else:
        # 部分卖出,更新持仓数量
        self.positions[symbol]['size'] -= size

    # 记录交易历史
    self.trade_history.append({
        'action': 'sell',
        'symbol': symbol,
        'price': price,
        'size': size,
        'timestamp': self.timestamp
    })

    print(f"卖出成功: {symbol} {size}股 @ {price:.2f},获得 {sale_proceeds:.2f}")
    return True

get(factor, timestamp=None)

从trades DataFrame中获取数据

参数:

名称 类型 描述 默认
factor str

列名,可选 'cash', 'value', 'total', 'positions', 'trades'

必需
timestamp

时间戳,如果为None则使用当前时间戳

None

返回:

名称 类型 描述
Any Any

对应列的数据,如果不存在则返回None

源代码位于: qka/core/broker.py
def get(self, factor: str, timestamp=None) -> Any:
    """
    从trades DataFrame中获取数据

    Args:
        factor (str): 列名,可选 'cash', 'value', 'total', 'positions', 'trades'
        timestamp: 时间戳,如果为None则使用当前时间戳

    Returns:
        Any: 对应列的数据,如果不存在则返回None
    """
    if timestamp is None:
        timestamp = self.timestamp

    if timestamp is None or timestamp not in self.trades.index:
        return None

    return self.trades.at[timestamp, factor]

使用示例

# 在策略中使用
class MyStrategy(qka.Strategy):
    def on_bar(self, date, get):
        close_prices = get('close')
        for symbol in close_prices.index:
            if self.should_buy(symbol, close_prices[symbol]):
                self.broker.buy(symbol, close_prices[symbol], 100)

模块导入方式

根据 qka/__init__.py 的配置,所有核心模块都可以直接从 qka 包导入:

import qka

# 直接使用
data = qka.Data(...)
backtest = qka.Backtest(...)
strategy = qka.Strategy(...)  # 作为基类
broker = qka.Broker(...)

相关链接