diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml
index ee5c721a5..a76afdb46 100644
--- a/.github/workflows/pythonpackage.yml
+++ b/.github/workflows/pythonpackage.yml
@@ -5,7 +5,7 @@ name: Python package
 
 on:
   push:
-    branches: [ master, V0.9.51 ]
+    branches: [ master, V0.9.52 ]
   pull_request:
     branches: [ master ]
diff --git a/czsc/__init__.py b/czsc/__init__.py
index 9505feac6..15f7fa06f 100644
--- a/czsc/__init__.py
+++ b/czsc/__init__.py
@@ -115,6 +115,10 @@
     get_trading_dates,
 )
 
+from czsc.utils.trade import (
+    adjust_holding_weights,
+)
+
 # streamlit 量化分析组件
 from czsc.utils.st_components import (
     show_daily_return,
@@ -140,6 +144,8 @@
     show_strategies_symbol,
     show_strategies_dailys,
     show_holds_backtest,
+    show_symbols_corr,
+    show_feature_returns,
 )
 
 from czsc.utils.bi_info import (
@@ -166,6 +172,9 @@
     rolling_tanh,
     feature_adjust,
     normalize_corr,
+    feature_to_weight,
+    feature_returns,
+    feature_sectional_corr,
 )
 
 
@@ -177,10 +186,10 @@
 )
 
 
-__version__ = "0.9.51"
+__version__ = "0.9.52"
 __author__ = "zengbin93"
 __email__ = "zeng_bin8888@163.com"
-__date__ = "20240512"
+__date__ = "20240526"
 
 
 def welcome():
diff --git a/czsc/connectors/research.py b/czsc/connectors/research.py
index 812744b81..01720e8b9 100644
--- a/czsc/connectors/research.py
+++ b/czsc/connectors/research.py
@@ -11,6 +11,8 @@
 import czsc
 import glob
 import pandas as pd
+from datetime import datetime
+
 
 # 投研共享数据的本地缓存路径,需要根据实际情况修改
 cache_path = os.environ.get("czsc_research_cache", r"D:\CZSC投研数据")
@@ -65,5 +67,20 @@ def get_raw_bars(symbol, freq, sdt, edt, fq="前复权", **kwargs):
     kline = kline[(kline["dt"] >= pd.to_datetime(sdt)) & (kline["dt"] <= pd.to_datetime(edt))]
     if kline.empty:
         return []
-    _bars = czsc.resample_bars(kline, freq, raw_bars=raw_bars, base_freq="1分钟")
+
+    df = kline.copy()
+    if symbol in ["SFIC9001", "SFIF9001", "SFIH9001"]:
+        # 股指:仅保留 09:31 - 11:30, 13:01 - 15:00
+        # 历史遗留问题,股指有一段时间,收盘时间是 15:15
+        dt1 = datetime.strptime("09:31:00", "%H:%M:%S")
+        dt2 = datetime.strptime("11:30:00", "%H:%M:%S")
+        c1 = (df["dt"].dt.time >= dt1.time()) & (df["dt"].dt.time <= dt2.time())
+
+        dt3 = datetime.strptime("13:01:00", "%H:%M:%S")
+        dt4 = datetime.strptime("15:00:00", "%H:%M:%S")
+        c2 = (df["dt"].dt.time >= dt3.time()) & (df["dt"].dt.time <= dt4.time())
+
+        df = df[c1 | c2].copy().reset_index(drop=True)
+
+    _bars = czsc.resample_bars(df, freq, raw_bars=raw_bars, base_freq="1分钟")
     return _bars
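The session filter added to `get_raw_bars` is easy to sanity-check in isolation. A minimal sketch with a made-up 1-minute frame (not part of the diff), mirroring the same time-window logic:

```python
import pandas as pd

# Hypothetical 1-minute bars spanning the lunch break and the legacy 15:15 close
df = pd.DataFrame({"dt": pd.date_range("2024-05-20 09:25", "2024-05-20 15:15", freq="1min")})

t = df["dt"].dt.time
c1 = (t >= pd.Timestamp("09:31").time()) & (t <= pd.Timestamp("11:30").time())
c2 = (t >= pd.Timestamp("13:01").time()) & (t <= pd.Timestamp("15:00").time())

# Drops 09:25-09:30, 11:31-13:00 and the legacy 15:01-15:15 tail
df = df[c1 | c2].reset_index(drop=True)
```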
diff --git a/czsc/connectors/tq_connector.py b/czsc/connectors/tq_connector.py
index d803fa30e..1aa1f6dd6 100644
--- a/czsc/connectors/tq_connector.py
+++ b/czsc/connectors/tq_connector.py
@@ -17,6 +17,64 @@
 from tqsdk import TqApi, TqAuth, TqSim, TqBacktest, TargetPosTask, BacktestFinished, TqAccount, TqKq  # noqa
 
 
+def format_kline(df, freq=Freq.F1):
+    """对分钟K线进行格式化"""
+    freq = Freq(freq)
+    rows = df.to_dict("records")
+    raw_bars = []
+    for i, row in enumerate(rows):
+        bar = RawBar(
+            symbol=row["symbol"],
+            id=i,
+            freq=freq,
+            dt=datetime.fromtimestamp(row["datetime"] / 1e9) + timedelta(minutes=1),
+            open=row["open"],
+            close=row["close"],
+            high=row["high"],
+            low=row["low"],
+            vol=row["volume"],
+            amount=row["volume"] * row["close"],
+        )
+        raw_bars.append(bar)
+    return raw_bars
+
+
+def create_symbol_trader(api: TqApi, symbol, **kwargs):
+    """创建一个品种的 CzscTrader, 回测与实盘场景同样适用
+
+    :param api: TqApi, 天勤API实例
+    :param symbol: str, 合约代码,要求符合天勤的规范
+    :param kwargs: dict, 其他参数
+
+        - sdt: str, 开始日期
+        - files_position: list[str], 策略配置文件路径
+        - adj_type: str, 复权类型,可选值:'F', 'B', 'N',默认为 'F',前复权
+
+    """
+    adj_type = kwargs.get("adj_type", "F")
+    files_position = kwargs.get("files_position")
+    tactic = czsc.CzscJsonStrategy(symbol=symbol, files_position=files_position)
+    kline = api.get_kline_serial(symbol, int(tactic.base_freq.strip("分钟")) * 60, data_length=10000, adj_type=adj_type)
+    quote = api.get_quote(symbol)
+    raw_bars = format_kline(kline, freq=tactic.base_freq)
+    if kwargs.get("sdt"):
+        sdt = pd.to_datetime(kwargs.get("sdt")).date()
+    else:
+        sdt = (pd.Timestamp.now() - pd.Timedelta(days=1)).date()
+    trader = tactic.init_trader(raw_bars, sdt=sdt)
+    target_pos = TargetPosTask(api, quote.underlying_symbol)
+
+    meta = {
+        "symbol": symbol,
+        "kline": kline,
+        "quote": quote,
+        "trader": trader,
+        "base_freq": tactic.base_freq,
+        "target_pos": target_pos,
+    }
+    return meta
+
+
 # https://doc.shinnytech.com/tqsdk/latest/usage/mddatas.html 代码规则
 symbols = [
     # https://www.jiaoyixingqiu.com/shouxufei/jiaoyisuo/SHFE
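A rough usage sketch for `create_symbol_trader` — the credentials, contract code and strategy-file path are placeholders, and the event loop is elided to a comment:

```python
from tqsdk import TqApi, TqAuth

# Placeholder credentials and strategy file; replace with real values
api = TqApi(auth=TqAuth("user", "password"))
meta = create_symbol_trader(api, "DCE.m2409", files_position=[r"D:\strategy\m_pos.json"], sdt="20240101")

trader, target_pos = meta["trader"], meta["target_pos"]
while True:
    api.wait_update()
    # on each new bar: feed the bar to trader and sync target_pos with the strategy position
```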
kwargs.get("window", 300) + min_periods = kwargs.get("min_periods", 2) + new_col = new_col if new_col else f"compare_{col1}_{col2}" + method = kwargs.get("method", "sub") + assert method in [ + "sub", + "divide", + "lr_intercept", + "lr_coef", + ], "method 必须为 sub, divide, lr_intercept, lr_coef 中的一种" for i in range(len(df)): - dfi = df.loc[i - window + 1:i, [col1, col2]] + dfi = df.loc[i - window + 1 : i, [col1, col2]] dfi = dfi.copy() if i < min_periods: df.loc[i, new_col] = 0 continue - if method == 'sub': + if method == "sub": df.loc[i, new_col] = dfi[col1].sub(dfi[col2]).mean() - elif method == 'divide': + elif method == "divide": df.loc[i, new_col] = dfi[col1].divide(dfi[col2]).mean() - elif method == 'lr_intercept': + elif method == "lr_intercept": x = dfi[col2].values.reshape(-1, 1) y = dfi[col1].values.reshape(-1, 1) reg = LinearRegression().fit(x, y) df.loc[i, new_col] = reg.intercept_[0] - elif method == 'lr_coef': + elif method == "lr_coef": x = dfi[col2].values.reshape(-1, 1) y = dfi[col1].values.reshape(-1, 1) reg = LinearRegression().fit(x, y) @@ -166,20 +174,24 @@ def rolling_scale(df: pd.DataFrame, col: str, window=300, min_periods=100, new_c df = df.copy() df = df.sort_values("dt", ascending=True).reset_index(drop=True) - new_col = new_col if new_col else f'{col}_scale' + new_col = new_col if new_col else f"{col}_scale" method = kwargs.get("method", "scale") method_map = { "scale": scale, "minmax_scale": minmax_scale, "maxabs_scale": maxabs_scale, - "robust_scale": robust_scale + "robust_scale": robust_scale, } assert method in method_map, f"method must be one of {list(method_map.keys())}" scale_method = method_map[method] if method == "minmax_scale": - df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: minmax_scale(x, feature_range=(-1, 1))[-1]) + df[new_col] = ( + df[col] + .rolling(window=window, min_periods=min_periods) + .apply(lambda x: minmax_scale(x, feature_range=(-1, 1))[-1]) + ) else: df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: scale_method(x)[-1]) @@ -200,9 +212,9 @@ def rolling_tanh(df: pd.DataFrame, col: str, window=300, min_periods=100, new_co """ if kwargs.get("copy", False): df = df.copy() - new_col = new_col if new_col else f'{col}_tanh' + new_col = new_col if new_col else f"{col}_tanh" df = df.sort_values("dt", ascending=True).reset_index(drop=True) - df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: np.tanh(scale(x))[-1]) # type: ignore + df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: np.tanh(scale(x))[-1]) # type: ignore df[new_col] = df[new_col].fillna(0) return df @@ -226,31 +238,32 @@ def rolling_slope(df: pd.DataFrame, col: str, window=300, min_periods=100, new_c - std/mean: 使用序列的 std/mean 计算斜率 - snr: 使用序列的 snr 计算斜率 """ - method = kwargs.get('method', 'linear') - new_col = new_col if new_col else f'{col}_slope_{method}' + method = kwargs.get("method", "linear") + new_col = new_col if new_col else f"{col}_slope_{method}" - if method == 'linear': + if method == "linear": # 使用线性回归计算斜率 def __lr_slope(x): return LinearRegression().fit(list(range(len(x))), x).coef_[0] + df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(__lr_slope, raw=True) - elif method == 'std/mean': + elif method == "std/mean": # 用 window 内 std 的变化率除以 mean 的变化率,来衡量序列的斜率 # 如果 std/mean > 0, 则表示序列的斜率在变大,反之则表示序列的斜率在变小 - df['temp_std'] = df[col].rolling(window=window, 
@@ -283,22 +296,22 @@ def normalize_corr(df: pd.DataFrame, fcol, ycol=None, **kwargs):
     if kwargs.get("copy", False):
         df = df.copy()
 
-    df = df.sort_values(['symbol', 'dt'], ascending=True).reset_index(drop=True)
+    df = df.sort_values(["symbol", "dt"], ascending=True).reset_index(drop=True)
 
     for symbol, dfg in df.groupby("symbol"):
-        dfg['ycol'] = dfg['price'].pct_change().shift(-1)
+        dfg["ycol"] = dfg["price"].pct_change().shift(-1)
 
         if mode.lower() == "rolling":
-            dfg['corr_sign'] = np.sign(dfg[fcol].rolling(window=window, min_periods=min_periods).corr(dfg['ycol']))
-            dfg[fcol] = (dfg['corr_sign'].shift(3) * dfg[fcol]).fillna(0)
+            dfg["corr_sign"] = np.sign(dfg[fcol].rolling(window=window, min_periods=min_periods).corr(dfg["ycol"]))
+            dfg[fcol] = (dfg["corr_sign"].shift(3) * dfg[fcol]).fillna(0)
 
         elif mode.lower() == "simple":
-            corr_sign = np.sign(dfg[fcol].corr(dfg['ycol']))
+            corr_sign = np.sign(dfg[fcol].corr(dfg["ycol"]))
             dfg[fcol] = corr_sign * dfg[fcol]
 
         else:
             raise ValueError(f"Unknown mode: {mode}")
 
-        df.loc[df['symbol'] == symbol, fcol] = dfg[fcol]
+        df.loc[df["symbol"] == symbol, fcol] = dfg[fcol]
 
     return df
@@ -315,15 +328,16 @@ def feature_adjust_V230101(df: pd.DataFrame, fcol, **kwargs):
     min_periods = kwargs.get("min_periods", 200)
 
     df = df.copy().sort_values("dt", ascending=True).reset_index(drop=True)
-    df['n1b'] = df['price'].shift(-1) / df['price'] - 1
-    df['corr'] = df[fcol].rolling(window=window, min_periods=min_periods).corr(df['n1b'])
-    df['corr'] = df['corr'].shift(5).fillna(0)
+    df["n1b"] = df["price"].shift(-1) / df["price"] - 1
+    df["corr"] = df[fcol].rolling(window=window, min_periods=min_periods).corr(df["n1b"])
+    df["corr"] = df["corr"].shift(5).fillna(0)
 
-    df = rolling_scale(df, col=fcol, window=window, min_periods=min_periods,
-                       new_col='weight', method='maxabs_scale', copy=True)
-    df['weight'] = df['weight'] * np.sign(df['corr'])
+    df = rolling_scale(
+        df, col=fcol, window=window, min_periods=min_periods, new_col="weight", method="maxabs_scale", copy=True
+    )
+    df["weight"] = df["weight"] * np.sign(df["corr"])
 
-    df.drop(['n1b', 'corr'], axis=1, inplace=True)
+    df.drop(["n1b", "corr"], axis=1, inplace=True)
     return df
@@ -340,14 +354,14 @@ def feature_adjust_V240323(df: pd.DataFrame, fcol, **kwargs):
     min_periods = kwargs.get("min_periods", 200)
 
     df = df.copy().sort_values("dt", ascending=True).reset_index(drop=True)
-    df['n1b'] = df['price'].shift(-1) / df['price'] - 1
-    df['corr'] = df[fcol].rolling(window=window, min_periods=min_periods).corr(df['n1b'])
-    df['corr'] = df['corr'].shift(5).fillna(0)
+    df["n1b"] = df["price"].shift(-1) / df["price"] - 1
+    df["corr"] = df[fcol].rolling(window=window, min_periods=min_periods).corr(df["n1b"])
+    df["corr"] = df["corr"].shift(5).fillna(0)
 
-    df = rolling_tanh(df, col=fcol, window=window, min_periods=min_periods, new_col='weight')
-    df['weight'] = df['weight'] * np.sign(df['corr'])
+    df = rolling_tanh(df, col=fcol, window=window, min_periods=min_periods, new_col="weight")
+    df["weight"] = df["weight"] * np.sign(df["corr"])
 
-    df.drop(['n1b', 'corr'], axis=1, inplace=True)
+    df.drop(["n1b", "corr"], axis=1, inplace=True)
     return df
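The `feature_adjust_*` variants share one idea: multiply the factor by the lagged sign of its rolling correlation with forward returns, so the factor always points in the profitable direction without look-ahead. A toy illustration on random data (column names made up):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "price": 100 + np.random.randn(300).cumsum(),
    "F#demo": np.random.randn(300),
})
df["n1b"] = df["price"].shift(-1) / df["price"] - 1  # next-bar return

# Lagged sign of the rolling factor/return correlation; the shift avoids look-ahead
corr_sign = np.sign(df["F#demo"].rolling(100, min_periods=50).corr(df["n1b"]))
df["F#demo_adj"] = (corr_sign.shift(5) * df["F#demo"]).fillna(0)
```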
@@ -379,3 +393,141 @@ def feature_adjust(df: pd.DataFrame, fcol, method, **kwargs):
         return feature_adjust_V240323(df, fcol, **kwargs)
     else:
         raise ValueError(f"Unknown method: {method}")
+
+
+def feature_to_weight(df, factor, positive, **kwargs):
+    """时序因子转换为持仓权重
+
+    处理流程:
+
+    1. 缩尾处理:去除极端值
+    2. scale 缩放,均值为0
+    3. maxabs_scale 缩放至 [-1, 1]
+
+    :param df: pd.DataFrame, 包含因子列的数据
+    :param factor: str, 因子列名
+    :param positive: bool, 因子是否为正向因子
+    :param kwargs:
+
+        - window: int, 计算窗口长度,默认为1000
+        - min_periods: int, 最小计算窗口长度,默认为100
+        - q_threshold: float, 缩尾阈值,默认为0.05
+
+    :return: pd.DataFrame, 新增 weight 列
+    """
+    window = kwargs.get("window", 1000)
+    min_periods = kwargs.get("min_periods", 100)
+    q_threshold = kwargs.get("q_threshold", 0.05)
+    assert df["symbol"].nunique() == 1, "必须按品种计算权重"
+
+    # 缩尾处理
+    df["upper"] = df[factor].rolling(window, min_periods).quantile(1 - q_threshold)
+    df["lower"] = df[factor].rolling(window, min_periods).quantile(q_threshold)
+    df[factor] = df[factor].clip(lower=df["lower"], upper=df["upper"])
+
+    # scale 缩放,均值为0
+    df["norm"] = df[factor].rolling(window, min_periods).apply(lambda x: scale(x)[-1])
+
+    # maxabs_scale 缩放至 [-1, 1]
+    df["weight"] = df["norm"].rolling(window, min_periods).apply(lambda x: maxabs_scale(x)[-1])
+    df["weight"] = df["weight"].fillna(0)
+    if not positive:
+        df["weight"] = -df["weight"]
+
+    df.drop(["upper", "lower", "norm"], axis=1, inplace=True)
+    return df
+
+
+def feature_returns(df, factor, target="n1b", **kwargs):
+    """计算因子特征截面收益率
+
+    :param df: pd.DataFrame, 必须包含 dt、symbol、factor, target 列
+    :param factor: str, 因子列名
+    :param target: str, 预测目标收益率列名
+    :param kwargs:
+
+        - fit_intercept: bool, 是否拟合截距项,默认为 False
+
+    :return: pd.DataFrame, 包含 dt、returns 两列
+    """
+    from sklearn.linear_model import LinearRegression
+
+    df = df.copy()
+    fit_intercept = kwargs.get("fit_intercept", False)
+
+    ret = []
+    for dt, dfg in df.groupby("dt"):
+        dfg = dfg.copy().dropna(subset=[factor, target])
+        if dfg.empty or len(dfg) < 5:
+            ret.append([dt, 0])
+            logger.warning(f"{dt} does not have enough data, only {len(dfg)} rows")
+            continue
+
+        x = dfg[factor].values.reshape(-1, 1)
+        y = dfg[target].values.reshape(-1, 1)
+        model = LinearRegression(fit_intercept=fit_intercept).fit(x, y)
+        ret.append([dt, model.coef_[0][0]])
+
+    dft = pd.DataFrame(ret, columns=["dt", "returns"])
+    return dft
+
+
+def feature_sectional_corr(df, factor, target="n1b", method="pearson", **kwargs):
+    """计算因子特征截面相关性(IC)
+
+    :param df: 数据,DataFrame格式
+    :param factor: 因子列名,一般采用F#开头的列
+    :param target: 目标列名,一般为n1b
+    :param method: {'pearson', 'kendall', 'spearman'} or callable
+
+        * pearson : standard correlation coefficient
+        * kendall : Kendall Tau correlation coefficient
+        * spearman : Spearman rank correlation
+        * callable: callable with input two 1d ndarrays and returning a float
+
+    :return: dft, res: 前者是每日相关系数结果,后者是每日相关系数的统计结果
+    """
+    from czsc.utils import single_linear
+
+    df = df.copy()
+    corr = []
+    for dt, dfg in df.groupby("dt"):
+        dfg = dfg.copy().dropna(subset=[factor, target])
+
+        if dfg.empty or len(dfg) < 5:
+            corr.append([dt, 0])
+            logger.warning(f"{dt} does not have enough data, only {len(dfg)} rows")
+        else:
+            c = dfg[factor].corr(dfg[target], method=method)
+            corr.append([dt, c])
+
+    dft = pd.DataFrame(corr, columns=["dt", "ic"])
+
+    res = {
+        "factor": factor,
+        "target": target,
+        "method": method,
+        "IC均值": 0,
+        "IC标准差": 0,
+        "ICIR": 0,
+        "IC胜率": 0,
+        "累计IC回归R2": 0,
+        "累计IC回归斜率": 0,
+    }
+    if dft.empty:
+        return dft, res
+
+    dft = dft[~dft["ic"].isnull()].copy()
+    ic_avg = dft["ic"].mean()
+    ic_std = dft["ic"].std()
+
+    res["IC均值"] = round(ic_avg, 4)
+    res["IC标准差"] = round(ic_std, 4)
+    res["ICIR"] = round(ic_avg / ic_std, 4) if ic_std != 0 else 0
+    if ic_avg > 0:
+        res["IC胜率"] = round(len(dft[dft["ic"] > 0]) / len(dft), 4)
+    else:
+        res["IC胜率"] = round(len(dft[dft["ic"] < 0]) / len(dft), 4)
+
+    lr_ = single_linear(y=dft["ic"].cumsum().to_list())
+    res.update({"累计IC回归R2": lr_["r2"], "累计IC回归斜率": lr_["slope"]})
+    return dft, res
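A usage sketch for the new helpers on a synthetic panel (all data made up; note that with fewer than 5 symbols per `dt` both functions fall back to 0 and log a warning, so 8 symbols are used here):

```python
import numpy as np
import pandas as pd
from czsc.features.utils import feature_to_weight, feature_sectional_corr

rng = np.random.default_rng(42)
dts = pd.date_range("2023-01-01", periods=250, freq="D")
df = pd.concat([
    pd.DataFrame({"dt": dts, "symbol": s, "F#demo": rng.normal(size=250), "n1b": rng.normal(scale=0.01, size=250)})
    for s in list("ABCDEFGH")
])

# Daily IC series plus summary stats (IC均值, ICIR, IC胜率, ...)
dft, res = feature_sectional_corr(df, factor="F#demo", target="n1b", method="spearman")
print(res["ICIR"])

# Weights are computed per symbol, hence the single-symbol slice
dfw = feature_to_weight(df[df["symbol"] == "A"].copy(), factor="F#demo", positive=True)
```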
diff --git a/czsc/fsa/__init__.py b/czsc/fsa/__init__.py
index 860baeeaf..cfb253134 100644
--- a/czsc/fsa/__init__.py
+++ b/czsc/fsa/__init__.py
@@ -61,11 +61,13 @@ def push_card(card: str, key: str) -> None:
     """
     api_send = f"https://open.feishu.cn/open-apis/bot/v2/hook/{key}"
     data = {"msg_type": "interactive", "card": card}
+    response = requests.post(url=api_send, json=data)
+
     try:
-        response = requests.post(url=api_send, json=data)
         assert response.json()["StatusMessage"] == "success"
     except Exception as e:
         logger.error(f"推送消息失败: {e}")
+        logger.error(response.json())
 
 
 def read_feishu_sheet(spread_sheet_token: str, sheet_id: str = None, **kwargs):
@@ -156,23 +158,18 @@ def update_spreadsheet(df: pd.DataFrame, spreadsheet_token: str, sheet_id: str,
         feishu_app_secret: 飞书APP的app_secret
     :return:
     """
-    fsf = SpreadSheets(app_id=kwargs['feishu_app_id'], app_secret=kwargs['feishu_app_secret'])
+    fsf = SpreadSheets(app_id=kwargs["feishu_app_id"], app_secret=kwargs["feishu_app_secret"])
     data = {
-        "valueRanges": [
-            {
-                "range": f"{sheet_id}!A1:Z1",
-                "values": [list(df)]
-            }, {
-                "range": f"{sheet_id}!A2:Z5000",
-                "values": df.values.tolist()
-            }
-        ]
+        "valueRanges": [
+            {"range": f"{sheet_id}!A1:Z1", "values": [list(df)]},
+            {"range": f"{sheet_id}!A2:Z5000", "values": df.values.tolist()},
+        ]
     }
 
     try:
         fsf.delete_values(spreadsheet_token, sheet_id)
         b = fsf.update_values(spreadsheet_token, data)
-        if b and b['code'] == 0:
+        if b and b["code"] == 0:
             logger.success("更新飞书表格成功")
             return 1
         else:
diff --git a/czsc/signals/__init__.py b/czsc/signals/__init__.py
index b9d304470..2c9ed7613 100644
--- a/czsc/signals/__init__.py
+++ b/czsc/signals/__init__.py
@@ -213,6 +213,7 @@
     pos_holds_V240428,
     pos_stop_V240428,
     pos_take_V240428,
+    pos_stop_V240331,
 )
 
 
@@ -258,6 +259,7 @@
     pressure_support_V240222,
     pressure_support_V240402,
     pressure_support_V240406,
+    pressure_support_V240530,
 )
diff --git a/czsc/signals/pos.py b/czsc/signals/pos.py
index c80854dc2..353e29669 100644
--- a/czsc/signals/pos.py
+++ b/czsc/signals/pos.py
@@ -8,7 +8,7 @@
 from czsc.analyze import CZSC
 from collections import OrderedDict
 from czsc.traders.base import CzscTrader
-from czsc.utils import create_single_signal
+from czsc.utils import create_single_signal, get_sub_elements
 from czsc.objects import Operate, Direction, Mark
 from czsc.signals.tas import update_ma_cache
 
@@ -670,3 +670,68 @@ def pos_take_V240428(cat: CzscTrader, **kwargs) -> OrderedDict:
         v1 = "空头止盈"
 
     return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+
+def pos_stop_V240331(cat: CzscTrader, **kwargs) -> OrderedDict:
+    """根据最近N根K线的最高最低价止损,追踪止损,贡献者:谢磊
+
+    参数模板:"{pos_name}_{freq1}#{n}_止损V240331"
+
+    **信号逻辑:**
+
+    以多头止损为例,计算过程如下:
+
+    1. 从多头开仓点开始,在给定的K线周期 freq1 上获取最近 N 根K线,记为 bars;
+    2. 计算 bars 中的最低价,记为 ll;
+    3. 如果当前价格 low < ll,则多头止损。
+
+    空头止损逻辑同理。
+
+    **信号列表:**
+
+    - Signal('SMA5多头_15分钟#10_止损V240331_多头止损_任意_任意_0')
+    - Signal('SMA5空头_15分钟#10_止损V240331_空头止损_任意_任意_0')
+
+    :param cat: CzscTrader对象
+    :param kwargs: 参数字典
+
+        - pos_name: str,开仓信号的名称
+        - freq1: str,给定的K线周期
+        - n: int,观察的K线数量,默认为 10,表示观察前10根K线
+
+    :return: OrderedDict
+    """
+    pos_name = kwargs["pos_name"]
+    n = int(kwargs.get("n", 10))
+    freq1 = kwargs["freq1"]
+    k1, k2, k3 = f"{pos_name}_{freq1}#{n}_止损V240331".split("_")
+    v1 = "其他"
+
+    # 如果没有持仓策略,则不产生信号
+    if not hasattr(cat, "positions"):
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    pos_ = cat.get_position(pos_name)
+    # 如果 pos 没有操作记录,或者最后一次操作是平仓,则不产生信号
+    if len(pos_.operates) == 0 or pos_.operates[-1]["op"] in [Operate.SE, Operate.LE]:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    c: CZSC = cat.kas[freq1]
+    op = pos_.operates[-1]
+    bars = get_sub_elements(c.bars_raw, di=1, n=n + 1)
+    _bar = bars[-1]
+
+    # 多头止损逻辑:当前价格低于前n根K线的最低价
+    if op["op"] == Operate.LO:
+        ll = min([x.low for x in bars[:-1]])
+        if _bar.low < ll and _bar.id > op["bid"]:
+            v1 = "多头止损"
+
+    # 空头止损逻辑:当前价格高于前n根K线的最高价
+    if op["op"] == Operate.SO:
+        hh = max([x.high for x in bars[:-1]])
+        if _bar.high > hh and _bar.id > op["bid"]:
+            v1 = "空头止损"
+
+    return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
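A hypothetical `signals_config` entry showing how the new trailing stop could be wired up; `pos_name` must match an existing Position name and `freq1` a frequency tracked by the trader:

```python
# Placeholder values, following czsc's signals_config convention
signals_config = [
    {
        "name": "czsc.signals.pos_stop_V240331",
        "pos_name": "SMA5多头",
        "freq1": "15分钟",
        "n": 10,
    },
]
```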
diff --git a/czsc/signals/zdy.py b/czsc/signals/zdy.py
index c03bbd193..6a7527c57 100644
--- a/czsc/signals/zdy.py
+++ b/czsc/signals/zdy.py
@@ -1343,3 +1343,85 @@ def pressure_support_V240406(c: CZSC, **kwargs) -> OrderedDict:
         v1 = "支撑位"
 
     return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+
+def pressure_support_V240530(c: CZSC, **kwargs) -> OrderedDict:
+    """支撑压力线辅助V240530
+
+    参数模板:"{freq}_D{di}W{w}N{n}_支撑压力V240530"
+
+    **信号逻辑:**
+
+    对于给定K线,判断是否存在支撑压力线,判断逻辑如下:
+
+    1. 寻找关键K线的高低点,关键K线为最近w根K线中与其他K线重叠次数最多的K线
+    2. 当前K线收盘价在关键K线最高价的上下 n 个价位以内,认为是压力位;在关键K线最低价的上下 n 个价位以内,认为是支撑位
+
+    **信号列表:**
+
+    - Signal('60分钟_D1W20N5_支撑压力V240530_支撑位_任意_任意_0')
+    - Signal('60分钟_D1W20N5_支撑压力V240530_压力位_任意_任意_0')
+
+    :param c: CZSC对象
+    :param kwargs:
+
+        - di: int, 倒数第di根K线,默认为1
+        - w: int, 观察窗口的K线数量,默认为20
+        - n: int, 价位偏移数量,默认为5
+
+    :return: 信号识别结果
+    """
+    di = int(kwargs.get("di", 1))
+    w = int(kwargs.get("w", 20))
+    n = int(kwargs.get("n", 5))
+    assert w > 10, "参数 w 必须大于10"
+
+    freq = c.freq.value
+    k1, k2, k3 = f"{freq}_D{di}W{w}N{n}_支撑压力V240530".split("_")
+    v1 = "其他"
+    if len(c.bars_raw) < w + 10:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    bars = get_sub_elements(c.bars_raw, di=di, n=w)
+
+    # 获取关键K线:与窗口内其他K线重叠次数最多的K线
+    bar_overlap_count = {}
+    for i in range(len(bars)):
+        bar = bars[i]
+        overlap_count = 0
+        for j in range(len(bars)):
+            if i == j:
+                continue
+            bar2 = bars[j]
+            # 判断两根K线是否重叠
+            if max(bar.low, bar2.low) < min(bar.high, bar2.high):
+                overlap_count += 1
+        bar_overlap_count[i] = overlap_count
+
+    # 如果最大重叠次数小于总数的一半,认为没有关键K线
+    if max(bar_overlap_count.values()) < 0.5 * w:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    key_bar = bars[max(bar_overlap_count, key=bar_overlap_count.get)]
+
+    # 获取全部K线的 unique price 列表
+    prices = [y for x in c.bars_raw for y in [x.open, x.close, x.high, x.low]]
+    prices = sorted(list(set(prices)))
+    high_idx = prices.index(key_bar.high)
+    low_idx = prices.index(key_bar.low)
+
+    # 处理边界情况:关键价位距离价格列表两端不足 n 个价位
+    if high_idx < n or low_idx < n:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    if high_idx + n >= len(prices) or low_idx + n >= len(prices):
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    # 判断压力位:当前收盘价在关键K线最高价的上下 n 个价位以内
+    pressure_h = prices[high_idx + n]
+    pressure_l = prices[high_idx - n]
+    if pressure_h > bars[-1].close > pressure_l:
+        v1 = "压力位"
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    # 判断支撑位:当前收盘价在关键K线最低价的上下 n 个价位以内
+    support_h = prices[low_idx + n]
+    support_l = prices[low_idx - n]
+    if support_h > bars[-1].close > support_l:
+        v1 = "支撑位"
+
+    return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
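To make the "n price levels" tolerance concrete, a toy walk-through with invented numbers:

```python
# All distinct O/H/L/C prices seen so far, sorted ascending
prices = [9.8, 9.9, 10.0, 10.1, 10.2, 10.3, 10.4, 10.5, 10.6]
key_high = 10.2  # high of the key (most-overlapped) bar
n = 2            # tolerance, measured in price levels, not absolute price

idx = prices.index(key_high)
band = (prices[idx - n], prices[idx + n])  # (10.0, 10.4)

close = 10.3
is_pressure = band[1] > close > band[0]  # True: close sits within n levels of the key high
```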
diff --git a/czsc/utils/st_components.py b/czsc/utils/st_components.py
index c29279802..98c186d97 100644
--- a/czsc/utils/st_components.py
+++ b/czsc/utils/st_components.py
@@ -7,6 +7,7 @@
 import plotly.express as px
 import statsmodels.api as sm
 import plotly.graph_objects as go
+from deprecated import deprecated
 from sklearn.linear_model import LinearRegression
 
 
@@ -223,6 +224,7 @@ def show_sectional_ic(df, x_col, y_col, method="pearson", **kwargs):
     st.plotly_chart(fig, use_container_width=True)
 
 
+@deprecated("请使用 czsc.show_feature_returns")
 def show_factor_returns(df, x_col, y_col):
     """使用 streamlit 展示因子收益率
 
@@ -252,49 +254,68 @@ def show_factor_returns(df, x_col, y_col):
     col2.plotly_chart(fig, use_container_width=True)
 
 
-def show_factor_layering(df, x_col, y_col="n1b", **kwargs):
-    """使用 streamlit 绘制因子截面分层收益率图
+def show_feature_returns(df, factor, target="n1b", **kwargs):
+    """使用 streamlit 展示因子收益率
 
-    :param df: 因子数据
-    :param x_col: 因子列名
-    :param y_col: 收益列名
+    :param df: pd.DataFrame, 必须包含 dt、symbol、factor, target 列
+    :param factor: str, 因子列名
+    :param target: str, 预测目标收益率列名
     :param kwargs:
 
-    - n: 分层数量,默认为10
-    - long: 多头组合,例如 "第10层"
-    - short: 空头组合,例如 "第01层"
+    - fit_intercept: bool, 是否拟合截距,默认为 False
+    - fig_title: str, 图表标题,默认为 "因子收益率分析"
     """
-    n = kwargs.get("n", 10)
-    if df[y_col].max() > 100:  # 收益率单位为BP, 转换为万分之一
-        df[y_col] = df[y_col] / 10000
+    assert "dt" in df.columns, "时间列必须为 dt"
+    assert "symbol" in df.columns, "标的列必须为 symbol"
+    assert factor in df.columns, f"因子列 {factor} 不存在"
+    assert target in df.columns, f"目标列 {target} 不存在"
 
-    df = czsc.feture_cross_layering(df, x_col, n=n)
+    fit_intercept = kwargs.get("fit_intercept", False)
 
-    mr = df.groupby(["dt", f"{x_col}分层"])[y_col].mean().reset_index()
-    mrr = mr.pivot(index="dt", columns=f"{x_col}分层", values=y_col).fillna(0)
+    dft = czsc.feature_returns(df, factor, target, fit_intercept=fit_intercept)
+    dft.columns = ["dt", "因子收益率"]
+    dft["累计收益率"] = dft["因子收益率"].cumsum()
 
-    tabs = st.tabs(["分层收益率", "多空组合"])
-    with tabs[0]:
-        czsc.show_daily_return(mrr)
+    fig_title = kwargs.get("fig_title", "因子截面收益率分析")
 
-    with tabs[1]:
-        layering_cols = mrr.columns.to_list()
-        with st.form(key="factor_form"):
-            col1, col2 = st.columns(2)
-            long = col1.multiselect("多头组合", layering_cols, default=[], key="factor_long")
-            short = col2.multiselect("空头组合", layering_cols, default=[], key="factor_short")
-            submit = st.form_submit_button("多空组合快速测试")
+    # 将因子逐K收益率 和 因子累计收益率 分左右轴,绘制在一张图上
+    fig = go.Figure()
+    fig.add_trace(go.Bar(x=dft["dt"], y=dft["因子收益率"], name="因子收益率", yaxis="y"))
+    fig.add_trace(
+        go.Scatter(
+            x=dft["dt"], y=dft["累计收益率"], mode="lines", name="累计收益率", yaxis="y2", line=dict(color="red")
+        )
+    )
+    fig.update_layout(
+        yaxis=dict(title="因子收益率"),
+        yaxis2=dict(title="累计收益率", overlaying="y", side="right"),
+        title=fig_title,
+        margin=dict(l=0, r=0, b=0),
+    )
+    st.plotly_chart(fig, use_container_width=True)
 
-        if not submit:
-            st.warning("请设置多空组合")
-            st.stop()
 
-        dfr = mrr.copy()
-        dfr["多头"] = dfr[long].mean(axis=1)
-        dfr["空头"] = -dfr[short].mean(axis=1)
-        dfr["多空"] = (dfr["多头"] + dfr["空头"]) / 2
-        czsc.show_daily_return(dfr[["多头", "空头", "多空"]])
+def show_factor_layering(df, factor, target="n1b", **kwargs):
+    """使用 streamlit 绘制因子分层收益率图
+
+    :param df: 因子数据
+    :param factor: 因子列名
+    :param target: 收益列名
+    :param kwargs:
+
+    - n: 分层数量,默认为10
+
+    """
+    n = kwargs.get("n", 10)
+    df = czsc.feture_cross_layering(df, factor, n=n)
+
+    mr = df.groupby(["dt", f"{factor}分层"])[target].mean().reset_index()
+    mrr = mr.pivot(index="dt", columns=f"{factor}分层", values=target).fillna(0)
+    if "第00层" in mrr.columns:
+        mrr.drop(columns=["第00层"], inplace=True)
+
+    czsc.show_daily_return(mrr, stat_hold_days=False)
 
 
 def show_symbol_factor_layering(df, x_col, y_col="n1b", **kwargs):
@@ -1304,3 +1325,27 @@ def show_holds_backtest(df, **kwargs):
     if kwargs.get("show_monthly_return", True):
         st.write("月度累计收益")
         czsc.show_monthly_return(daily, ret_col="return", sub_title="")
+
+
+def show_symbols_corr(df, factor, target="n1b", method="pearson", **kwargs):
+    """展示品种相关性分析
+
+    :param df: pd.DataFrame, 数据源,columns=['dt', 'symbol', factor, target]
+    :param factor: str, 因子名称
+    :param target: str, 目标列名称
+    :param method: str, 相关性计算方法,默认为 pearson
+    :param kwargs:
+
+    - fig_title: str, 图表标题
+    """
+    dfc = df.copy().sort_values(["dt", "symbol"]).reset_index(drop=True)
+    dfr = (
+        dfc.groupby("symbol")
+        .apply(lambda x: x[factor].corr(x[target], method=method), include_groups=False)
+        .reset_index()
+    )
+    dfr.columns = ["symbol", "corr"]
+    dfr = dfr.sort_values("corr", ascending=False)
+    fig_title = kwargs.get("fig_title", f"{factor} 在品种上的相关性分布")
+    fig = px.bar(dfr, x="symbol", y="corr", title=fig_title, orientation="v")
+    st.plotly_chart(fig, use_container_width=True)
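A minimal Streamlit page exercising the two new components; the feather file is a placeholder for any factor panel with dt/symbol/factor/target columns:

```python
import czsc
import pandas as pd
import streamlit as st

# Hypothetical factor panel with dt, symbol, F#demo, n1b columns
df = pd.read_feather("factor_panel.feather")

st.subheader("因子截面收益率")
czsc.show_feature_returns(df, factor="F#demo", target="n1b")

st.subheader("品种相关性")
czsc.show_symbols_corr(df, factor="F#demo", target="n1b", method="spearman")
```

Run with `streamlit run app.py`.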
diff --git a/czsc/utils/trade.py b/czsc/utils/trade.py
index ec168106c..d04b23bf6 100644
--- a/czsc/utils/trade.py
+++ b/czsc/utils/trade.py
@@ -157,6 +157,10 @@ def update_tbars(da: pd.DataFrame, event_col: str) -> None:
 def resample_to_daily(df: pd.DataFrame, sdt=None, edt=None, only_trade_date=True):
     """将非日线数据转换为日线数据,以便进行日线级别的分析
 
+    使用场景:
+
+    1. 将周频选股结果转换为日线级别,以便进行日线级别的分析
+
     函数执行逻辑:
 
     1. 首先,函数接收一个数据框`df`,以及可选的开始日期`sdt`,结束日期`edt`,和一个布尔值`only_trade_date`。
@@ -202,3 +206,38 @@ def resample_to_daily(df: pd.DataFrame, sdt=None, edt=None, only_trade_date=True
 
     dfr = pd.concat(results, ignore_index=True)
     return dfr
+
+
+def adjust_holding_weights(df, hold_periods=1, **kwargs):
+    """根据 hold_periods 调整截面数据的 weight 列,固定间隔调仓
+
+    使用场景:
+
+    1. 截面选品种,固定持仓周期为 hold_periods,每隔 hold_periods 个周期调整一次仓位
+
+    :param df: pd.DataFrame, 截面数据, 至少包含 dt, symbol, weight, n1b 列
+
+        **注意:** df 中必须有原始交易中每个时刻的持仓数据,不要求时间等间隔拆分,但是 n1b 要能代表两个交易时刻之间的收益率
+
+    :param hold_periods: int, 固定持仓周期,大于等于1;1 表示每个交易周期调整一次仓位
+    :return: pd.DataFrame
+    """
+    assert hold_periods >= 1, "hold_periods 必须大于等于1"
+    if hold_periods == 1:
+        return df.copy()
+
+    df = df.copy()
+
+    # 每隔 hold_periods 个交易日调整一次仓位,获取调整期的时间列表 adjust_dts
+    dts = sorted(df["dt"].unique().tolist())
+    adjust_dts = dts[::hold_periods]
+
+    # 在 adjust_dts 上获取每个品种的权重,并且在 dts 上进行前向填充
+    dfs = pd.pivot_table(df, index="dt", columns="symbol", values="weight").fillna(0)
+    dfs = dfs[dfs.index.isin(adjust_dts)]
+    dfs = dfs.reindex(dts, method="ffill").fillna(0).reset_index()
+
+    # 从原始数据中获取 n1b 列,然后将 weight 列与 n1b 列进行合并
+    dfw1 = pd.melt(dfs, id_vars="dt", value_vars=dfs.columns.to_list(), var_name="symbol", value_name="weight")
+    dfw1 = pd.merge(df[["dt", "symbol", "n1b"]], dfw1, on=["dt", "symbol"], how="left")
+    return dfw1
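A small worked example of `adjust_holding_weights` on a toy panel — with `hold_periods=3`, the weights observed at the 1st and 4th timestamps are held through the periods in between:

```python
import pandas as pd
from czsc.utils.trade import adjust_holding_weights

# Toy panel: 6 periods, 2 symbols, per-period weights and next-period returns
dts = pd.date_range("2024-01-01", periods=6, freq="D")
df = pd.DataFrame({
    "dt": list(dts) * 2,
    "symbol": ["A"] * 6 + ["B"] * 6,
    "weight": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6] * 2,
    "n1b": [0.01] * 12,
})

# Rebalance only every 3rd period: weights at dts[0] and dts[3] are forward-filled
dfw = adjust_holding_weights(df, hold_periods=3)
```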
diff --git a/examples/signals_dev/merged/__init__.py b/examples/signals_dev/merged/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/signals_dev/bar_break_V240428.py b/examples/signals_dev/merged/bar_break_V240428.py
similarity index 100%
rename from examples/signals_dev/bar_break_V240428.py
rename to examples/signals_dev/merged/bar_break_V240428.py
diff --git a/examples/signals_dev/bar_plr_V240427.py b/examples/signals_dev/merged/bar_plr_V240427.py
similarity index 100%
rename from examples/signals_dev/bar_plr_V240427.py
rename to examples/signals_dev/merged/bar_plr_V240427.py
diff --git a/examples/signals_dev/bar_polyfit_V240428.py b/examples/signals_dev/merged/bar_polyfit_V240428.py
similarity index 100%
rename from examples/signals_dev/bar_polyfit_V240428.py
rename to examples/signals_dev/merged/bar_polyfit_V240428.py
diff --git a/examples/signals_dev/cxt_second_bs_V240524.py b/examples/signals_dev/merged/cxt_second_bs_V240524.py
similarity index 100%
rename from examples/signals_dev/cxt_second_bs_V240524.py
rename to examples/signals_dev/merged/cxt_second_bs_V240524.py
diff --git a/examples/signals_dev/pressure_support_V240222.py b/examples/signals_dev/merged/pressure_support_V240222.py
similarity index 100%
rename from examples/signals_dev/pressure_support_V240222.py
rename to examples/signals_dev/merged/pressure_support_V240222.py
diff --git a/examples/signals_dev/pressure_support_V240402.py b/examples/signals_dev/merged/pressure_support_V240402.py
similarity index 100%
rename from examples/signals_dev/pressure_support_V240402.py
rename to examples/signals_dev/merged/pressure_support_V240402.py
diff --git a/examples/signals_dev/pressure_support_V240406.py b/examples/signals_dev/merged/pressure_support_V240406.py
similarity index 100%
rename from examples/signals_dev/pressure_support_V240406.py
rename to examples/signals_dev/merged/pressure_support_V240406.py
diff --git a/examples/signals_dev/merged/pressure_support_V240530.py b/examples/signals_dev/merged/pressure_support_V240530.py
new file mode 100644
index 000000000..00adb3755
--- /dev/null
+++ b/examples/signals_dev/merged/pressure_support_V240530.py
@@ -0,0 +1,102 @@
+import numpy as np
+from collections import OrderedDict
+from czsc.analyze import CZSC
+from czsc.utils import create_single_signal, get_sub_elements
+from loguru import logger as log
+
+
+def pressure_support_V240530(c: CZSC, **kwargs) -> OrderedDict:
+    """支撑压力线辅助V240530
+
+    参数模板:"{freq}_D{di}W{w}N{n}_支撑压力V240530"
+
+    **信号逻辑:**
+
+    对于给定K线,判断是否存在支撑压力线,判断逻辑如下:
+
+    1. 寻找关键K线的高低点,关键K线为最近w根K线中与其他K线重叠次数最多的K线
+    2. 当前K线收盘价在关键K线最高价的上下 n 个价位以内,认为是压力位;在关键K线最低价的上下 n 个价位以内,认为是支撑位
+
+    **信号列表:**
+
+    - Signal('60分钟_D1W20N5_支撑压力V240530_支撑位_任意_任意_0')
+    - Signal('60分钟_D1W20N5_支撑压力V240530_压力位_任意_任意_0')
+
+    :param c: CZSC对象
+    :param kwargs:
+
+        - di: int, 倒数第di根K线,默认为1
+        - w: int, 观察窗口的K线数量,默认为20
+        - n: int, 价位偏移数量,默认为5
+
+    :return: 信号识别结果
+    """
+    di = int(kwargs.get("di", 1))
+    w = int(kwargs.get("w", 20))
+    n = int(kwargs.get("n", 5))
+    assert w > 10, "参数 w 必须大于10"
+
+    freq = c.freq.value
+    k1, k2, k3 = f"{freq}_D{di}W{w}N{n}_支撑压力V240530".split("_")
+    v1 = "其他"
+    if len(c.bars_raw) < w + 10:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    bars = get_sub_elements(c.bars_raw, di=di, n=w)
+
+    # 获取关键K线:与窗口内其他K线重叠次数最多的K线
+    bar_overlap_count = {}
+    for i in range(len(bars)):
+        bar = bars[i]
+        overlap_count = 0
+        for j in range(len(bars)):
+            if i == j:
+                continue
+            bar2 = bars[j]
+            # 判断两根K线是否重叠
+            if max(bar.low, bar2.low) < min(bar.high, bar2.high):
+                overlap_count += 1
+        bar_overlap_count[i] = overlap_count
+
+    # 如果最大重叠次数小于总数的一半,认为没有关键K线
+    if max(bar_overlap_count.values()) < 0.5 * w:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    key_bar = bars[max(bar_overlap_count, key=bar_overlap_count.get)]
+
+    # 获取全部K线的 unique price 列表
+    prices = [y for x in c.bars_raw for y in [x.open, x.close, x.high, x.low]]
+    prices = sorted(list(set(prices)))
+    high_idx = prices.index(key_bar.high)
+    low_idx = prices.index(key_bar.low)
+
+    # 处理边界情况:关键价位距离价格列表两端不足 n 个价位
+    if high_idx < n or low_idx < n:
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    if high_idx + n >= len(prices) or low_idx + n >= len(prices):
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    # 判断压力位:当前收盘价在关键K线最高价的上下 n 个价位以内
+    pressure_h = prices[high_idx + n]
+    pressure_l = prices[high_idx - n]
+    if pressure_h > bars[-1].close > pressure_l:
+        v1 = "压力位"
+        return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+    # 判断支撑位:当前收盘价在关键K线最低价的上下 n 个价位以内
+    support_h = prices[low_idx + n]
+    support_l = prices[low_idx - n]
+    if support_h > bars[-1].close > support_l:
+        v1 = "支撑位"
+
+    return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
+
+
+def check():
+    from czsc.connectors import research
+    from czsc.traders.base import check_signals_acc
+
+    symbols = research.get_symbols("A股主要指数")
+    bars = research.get_raw_bars(symbols[0], "15分钟", "20181101", "20210101", fq="前复权")
+
+    signals_config = [{"name": pressure_support_V240530, "freq": "60分钟"}]
+    check_signals_acc(bars, signals_config=signals_config, height="780px", delta_days=5)  # type: ignore
+
+
+if __name__ == "__main__":
+    check()
diff --git a/examples/signals_dev/tas_macd_bc_V240307.py b/examples/signals_dev/merged/tas_macd_bc_V240307.py
similarity index 100%
rename from examples/signals_dev/tas_macd_bc_V240307.py
rename to examples/signals_dev/merged/tas_macd_bc_V240307.py