Oanda REST API

Oandaのデモ口座を開設し、Pythonで為替レートを取得しようと思ったものの、githubで公開されていたstreaming.py がPython3で動かなかったので、少し手を入れてみた。(既にforkされ誰かがPython3用に改良していたかもしれないが、確認はしていない。)

import requests
import json

from optparse import OptionParser

def connect_to_stream():
        """
        Environment           <Domain>
        fxTrade               stream-fxtrade.oanda.com
        fxTrade Practice      stream-fxpractice.oanda.com
        sandbox               stream-sandbox.oanda.com
        """
        # Replace the following variables with your personal ones
        domain = 'stream-fxpractice.oanda.com'
        access_token = 'アクセストークン'
        account_id = 'アカウントID'
        instruments = "GBP_USD"

        try:
                s = requests.Session()
                url = "https://" + domain + "/v1/prices"
                headers = {'Authorization' : 'Bearer ' + access_token,
                # 'X-Accept-Datetime-Format' : 'unix'
                }
                params = {'instruments' : instruments, 'accountId' : account_id}
                req = requests.Request('GET', url, headers = headers, params = params)
                pre = req.prepare()
                resp = s.send(pre, stream = True, verify = False)
                return resp
        except Exception as e:
                s.close()
                print("Caught exception when connecting to stream\n" + str(e) )

def demo(displayHeartbeat):
        response = connect_to_stream()
        if response.status_code != 200:
                print(response.text)
                return
        for line in response.iter_lines(1):
                if line:
                        try:
                                msg = json.loads(line.decode('utf8'))
                        except Exception as e:
                                print("Caught exception when converting message into json\n" + str(e))
                                return

                        if displayHeartbeat:
                                print(line)
                        else:
                                if msg.get("instrument"):
                                        print(line)
                                if msg.get("tick"):
                                        print(line)

def main():
        usage = "usage: %prog [options]"
        parser = OptionParser(usage)
        parser.add_option("-b", "--displayHeartBeat", dest = "verbose", action = "store_true", help = "Display HeartBeat in streaming data")
        displayHeartbeat = False
        (options, args) = parser.parse_args()
        if len(args) > 1:
                parser.error("incorrect number of arguments")
        if options.verbose:
                displayHeartbeat = True
        demo(displayHeartbeat)

if __name__ == "__main__":
        main()