Oanda REST API
Oandaのデモ口座を開設し、Pythonで為替レートを取得しようとしたところ、GitHubで公開されていたstreaming.pyがPython 3では動かなかったため、少し手を入れてみた。(既に誰かがforkしてPython 3用に改良しているかもしれないが、確認はしていない。)
import json
from optparse import OptionParser

import requests


def connect_to_stream():
    """Open the OANDA v1 streaming price feed and return the HTTP response.

    The stream host depends on the account environment:

        Environment         Domain
        fxTrade             stream-fxtrade.oanda.com
        fxTrade Practice    stream-fxpractice.oanda.com
        sandbox             stream-sandbox.oanda.com

    Returns:
        The streaming ``requests.Response`` (its status code is NOT checked
        here), or ``None`` if the request could not be sent. In the failure
        case the error has already been printed.
    """
    # Replace the following variables with your personal ones
    domain = 'stream-fxpractice.oanda.com'
    access_token = 'アクセストークン'  # placeholder: put your API access token here
    account_id = 'アカウントID'  # placeholder: put your account ID here
    instruments = "GBP_USD"

    url = "https://" + domain + "/v1/prices"
    headers = {
        'Authorization': 'Bearer ' + access_token,
        # Optional header, left disabled as in the original sample:
        # 'X-Accept-Datetime-Format': 'unix',
    }
    params = {'instruments': instruments, 'accountId': account_id}

    # Create the session outside the ``try`` so that ``s`` is always bound
    # when the handler below closes it.
    s = requests.Session()
    try:
        req = requests.Request('GET', url, headers=headers, params=params)
        pre = req.prepare()
        # stream=True: do not download the body up front; the caller reads
        # it incrementally. TLS certificate verification is left at the
        # requests default (enabled) because the request carries a bearer
        # token.
        return s.send(pre, stream=True)
    except Exception as e:
        # Deliberately broad: this is a best-effort sample that reports any
        # connection problem and lets the caller decide what to do.
        s.close()
        print("Caught exception when connecting to stream\n" + str(e))
        return None


def demo(displayHeartbeat):
    """Print messages from the price stream.

    Reads the stream line by line until it ends or a line cannot be decoded
    as JSON.

    Args:
        displayHeartbeat: if true, print every non-empty line received;
            otherwise print only messages that carry a truthy
            ``"instrument"`` or ``"tick"`` entry.
    """
    response = connect_to_stream()
    if response is None:
        # connect_to_stream() has already reported the failure.
        return

    try:
        if response.status_code != 200:
            print(response.text)
            return

        # chunk_size=1 so each line is handed over as soon as it arrives.
        for line in response.iter_lines(1):
            if not line:
                continue
            try:
                # iter_lines() yields bytes; decode once and reuse the text
                # for both parsing and printing.
                text = line.decode('utf8')
                msg = json.loads(text)
            except ValueError as e:
                # Covers both UnicodeDecodeError and json.JSONDecodeError.
                print("Caught exception when converting message into json\n" + str(e))
                return

            if displayHeartbeat or msg.get("instrument") or msg.get("tick"):
                print(text)
    finally:
        # Release the underlying connection on every exit path.
        response.close()


def main():
    """Parse the command-line options and run the streaming demo."""
    usage = "usage: %prog [options]"
    parser = OptionParser(usage)
    parser.add_option("-b", "--displayHeartBeat", dest="verbose",
                      action="store_true",
                      help="Display HeartBeat in streaming data")

    (options, args) = parser.parse_args()
    if len(args) > 1:
        parser.error("incorrect number of arguments")

    # ``options.verbose`` is None when -b is absent; normalise to a bool.
    demo(bool(options.verbose))


if __name__ == "__main__":
    main()