Python实现TradingView中的PivotHigh和PivotLow 指标

直接上代码:

"""枢轴点 最高价"""
def PivotHigh(df, left, right=0):
    right = right if right else left
    df['pivot'] = 0.0
    for i in range(len(df)):
        if i >= left+right:
            rolling = df['High'][i-right-left:i+1].values
            m = max(rolling)
            #print(GetTime(df['Time'][i], "%m-%d %H:%M"), df['High'][i-right], m, rolling)
            if df['High'][i-right] == m:
                df['pivot'].values[i] = m
    return df['pivot']

"""枢轴点 最低价"""
def PivotLow(df, left, right=0):
    right = right if right else left
    df['rollingLow'] = df['Low'].rolling(left+right).min()
    df['pivot'] = 0.0
    for i in range(len(df)):
        if i >= left+right:
            rolling = df['Low'][i-right-left:i+1].values
            m = min(rolling)
            if df['Low'][i-right] == m:
                df['pivot'].values[i] = m
    return df['pivot']

使用:

import pandas as pd
df = pd.DataFrame(r) #r为K线数据

df['PivotHigh'] = PivotHigh(df.copy(), 10, 10)
df['PivotLow'] = PivotLow(df.copy(), 10, 10)
标签: tradingview

苏慕白 发布于  2021-12-20 19:45 

【原生代码】Pyrhon3实现VWAP成交量加权平均线 指标

1. VWAP成交量加权平均线算法:

公式: (price * Volume) / Volume


详细解释:
当日累积((High+Low+Close) / 3 * Volume)
再除以当日累积交易量


当日是指每天早上8点重置累积的量(TradingView中是这样)
有需求可以自行看代码自行修改重置的时间


2. 代码实现:

impot time

"""字典转数组"""
def GetSrc(r, name):
    if name == 'o+h+l+c':
        src = [ (v['Open'] + v['High'] + v['Low'] + v['Close']) / 4 for v in r ]

    elif name == 'h+l+c':
        src = [ (v['High'] + v['Low'] + v['Close']) / 3 for v in r ]

    elif name == 'h+l':
        src = [ (v['High'] + v['Low']) / 2 for v in r ]

    else:
        src = [ v[name] for v in r ] if name else r

    return src

""" 时间戳转换"""
def GetTime(x, style="%H:%M:%S"):
    x = int(str(x)[:10])
    time_local = time.localtime(x)
    dt = time.strftime(style,time_local)
    return dt

"""成交量加权平均线"""
def VWAP(r, dayTime='08:00', name='h+l+c',):
    src = GetSrc(r, name)
    emas = [0 for i in range(len(src))]  # 创造一个和cps一样大小的集合

    srcVol,Vol = 0, 0
    for i in range(len(src)):
        if GetTime(r[i]['Time'], "%H:%M") == dayTime: #每天早上八点重置
            Vol = r[i]['Volume']
            srcVol = src[i] * r[i]['Volume']
        else:
            Vol += r[i]['Volume']
            srcVol += src[i] * r[i]['Volume']
        emas[i] = srcVol / Vol

    return emas

3. 变量r 结构图如下:

{
    {
        'Time': 0, #时间戳
        'Close': 0,
        'Open': 0,
        'High': 0,
        'Low': 0,
        'Volume': 0, #成交量
    }
}

4. 调用:

vwap = VWAP(r)

结果基本和TradingView一致


苏慕白 发布于  2021-11-20 20:46 

【原生代码】Python3 实现TradingView中的STDEV标准差 指标

TradingView Pine原版代码

plot(stdev(close, 5))

//the same on pine
isZero(val, eps) => abs(val) <= eps

SUM(fst, snd) =>
    EPS = 1e-10
    res = fst + snd
    if isZero(res, EPS)
        res := 0
    else
        if not isZero(res, 1e-4)
            res := res
        else
            15

pine_stdev(src, length) =>
    avg = sma(src, length)
    sumOfSquareDeviations = 0.0
    for i = 0 to length - 1
        sum = SUM(src[i], -avg)
        sumOfSquareDeviations := sumOfSquareDeviations + sum * sum

    stdev = sqrt(sumOfSquareDeviations / length)
plot(pine_stdev(close, 5))

Python3实现:

import math  #求平方根

"""字典转数组"""
def GetSrc(r, name):
    if name == 'o+h+l+c':
        src = [ (v['Open'] + v['High'] + v['Low'] + v['Close']) / 4 for v in r ]

    elif name == 'h+l+c':
        src = [ (v['High'] + v['Low'] + v['Close']) / 3 for v in r ]

    elif name == 'h+l':
        src = [ (v['High'] + v['Low']) / 2 for v in r ]

    else:
        src = [ v[name] for v in r ] if name else r

    return src

#移动平均线
def SMA(r, days, name=0):
    cps = GetSrc(r, name)
    emas = [0 for i in range(len(cps))]  # 创造一个和cps一样大小的集合

    for i in range(len(cps)):
        if i < days-1:
            emas[i] = 0
        else:
            ma = 0
            for i2 in range(i-days,i):
                i2 += 1
                ma += cps[i2]
            emas[i] = ma / days
    return emas

def STDEV_isZero(val, eps):
    return abs(val) <= eps

def STDEV_SUM(fst, snd):
    res = fst + snd
    if STDEV_isZero(res, 1e-10):
        res = 0
    elif STDEV_isZero(res, 1e-4):
        res = 15
    return res

def STDEV(r, days, name=0):
    src = GetSrc(r, name)
    avg = SMA(src, days)
    stdev = [0 for i in range(len(src))]
    for i in range(len(src)):
        if i > days:
            sumOfSquareDeviations = 0
            for i2 in range(days):
                i3 = i-i2
                sum = STDEV_SUM(src[i3], -avg[i])
                sumOfSquareDeviations = sumOfSquareDeviations + sum * sum
            stdev[i] = math.sqrt(sumOfSquareDeviations / days)

    return stdev

调用方法:STDEV(r, 5, "Close")
结果基本一致

或者使用Talib里的库:

import talib
talib.STDDEV(df['Close'].values, timeperiod=200)

结果也是一样的


苏慕白 发布于  2021-11-17 20:57 

Python3计算TradingView中的linreg线性回归曲线 指标

线性回归曲线。最符合用户指定时间区间内价格的曲线。 使用最小二乘法计算。该函数结果使用以下公式计算:linreg = intercept + slope *(length - 1 - offset),其中length是y参数,offset是z参数,intercept和slope是用源系列最小二乘法计算的值 (x参数)。

代码:

import talib as tl
tl.LINEARREG(df['Close'].values, 10)  #线性回归   

结果和TradingVeiw计算的一致,但是少了个offset参数


苏慕白 发布于  2021-11-17 18:01 

【原生代码】Python3 计算DI、ADX趋向指标 指标

1.引用:

import pandas as pd
import numpy as np

2.代码

#RSI中使用的移动平均线。 它是指数加权移动平均线,alpha加权值 = 1 /长度
def RMA(r, days, name=0):
    cps = [ v[name] for v in r ] if name else r
    rmas = [0 for i in range(len(cps))]  # 创造一个和cps一样大小的集合
    alpha = 1 / days

    for i in range(len(cps)):
        if i > days-1:
            if rmas[i-1] and not np.isnan(rmas[i-1]):
                rmas[i] = alpha * cps[i] + (1 - alpha) * rmas[i-1]
            else:
                ma = 0
                for i2 in range(i-days,i):  #求平均值
                    ma += cps[i2+1]
                rmas[i] = ma / days

    return rmas

#其他MA类型看上一篇文章
def ATR(r, days, ma='sma'):
    tr = [0]
    for i in range(len(r)):
        if i:
            tr.append(max(r[i]['High']-r[i]['Low'], abs(r[i]['High'] - r[i-1]['Close']), abs(r[i]['Low'] - r[i-1]['Close'])))

    if ma == 'rma':
        return RMA(tr, days)

    if ma == 'ema':
        return EMA(tr, days)

    if ma == 'sma':
        return SMA(tr, days)

    if not ma:
        return tr

"""DI"""
def DI(r, days):
    df = pd.DataFrame(r)
    df['up'] = df['High'] - df['High'].shift(1)
    df['down'] = -(df['Low'] - df['Low'].shift(1))
    df['truerange'] = RMA(ATR(r, days=0, ma=0), days)
    df['plus'] = RMA([df['up'][i] if df['up'][i] > df['down'][i] and df['up'][i]> 0 else 0 for i in range(len(df['up']))], days) / df['truerange']
    df['minus'] = RMA([df['down'][i] if df['down'][i] > df['up'][i] and df['down'][i]> 0 else 0 for i in range(len(df['up']))], days) / df['truerange']

    df['plus'] = df['plus'].apply(float) * 100
    df['minus'] = df['minus'].apply(float) * 100
    return df['plus'], df['minus']

"""ADX趋向指标"""
def ADX(r, diDays, adxDays):
    df = pd.DataFrame(r)
    df['plus'], df['minus'] = DI(r, diDays)
    df['sum'] = df['plus'] + df['minus']
    df['adx'] = [abs(df['plus'][i] - df['minus'][i]) / (1 if df['sum'][i] == 0 else df['sum'][i]) for i in range(len(df['sum']))]
    df['adx'] = RMA(df['adx'].values, adxDays)
    df['adx'] = df['adx'].apply(float) * 100

    return df['adx'], df['plus'], df['minus']

3.使用例子

指标计算的值我都和TradingView对比过,放心使用~
r为K线数据

df['adx'], df['plus'], df['minus'] = ADX(r, 14, 14)

图片alt


苏慕白 发布于  2021-11-13 21:16 

ADX(DI)过滤开仓信号 指标

1. ADX指标可视化

//@version=5
indicator("Average Directional Index", shorttitle="ADX", format=format.price, precision=2, timeframe="", timeframe_gaps=true)
adxlen = input(14, title="ADX Smoothing")
dilen = input(14, title="DI Length")
dirmov(len) =>
    up = ta.change(high)
    down = -ta.change(low)
    plusDM = na(up) ? na : (up > down and up > 0 ? up : 0)
    minusDM = na(down) ? na : (down > up and down > 0 ? down : 0)
    truerange = ta.rma(ta.tr, len)
    plus = fixnan(100 * ta.rma(plusDM, len) / truerange)
    minus = fixnan(100 * ta.rma(minusDM, len) / truerange)
    [plus, minus]
adx(dilen, adxlen) =>
    [plus, minus] = dirmov(dilen)
    sum = plus + minus
    adx = 100 * ta.rma(math.abs(plus - minus) / (sum == 0 ? 1 : sum), adxlen)
    [adx, plus, minus]

[sig, up, down] = adx(dilen, adxlen)
plot(sig, color=color.red, title="ADX")
plot(up, color=color.blue, title="up")
plot(down, color=color.green, title="up")

hline(27)

图片alt

2. 代码过滤逻辑

说是ADX,其实只用到了ADX中的DI指标值

DI参数:19
做多:
up > down and up > 27


苏慕白 发布于  2021-11-11 22:28 

【原生代码】Python3 实现ATR、MA、EMA、SMMA、RMA、TEMA指标的计算 指标

1. 参数说明

r:K线数据,字典或者数组
days:指标长度
name:使用哪一个字段,填'Close'即可,如果不填则代表r是数组而不是字典

变量r 字典结构图如下:

{
    {
        'Time': 0,
        'Close': 0,
        'Open': 0,
        'High': 0,
        'Low': 0,
        'Volume': 0,
    }
}

2. ATR 真实波动幅度 (需配合下面的指标)

def ATR(r, days, ma='sma'):
    tr = [0]
    for i in range(len(r)):
        if i:
            tr.append(max(r[i]['High']-r[i]['Low'], abs(r[i]['High'] - r[i-1]['Close']), abs(r[i]['Low'] - r[i-1]['Close'])))

    if ma == 'rma':
        return RMA(tr, days)

    if ma == 'ema':
        return EMA(tr, days)

    return SMA(tr, days)

3. MA 移动平均线

def SMA(r, days, name=0):
    cps = [ v[name] for v in r ] if name else r
    emas = cps.copy()  # 创造一个和cps一样大小的集合

    for i in range(len(cps)):
        if i < days-1:
            emas[i] = 0
        else:
            ma = 0
            for i2 in range(i-days,i):
                i2 += 1
                ma += cps[i2]
            emas[i] = ma / days
    return emas

4. EMA 指数移动平均线

def EMA(r, days, name=0):

    if name == 'o+h+l+c':
        cps = [ (v['Open'] + v['High'] + v['Low'] + v['Close']) / 4 for v in r ]

    elif name == 'h+l+c':
        cps = [ (v['High'] + v['Low'] + v['Close']) / 3 for v in r ]

    elif name == 'h+l':
        cps = [ (v['High'] + v['Low']) / 2 for v in r ]

    else:
        cps = [ v[name] for v in r ] if name else r

    emas = [0 for i in range(len(cps))]  # 创造一个和cps一样大小的集合
    for i in range(len(cps)):
        if i == 0:
            emas[i] = cps[i]
        if i > 0:
            emas[i] = ((days - 1) * emas[i - 1] + 2 * cps[i]) / (days + 1)

    emas = [v for v in emas]
    return emas

5. SMMA 平滑移动平均线

def SMMA(r, days, name=0):
    cps = [ v[name] for v in r ] if name else r
    emas = [0 for i in range(len(cps))]  # 创造一个和cps一样大小的集合
    ma = 0
    for i in range(len(cps)):
        if i < days:
            ma += cps[i]
            emas[i] = 0

        else:
            if emas[i-1] == 0:
                emas[i] = ma / days
            else:
                emas[i] = (emas[i-1] * (days - 1) + cps[i]) / days

    emas = [v for v in emas]
    return emas

6. RMA,RSI中使用的移动平均线,它是指数加权移动平均线

def RMA(r, days, name=0):
    cps = [ v[name] for v in r ] if name else r
    rmas = [0 for i in range(len(cps))]  # 创造一个和cps一样大小的集合
    alpha = 1 / days

    for i in range(len(cps)):
        if i < days-1:
            rmas[i] = 0
        else:
            if rmas[i-1]:
                rmas[i] = alpha * cps[i] + (1 - alpha) * rmas[i-1]
            else:
                ma = 0
                for i2 in range(i-days,i):  #求平均值
                    ma += cps[i2+1]
                rmas[i] = ma / days
    return rmas

7. TEMA 三重指数平滑移动平均线

def TEMA(r, days, name='Close'):
    v2 = EMA(r, days, name)
    v3 = EMA(v2, days)
    v4 = EMA(v3, days)

    emas = []
    for i in range(len(v2)):
        ok = 3 * (v2[i] - v3[i]) + v4[i]
        emas.append(ok)
    return emas

苏慕白 发布于  2021-11-1 19:51