Add Python SDK

This commit is contained in:
Umeda 2024-02-21 13:44:07 +09:00
parent 28f5a337d6
commit 1453655e5b
14 changed files with 1631 additions and 0 deletions

View File

@ -0,0 +1,155 @@
---
title: "Tutorial: Automate your SP-API calls using Python SDK"
excerpt: "Tutorial to automate your SP-API calls with Python SDK with Login with Amazon (LWA) token exchange and authentication."
slug: "tutorial-automate-your-sp-api-calls-using-python-sdk"
category: 659db4609baec900382f6ebe
---
This tutorial provides you with all the required details to generate a Python SDK with Login with Amazon (LWA) token exchange and authentication, to build your application seamlessly. You will learn the prerequisites required to build the Python SDK and also view an example using the Selling Partner API for Orders and the Swagger Code Generator.
You can use this SDK to integrate Amazon marketplace features into your applications, including accessing product information, managing orders, handling shipments, and more.
## Tutorial
The following tutorial will help you set up your own Python SDK for automating SP-API calls.
### Prerequisites
To complete this tutorial, you need the following prerequisites:
* A hybrid or SP-API app in draft or published state
* Integrated development environment (IDE) software
* Python (version 3.6 or later)
* `swagger-codegen-cli-2.3` (or later) This tool is used to generate Python client code from the SP-API's Swagger definitions.
Before your application can connect to the Selling Partner API, you must register it, and it must be authorized by a selling partner. If you do not have a hybrid or SP-API app, follow the steps to [register as a developer](https://developer-docs.amazon.com/sp-api/docs/registering-as-a-developer), [register your application](https://developer-docs.amazon.com/sp-api/docs/registering-your-application), and [Authorizing Selling Partner API applications](https://developer-docs.amazon.com/sp-api/docs/authorizing-selling-partner-api-applications). Then, return to this tutorial.
Next, set up your workspace for the tutorial.
### Step 1. Set up your workspace
1. On your local drive, create a directory for this project, name it `SPAPI_Python_SDK`, and navigate to the new directory.
2. Clone the [client repo](https://github.com/amzn/selling-partner-api-models/tree/main/clients).
3. Download the following tools.
- IDE software (this walkthrough uses [Visual Studio](https://visualstudio.microsoft.com/) IDE on Windows OS)
- Python (version 3.6 or later). You can download this software from [python.org](https://www.python.org/downloads/).
4. Run the following command in your terminal to download the <span class="notranslate">Swagger Code Jar</span>:
```
`wget https://repo1.maven.org/maven2/io/swagger/swagger-codegen-cli/2.4.13/swagger-codegen-cli-2.4.13.jar -O swagger-codegen-cli.jar`
```
5. Copy `swagger-codegen-cli.jar` into your local directory `C:\\SPAPI_Python_SDK`.
6. Run the following command in your terminal to install the Python `backoff` library in your environment:
```
pip install backoff
```
7. In GitHub, go to <span class="notranslate"><https://github.com/amzn/selling-partner-api-models/tree/main/models></span> and run the following command to clone the <span class="notranslate">selling-partner-api-models</span> repository to your local directory `C:\\SPAPI_Python_SDK`.
```
git clone https://github.com/amzn/selling-partner-api-models
```
Now that you have completed the required setup, the next step is to generate the Python SDK with the authentication and authorization classes downloaded to your local directory `C:\\SPAPI_Python_SDK`.
### Step 2. Generate a Python client from Swagger definitions
1. Locate the Swagger JSON file for the [SP-API API model](https://github.com/amzn/selling-partner-api-models/tree/main/models) you want to use (for example, Orders API) from your local directory `C:\\SPAPI_Python_SDK`.
2. Run the following command in your terminal to generate client code. Make sure to replace the paths and API model with your settings.
```
java -jar /[path_to_swagger_jar]/swagger-codegen-cli.jar generate -l python -t /[path_to_mustach_resources]/resources/ -D packageName=swagger_client -o /[path_to_client_folder]/client/[SP-API_NAME] -i /[path_to_model_folder]/models/[SP-API_NAME]/SP-API.json
```
Now that you have generated a Python client, you need to integrate the authentication model.
### Step 3. Integrate the authentication module
1. Locate the `auth` and `spapi` client code folders in `C:\\SPAPI_Python_SDK`, the directory where the SDK was downloaded.
2. Update the paths in the following Python files: `spapiclient.py` and `LwaRequest.py`.
With authentication set up, you're now ready to set up the Python SDK package.
### Step 4. Set up the Python SDK package
1. Navigate to `C:\\SPAPI_Python_SDK` the directory where the SDK was generated.
2. Use the following code to create a `setup.py` file. This file is required for packaging your SDK. Make sure you replace information in the example with information for your package and dependencies.
```python
from setuptools import setup, find_packages
setup(
name='SellingPartnerAPIAuthAndAuthPython', # Replace with your package's name
version='1.0.0', # Replace with your package's version
package_dir={'': 'src'}, # Replace 'src' as necessary
packages=find_packages(where='src'),
install_requires=[line.strip() for line in open("requirements.txt", "r")],
description='A Python SDK for Amazon Selling Partner API',
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
url='TBD'
)
```
With the Python SDK set up, you're now ready to interact with the SP-API endpoints.
### Step 5. Create an instance of the Orders API and call an operation
The following is an example of how to use the Python SDK with the Orders API to make a `getOrders` request. Update the code with your information and then run the code in your terminal.
```python
if __name__ == "__main__":
from auth.credentials import SPAPIConfig
config = SPAPIConfig(
client_id="Your client-id",
client_secret="Your client-secret",
refresh_token="Your refresh-token",
region="NA", # Possible values NA, EU, FE, and SANDBOX
scope = None # Required for grant_type='client_credentials' ; Possible values "sellingpartnerapi::notifications" and "sellingpartnerapi::migration"
)
from spapi.spapiclient import SPAPIClient
# Create the API Client
print("Config and client initialized...")
api_client = SPAPIClient(config)
marketplace_ids = ["ATVPDKIKX0DER"]
created_after = "2024-01-19T00:00:00"
orders_api = api_client.get_api_client('OrdersV0Api')
orders_response = orders_api.get_orders(marketplace_ids=marketplace_ids, created_after=created_after)
print("Orders API Response:")
print(orders_response)
```
> 🚧 Caution
>
> Never commit this file to your version control system as it contains sensitive information. Please ensure these LWA credentials are stored securely in an encrypted format.
A status code of 200 means the API call was successful.
## Step 6. Connect to the Selling Partner API using the generated the Python SDK
Run the following commands in your terminal to build and install your SDK locally:
```bash
python3 setup.py sdist bdist_wheel
```
```bash
pip install dist/{YourPackageName}-1.0.0-py3-none-any.whl
```
Run the following test script in your terminal to test the Python SDK:
```bash
python test.py
```
A status code of 200 means the API call was successful.
## Conclusion
In this tutorial, you learned how to automate your SP-API calls using an SP-API SDK for Python. In the walkthrough, you learned how to set up your workspace, generate a Python SDK for Selling Partner API, connect to the Orders API, and make your first API call.

View File

@ -0,0 +1,18 @@
class LwaException(Exception):
def __init__(self, error_code, error_message, cause=None):
super().__init__(f"LWA Error - Code {error_code}, Message: {error_message}")
self.error_code = error_code
self.error_message = error_message
self.cause = cause
def __str__(self):
cause_str = f", Cause: {self.cause}" if self.cause else ""
return f"LWA Error - Code: {self.error_code}, Message: {self.error_message}{cause_str}"
def get_error_code(self):
return self.error_code
def get_error_message(self):
return self.error_message

View File

@ -0,0 +1,12 @@
from enum import Enum
class LwaExceptionErrorCode(Enum):
ACCESS_DENIED = "access_denied"
INVALID_GRANT = "invalid_grant"
INVALID_REQUEST = "invalid_request"
INVALID_SCOPE = "invalid_scope"
SERVER_ERROR = "server_error"
TEMPORARILY_UNAVAILABLE = "temporarily_unavailable"
UNAUTHORIZED_CLIENT = "unauthorized_client"
INVALID_CLIENT = "invalid_client"
OTHER = "other"

View File

@ -0,0 +1,96 @@
import requests
import time
import logging
import sys
#Update path
sys.path.append("path_to_folder/SellingPartnerAPIAuthAndAuthPython")
from auth.LwaException import LwaException
from auth.LwaExceptionErrorCode import LwaExceptionErrorCode
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AccessTokenCache:
def __init__(self, max_retries=3):
self.token_info = None
self.max_retries = max_retries
def get_lwa_access_token(self, client_id, client_secret, refresh_token=None, grant_type="refresh_token", scope=None):
if self.token_info and time.time() < self.token_info["expires_at"]:
return self.token_info["access_token"]
return self.request_new_token(client_id, client_secret, refresh_token, grant_type, scope)
def request_new_token(self, client_id, client_secret, refresh_token, grant_type, scope):
self.validate_token_request(grant_type, refresh_token, scope)
data = self.prepare_token_request_data(client_id, client_secret, refresh_token, grant_type, scope)
retries = 0
while retries <= self.max_retries:
try:
response = requests.post("https://api.amazon.com/auth/o2/token", data=data)
response.raise_for_status()
token_response = response.json()
token_response["expires_at"] = time.time() + token_response.get("expires_in", 1800) - 30
self.token_info = token_response
return token_response["access_token"]
except requests.RequestException as e:
if retries < self.max_retries:
retries += 1
time.sleep(2 ** retries)
continue
error_code = self.map_http_status_to_lwa_exception_code(e.response.status_code if e.response else None)
error_message = f"Token request failed with status code {e.response.status_code}: {e.response.text}" if e.response else "Token request failed."
logger.error(error_message)
raise LwaException(error_code, error_message) from e
def validate_token_request(self, grant_type, refresh_token, scope):
if grant_type == "refresh_token" and not refresh_token:
raise LwaException(LwaExceptionErrorCode.INVALID_REQUEST.value, "Refresh token must be provided for grant_type 'refresh_token'")
if grant_type == "client_credentials" and not scope:
raise LwaException(LwaExceptionErrorCode.INVALID_SCOPE.value, "Scope must be provided for grant_type 'client_credentials'")
def prepare_token_request_data(self, client_id, client_secret, refresh_token, grant_type, scope):
data = {
"client_id": client_id,
"client_secret": client_secret,
"grant_type": grant_type
}
if grant_type == "refresh_token":
if not refresh_token:
raise LwaException(LwaExceptionErrorCode.INVALID_REQUEST.value, "Refresh token must be provided for grant_type 'refresh_token'")
data["refresh_token"] = refresh_token
elif grant_type == "client_credentials":
if not scope:
raise LwaException(LwaExceptionErrorCode.INVALID_SCOPE.value, "Scope must be provided for grant_type 'client_credentials'")
data["scope"] = scope
return data
def is_retriable(self, error_code):
retriable_codes = [
LwaExceptionErrorCode.SERVER_ERROR.value,
LwaExceptionErrorCode.TEMPORARILY_UNAVAILABLE.value
]
return error_code in retriable_codes
def format_error_message(self, e):
return f"Token request failed with status code {e.response.status_code}: {e.response.text}" if e.response else f"Token request failed: {e}"
def map_http_status_to_lwa_exception_code(self, status_code):
if status_code is None:
return LWAExceptionErrorCode.SERVER_ERROR.value
if status_code == 400:
return LWAExceptionErrorCode.INVALID_REQUEST.value
if status_code == 401:
return LWAExceptionErrorCode.UNAUTHORIZED_CLIENT.value
if status_code == 403:
return LWAExceptionErrorCode.ACCESS_DENIED.value
if status_code == 500:
return LWAExceptionErrorCode.SERVER_ERROR.value
if status_code == 503:
return LWAExceptionErrorCode.TEMPORARILY_UNAVAILABLE.value
return LWAExceptionErrorCode.OTHER.value

View File

@ -0,0 +1,8 @@
class SPAPIConfig:
def __init__(self, client_id, client_secret, refresh_token, region="SANDBOX", access_token=None, scope=None):
self.client_id = client_id
self.client_secret = client_secret
self.refresh_token = refresh_token
self.region = region
self.scope = scope
self.access_token = access_token # Initially empty, filled by LWA request method

View File

@ -0,0 +1,38 @@
arrow==1.2.3
attrs==23.1.0
boto3==1.26.151
botocore==1.29.151
bravado==11.0.3
bravado-core==5.17.1
cachetools==5.3.1
certifi==2023.5.7
charset-normalizer==3.1.0
chevron==0.14.0
confuse==2.0.1
fqdn==1.5.1
idna==3.4
isoduration==20.11.0
jmespath==1.0.1
jsonpointer==2.4
jsonref==1.1.0
jsonschema==4.18.2
jsonschema-specifications==2023.6.1
monotonic==1.6
msgpack==1.0.5
python-amazon-sp-api==1.0.2
python-dateutil==2.8.2
pytz==2023.3
PyYAML==6.0
referencing==0.29.1
requests==2.31.0
rfc3339-validator==0.1.4
rfc3987==1.3.8
rpds-py==0.8.10
s3transfer==0.6.1
simplejson==3.19.1
six==1.16.0
swagger-spec-validator==3.0.3
typing_extensions==4.7.1
uri-template==1.3.0
urllib3==1.26.16
webcolors==1.13

View File

@ -0,0 +1,214 @@
# coding: utf-8
{{>partial_header}}
from __future__ import absolute_import
import re # noqa: F401
# python 2 and python 3 compatibility library
import six
from {{packageName}}.api_client import ApiClient
{{#operations}}
class {{classname}}(object):
"""NOTE: This class is auto generated by the swagger code generator program.
Do not edit the class manually.
Ref: https://github.com/swagger-api/swagger-codegen
"""
def __init__(self, api_client=None):
if api_client is None:
api_client = ApiClient()
self.api_client = api_client
{{#operation}}
def {{operationId}}(self, {{#sortParamsByRequiredFlag}}{{#allParams}}{{#required}}{{paramName}}, {{/required}}{{/allParams}}{{/sortParamsByRequiredFlag}}**kwargs): # noqa: E501
"""{{#summary}}{{{.}}}{{/summary}}{{^summary}}{{operationId}}{{/summary}} # noqa: E501
{{#notes}}
{{{notes}}} # noqa: E501
{{/notes}}
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
{{#sortParamsByRequiredFlag}}
>>> thread = api.{{operationId}}({{#allParams}}{{#required}}{{paramName}}, {{/required}}{{/allParams}}async_req=True)
{{/sortParamsByRequiredFlag}}
{{^sortParamsByRequiredFlag}}
>>> thread = api.{{operationId}}({{#allParams}}{{#required}}{{paramName}}={{paramName}}_value, {{/required}}{{/allParams}}async_req=True)
{{/sortParamsByRequiredFlag}}
>>> result = thread.get()
:param async_req bool
{{#allParams}}
:param {{dataType}} {{paramName}}:{{#description}} {{{description}}}{{/description}}{{#required}} (required){{/required}}{{#optional}}(optional){{/optional}}
{{/allParams}}
:return: {{#returnType}}{{returnType}}{{/returnType}}{{^returnType}}None{{/returnType}}
If the method is called asynchronously,
returns the request thread.
"""
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
return self.{{operationId}}_with_http_info({{#sortParamsByRequiredFlag}}{{#allParams}}{{#required}}{{paramName}}, {{/required}}{{/allParams}}{{/sortParamsByRequiredFlag}}**kwargs) # noqa: E501
else:
(data) = self.{{operationId}}_with_http_info({{#sortParamsByRequiredFlag}}{{#allParams}}{{#required}}{{paramName}}, {{/required}}{{/allParams}}{{/sortParamsByRequiredFlag}}**kwargs) # noqa: E501
return data
def {{operationId}}_with_http_info(self, {{#sortParamsByRequiredFlag}}{{#allParams}}{{#required}}{{paramName}}, {{/required}}{{/allParams}}{{/sortParamsByRequiredFlag}}**kwargs): # noqa: E501
"""{{#summary}}{{{.}}}{{/summary}}{{^summary}}{{operationId}}{{/summary}} # noqa: E501
{{#notes}}
{{{notes}}} # noqa: E501
{{/notes}}
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
{{#sortParamsByRequiredFlag}}
>>> thread = api.{{operationId}}_with_http_info({{#allParams}}{{#required}}{{paramName}}, {{/required}}{{/allParams}}async_req=True)
{{/sortParamsByRequiredFlag}}
{{^sortParamsByRequiredFlag}}
>>> thread = api.{{operationId}}_with_http_info({{#allParams}}{{#required}}{{paramName}}={{paramName}}_value, {{/required}}{{/allParams}}async_req=True)
{{/sortParamsByRequiredFlag}}
>>> result = thread.get()
:param async_req bool
{{#allParams}}
:param {{dataType}} {{paramName}}:{{#description}} {{{description}}}{{/description}}{{#required}} (required){{/required}}{{#optional}}(optional{{#defaultValue}}, default to {{{.}}}{{/defaultValue}}){{/optional}}
{{/allParams}}
:return: {{#returnType}}{{returnType}}{{/returnType}}{{^returnType}}None{{/returnType}}
If the method is called asynchronously,
returns the request thread.
"""
all_params = [{{#allParams}}'{{paramName}}'{{#hasMore}}, {{/hasMore}}{{/allParams}}] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
all_params.append('_request_timeout')
params = locals()
for key, val in six.iteritems(params['kwargs']):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s'"
" to method {{operationId}}" % key
)
params[key] = val
del params['kwargs']
{{#allParams}}
{{#required}}
# verify the required parameter '{{paramName}}' is set
if self.api_client.client_side_validation and ('{{paramName}}' not in params or
params['{{paramName}}'] is None): # noqa: E501
raise ValueError("Missing the required parameter `{{paramName}}` when calling `{{operationId}}`") # noqa: E501
{{/required}}
{{/allParams}}
{{#allParams}}
{{#hasValidation}}
{{#maxLength}}
if self.api_client.client_side_validation and ('{{paramName}}' in params and
len(params['{{paramName}}']) > {{maxLength}}):
raise ValueError("Invalid value for parameter `{{paramName}}` when calling `{{operationId}}`, length must be less than or equal to `{{maxLength}}`") # noqa: E501
{{/maxLength}}
{{#minLength}}
if self.api_client.client_side_validation and ('{{paramName}}' in params and
len(params['{{paramName}}']) < {{minLength}}):
raise ValueError("Invalid value for parameter `{{paramName}}` when calling `{{operationId}}`, length must be greater than or equal to `{{minLength}}`") # noqa: E501
{{/minLength}}
{{#maximum}}
if self.api_client.client_side_validation and ('{{paramName}}' in params and params['{{paramName}}'] >{{#exclusiveMaximum}}={{/exclusiveMaximum}} {{maximum}}): # noqa: E501
raise ValueError("Invalid value for parameter `{{paramName}}` when calling `{{operationId}}`, must be a value less than {{^exclusiveMaximum}}or equal to {{/exclusiveMaximum}}`{{maximum}}`") # noqa: E501
{{/maximum}}
{{#minimum}}
if self.api_client.client_side_validation and ('{{paramName}}' in params and params['{{paramName}}'] <{{#exclusiveMinimum}}={{/exclusiveMinimum}} {{minimum}}): # noqa: E501
raise ValueError("Invalid value for parameter `{{paramName}}` when calling `{{operationId}}`, must be a value greater than {{^exclusiveMinimum}}or equal to {{/exclusiveMinimum}}`{{minimum}}`") # noqa: E501
{{/minimum}}
{{#pattern}}
if self.api_client.client_side_validation and ('{{paramName}}' in params and not re.search(r'{{{vendorExtensions.x-regex}}}', params['{{paramName}}']{{#vendorExtensions.x-modifiers}}{{#-first}}, flags={{/-first}}re.{{.}}{{^-last}} | {{/-last}}{{/vendorExtensions.x-modifiers}})): # noqa: E501
raise ValueError("Invalid value for parameter `{{paramName}}` when calling `{{operationId}}`, must conform to the pattern `{{{pattern}}}`") # noqa: E501
{{/pattern}}
{{#maxItems}}
if self.api_client.client_side_validation and ('{{paramName}}' in params and
len(params['{{paramName}}']) > {{maxItems}}):
raise ValueError("Invalid value for parameter `{{paramName}}` when calling `{{operationId}}`, number of items must be less than or equal to `{{maxItems}}`") # noqa: E501
{{/maxItems}}
{{#minItems}}
if self.api_client.client_side_validation and ('{{paramName}}' in params and
len(params['{{paramName}}']) < {{minItems}}):
raise ValueError("Invalid value for parameter `{{paramName}}` when calling `{{operationId}}`, number of items must be greater than or equal to `{{minItems}}`") # noqa: E501
{{/minItems}}
{{/hasValidation}}
{{#-last}}
{{/-last}}
{{/allParams}}
collection_formats = {}
path_params = {}
{{#pathParams}}
if '{{paramName}}' in params:
path_params['{{baseName}}'] = params['{{paramName}}']{{#isListContainer}} # noqa: E501
collection_formats['{{baseName}}'] = '{{collectionFormat}}'{{/isListContainer}} # noqa: E501
{{/pathParams}}
query_params = []
{{#queryParams}}
if '{{paramName}}' in params:
query_params.append(('{{baseName}}', params['{{paramName}}'])){{#isListContainer}} # noqa: E501
collection_formats['{{baseName}}'] = '{{collectionFormat}}'{{/isListContainer}} # noqa: E501
{{/queryParams}}
header_params = {}
{{#headerParams}}
if '{{paramName}}' in params:
header_params['{{baseName}}'] = params['{{paramName}}']{{#isListContainer}} # noqa: E501
collection_formats['{{baseName}}'] = '{{collectionFormat}}'{{/isListContainer}} # noqa: E501
{{/headerParams}}
form_params = []
local_var_files = {}
{{#formParams}}
if '{{paramName}}' in params:
{{#notFile}}form_params.append(('{{baseName}}', params['{{paramName}}'])){{/notFile}}{{#isFile}}local_var_files['{{baseName}}'] = params['{{paramName}}']{{/isFile}}{{#isListContainer}} # noqa: E501
collection_formats['{{baseName}}'] = '{{collectionFormat}}'{{/isListContainer}} # noqa: E501
{{/formParams}}
body_params = None
{{#bodyParam}}
if '{{paramName}}' in params:
body_params = params['{{paramName}}']
{{/bodyParam}}
{{#hasProduces}}
# HTTP header `Accept`
header_params['Accept'] = self.api_client.select_header_accept(
[{{#produces}}'{{{mediaType}}}'{{#hasMore}}, {{/hasMore}}{{/produces}}]) # noqa: E501
{{/hasProduces}}
{{#hasConsumes}}
# HTTP header `Content-Type`
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
[{{#consumes}}'{{{mediaType}}}'{{#hasMore}}, {{/hasMore}}{{/consumes}}]) # noqa: E501
{{/hasConsumes}}
# Authentication setting
auth_settings = [{{#authMethods}}'{{name}}'{{#hasMore}}, {{/hasMore}}{{/authMethods}}] # noqa: E501
return self.api_client.call_api(
'{{{path}}}', '{{httpMethod}}',
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type={{#returnType}}'{{returnType}}'{{/returnType}}{{^returnType}}None{{/returnType}}, # noqa: E501
auth_settings=auth_settings,
async_req=params.get('async_req'),
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)
{{/operation}}
{{/operations}}

View File

@ -0,0 +1,647 @@
# coding: utf-8
{{>partial_header}}
from __future__ import absolute_import
import datetime
import json
import mimetypes
from multiprocessing.pool import ThreadPool
import os
import re
import tempfile
# python 2 and python 3 compatibility library
import six
from six.moves.urllib.parse import quote
{{#tornado}}
import tornado.gen
{{/tornado}}
from {{packageName}}.configuration import Configuration
import {{modelPackage}}
from {{packageName}} import rest
class ApiClient(object):
"""Generic API client for Swagger client library builds.
Swagger generic API client. This client handles the client-
server communication, and is invariant across implementations. Specifics of
the methods and models for each application are generated from the Swagger
templates.
NOTE: This class is auto generated by the swagger code generator program.
Ref: https://github.com/swagger-api/swagger-codegen
Do not edit the class manually.
:param configuration: .Configuration object for this client
:param header_name: a header to pass when making calls to the API.
:param header_value: a header value to pass when making calls to
the API.
:param cookie: a cookie to include in the header when making calls
to the API
"""
PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
NATIVE_TYPES_MAPPING = {
'int': int,
'long': int if six.PY3 else long, # noqa: F821
'float': float,
'str': str,
'bool': bool,
'date': datetime.date,
'datetime': datetime.datetime,
'object': object,
}
def __init__(self, configuration=None, header_name=None, header_value=None,
cookie=None):
if configuration is None:
configuration = Configuration()
self.configuration = configuration
# Use the pool property to lazily initialize the ThreadPool.
self._pool = None
self.rest_client = rest.RESTClientObject(configuration)
self.default_headers = {}
if header_name is not None:
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = '{{#httpUserAgent}}{{{.}}}{{/httpUserAgent}}{{^httpUserAgent}}Swagger-Codegen/{{{packageVersion}}}/python{{/httpUserAgent}}'
self.client_side_validation = configuration.client_side_validation
def __del__(self):
if self._pool is not None:
self._pool.close()
self._pool.join()
@property
def pool(self):
if self._pool is None:
self._pool = ThreadPool()
return self._pool
@property
def user_agent(self):
"""User agent for this API client"""
return self.default_headers['User-Agent']
@user_agent.setter
def user_agent(self, value):
self.default_headers['User-Agent'] = value
def set_default_header(self, header_name, header_value):
self.default_headers[header_name] = header_value
{{#tornado}}
@tornado.gen.coroutine
{{/tornado}}
{{#asyncio}}async {{/asyncio}}def __call_api(
self, resource_path, method, path_params=None,
query_params=None, header_params=None, body=None, post_params=None,
files=None, response_type=None, auth_settings=None,
_return_http_data_only=None, collection_formats=None,
_preload_content=True, _request_timeout=None):
config = self.configuration
# header parameters
header_params = header_params or {}
header_params.update(self.default_headers)
if self.configuration.access_token:
header_params['x-amz-access-token'] = self.configuration.access_token
if self.cookie:
header_params['Cookie'] = self.cookie
if header_params:
header_params = self.sanitize_for_serialization(header_params)
header_params = dict(self.parameters_to_tuples(header_params,
collection_formats))
# path parameters
if path_params:
path_params = self.sanitize_for_serialization(path_params)
path_params = self.parameters_to_tuples(path_params,
collection_formats)
for k, v in path_params:
# specified safe chars, encode everything
resource_path = resource_path.replace(
'{%s}' % k,
quote(str(v), safe=config.safe_chars_for_path_param)
)
# query parameters
if query_params:
query_params = self.sanitize_for_serialization(query_params)
query_params = self.parameters_to_tuples(query_params,
collection_formats)
# post parameters
if post_params or files:
post_params = self.prepare_post_parameters(post_params, files)
post_params = self.sanitize_for_serialization(post_params)
post_params = self.parameters_to_tuples(post_params,
collection_formats)
# auth setting
self.update_params_for_auth(header_params, query_params, auth_settings)
# body
if body:
body = self.sanitize_for_serialization(body)
# request url
url = self.configuration.host + resource_path
# perform request and return response
response_data = {{#asyncio}}await {{/asyncio}}{{#tornado}}yield {{/tornado}}self.request(
method, url, query_params=query_params, headers=header_params,
post_params=post_params, body=body,
_preload_content=_preload_content,
_request_timeout=_request_timeout)
self.last_response = response_data
return_data = response_data
if _preload_content:
# deserialize response data
if response_type:
return_data = self.deserialize(response_data, response_type)
else:
return_data = None
{{^tornado}}
if _return_http_data_only:
return (return_data)
else:
return (return_data, response_data.status,
response_data.getheaders())
{{/tornado}}
{{#tornado}}
if _return_http_data_only:
raise tornado.gen.Return(return_data)
else:
raise tornado.gen.Return((return_data, response_data.status,
response_data.getheaders()))
{{/tornado}}
def sanitize_for_serialization(self, obj):
"""Builds a JSON POST object.
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is swagger model, return the properties dict.
:param obj: The data to serialize.
:return: The serialized form of data.
"""
if obj is None:
return None
elif isinstance(obj, self.PRIMITIVE_TYPES):
return obj
elif isinstance(obj, list):
return [self.sanitize_for_serialization(sub_obj)
for sub_obj in obj]
elif isinstance(obj, tuple):
return tuple(self.sanitize_for_serialization(sub_obj)
for sub_obj in obj)
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
if isinstance(obj, dict):
obj_dict = obj
else:
# Convert model obj to dict except
# attributes `swagger_types`, `attribute_map`
# and attributes which value is not None.
# Convert attribute name to json key in
# model definition for request.
obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
for attr, _ in six.iteritems(obj.swagger_types)
if getattr(obj, attr) is not None}
return {key: self.sanitize_for_serialization(val)
for key, val in six.iteritems(obj_dict)}
def deserialize(self, response, response_type):
"""Deserializes response into an object.
:param response: RESTResponse object to be deserialized.
:param response_type: class literal for
deserialized object, or string of class name.
:return: deserialized object.
"""
# handle file downloading
# save response body into a tmp file and return the instance
if response_type == "file":
return self.__deserialize_file(response)
# fetch data from response object
try:
data = json.loads(response.data)
except ValueError:
data = response.data
return self.__deserialize(data, response_type)
def __deserialize(self, data, klass):
"""Deserializes dict, list, str into an object.
:param data: dict, list or str.
:param klass: class literal, or string of class name.
:return: object.
"""
if data is None:
return None
if type(klass) == str:
if klass.startswith('list['):
sub_kls = re.match(r'list\[(.*)\]', klass).group(1)
return [self.__deserialize(sub_data, sub_kls)
for sub_data in data]
if klass.startswith('dict('):
sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2)
return {k: self.__deserialize(v, sub_kls)
for k, v in six.iteritems(data)}
# convert str to class
if klass in self.NATIVE_TYPES_MAPPING:
klass = self.NATIVE_TYPES_MAPPING[klass]
else:
klass = getattr({{modelPackage}}, klass)
if klass in self.PRIMITIVE_TYPES:
return self.__deserialize_primitive(data, klass)
elif klass == object:
return self.__deserialize_object(data)
elif klass == datetime.date:
return self.__deserialize_date(data)
elif klass == datetime.datetime:
return self.__deserialize_datatime(data)
else:
return self.__deserialize_model(data, klass)
def call_api(self, resource_path, method,
path_params=None, query_params=None, header_params=None,
body=None, post_params=None, files=None,
response_type=None, auth_settings=None, async_req=None,
_return_http_data_only=None, collection_formats=None,
_preload_content=True, _request_timeout=None):
"""Makes the HTTP request (synchronous) and returns deserialized data.
To make an async request, set the async_req parameter.
:param resource_path: Path to method endpoint.
:param method: Method to call.
:param path_params: Path parameters in the url.
:param query_params: Query parameters in the url.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param auth_settings list: Auth Settings names for the request.
:param response: Response data type.
:param files dict: key -> filename, value -> filepath,
for `multipart/form-data`.
:param async_req bool: execute request asynchronously
:param _return_http_data_only: response data without head status code
and headers
:param collection_formats: dict of collection formats for path, query,
header, and post parameters.
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return:
If async_req parameter is True,
the request will be called asynchronously.
The method will return the request thread.
If parameter async_req is False or missing,
then the method will return the response directly.
"""
if not async_req:
return self.__call_api(resource_path, method,
path_params, query_params, header_params,
body, post_params, files,
response_type, auth_settings,
_return_http_data_only, collection_formats,
_preload_content, _request_timeout)
else:
thread = self.pool.apply_async(self.__call_api, (resource_path,
method, path_params, query_params,
header_params, body,
post_params, files,
response_type, auth_settings,
_return_http_data_only,
collection_formats,
_preload_content, _request_timeout))
return thread
def request(self, method, url, query_params=None, headers=None,
post_params=None, body=None, _preload_content=True,
_request_timeout=None):
"""Makes the HTTP request using RESTClient."""
if method == "GET":
return self.rest_client.GET(url,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
headers=headers)
elif method == "HEAD":
return self.rest_client.HEAD(url,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
headers=headers)
elif method == "OPTIONS":
return self.rest_client.OPTIONS(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "POST":
return self.rest_client.POST(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "PUT":
return self.rest_client.PUT(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "PATCH":
return self.rest_client.PATCH(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
elif method == "DELETE":
return self.rest_client.DELETE(url,
query_params=query_params,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
else:
raise ValueError(
"http method must be `GET`, `HEAD`, `OPTIONS`,"
" `POST`, `PATCH`, `PUT` or `DELETE`."
)
def parameters_to_tuples(self, params, collection_formats):
"""Get parameters as list of tuples, formatting collections.
:param params: Parameters as dict or list of two-tuples
:param dict collection_formats: Parameter collection formats
:return: Parameters as list of tuples, collections formatted
"""
new_params = []
if collection_formats is None:
collection_formats = {}
for k, v in six.iteritems(params) if isinstance(params, dict) else params: # noqa: E501
if k in collection_formats:
collection_format = collection_formats[k]
if collection_format == 'multi':
new_params.extend((k, value) for value in v)
else:
if collection_format == 'ssv':
delimiter = ' '
elif collection_format == 'tsv':
delimiter = '\t'
elif collection_format == 'pipes':
delimiter = '|'
else: # csv is the default
delimiter = ','
new_params.append(
(k, delimiter.join(str(value) for value in v)))
else:
new_params.append((k, v))
return new_params
def prepare_post_parameters(self, post_params=None, files=None):
"""Builds form parameters.
:param post_params: Normal form parameters.
:param files: File parameters.
:return: Form parameters with files.
"""
params = []
if post_params:
params = post_params
if files:
for k, v in six.iteritems(files):
if not v:
continue
file_names = v if type(v) is list else [v]
for n in file_names:
with open(n, 'rb') as f:
filename = os.path.basename(f.name)
filedata = f.read()
mimetype = (mimetypes.guess_type(filename)[0] or
'application/octet-stream')
params.append(
tuple([k, tuple([filename, filedata, mimetype])]))
return params
def select_header_accept(self, accepts):
"""Returns `Accept` based on an array of accepts provided.
:param accepts: List of headers.
:return: Accept (e.g. application/json).
"""
if not accepts:
return
accepts = [x.lower() for x in accepts]
if 'application/json' in accepts:
return 'application/json'
else:
return ', '.join(accepts)
def select_header_content_type(self, content_types):
"""Returns `Content-Type` based on an array of content_types provided.
:param content_types: List of content-types.
:return: Content-Type (e.g. application/json).
"""
if not content_types:
return 'application/json'
content_types = [x.lower() for x in content_types]
if 'application/json' in content_types or '*/*' in content_types:
return 'application/json'
else:
return content_types[0]
def update_params_for_auth(self, headers, querys, auth_settings):
"""Updates header and query params based on authentication setting.
:param headers: Header parameters dict to be updated.
:param querys: Query parameters tuple list to be updated.
:param auth_settings: Authentication setting identifiers list.
"""
if not auth_settings:
return
for auth in auth_settings:
auth_setting = self.configuration.auth_settings().get(auth)
if auth_setting:
if not auth_setting['value']:
continue
elif auth_setting['in'] == 'header':
headers[auth_setting['key']] = auth_setting['value']
elif auth_setting['in'] == 'query':
querys.append((auth_setting['key'], auth_setting['value']))
else:
raise ValueError(
'Authentication token must be in `query` or `header`'
)
def __deserialize_file(self, response):
"""Deserializes body to file
Saves response body into a file in a temporary folder,
using the filename from the `Content-Disposition` header if provided.
:param response: RESTResponse.
:return: file path.
"""
fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path)
os.close(fd)
os.remove(path)
content_disposition = response.getheader("Content-Disposition")
if content_disposition:
filename = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?',
content_disposition).group(1)
path = os.path.join(os.path.dirname(path), filename)
with open(path, {{^writeBinary}}"w"{{/writeBinary}}{{#writeBinary}}"wb"{{/writeBinary}}) as f:
f.write(response.data)
return path
def __deserialize_primitive(self, data, klass):
"""Deserializes string to primitive type.
:param data: str.
:param klass: class literal.
:return: int, long, float, str, bool.
"""
try:
return klass(data)
except UnicodeEncodeError:
return six.text_type(data)
except TypeError:
return data
def __deserialize_object(self, value):
"""Return a original value.
:return: object.
"""
return value
def __deserialize_date(self, string):
"""Deserializes string to date.
:param string: str.
:return: date.
"""
try:
from dateutil.parser import parse
return parse(string).date()
except ImportError:
return string
except ValueError:
raise rest.ApiException(
status=0,
reason="Failed to parse `{0}` as date object".format(string)
)
def __deserialize_datatime(self, string):
"""Deserializes string to datetime.
The string should be in iso8601 datetime format.
:param string: str.
:return: datetime.
"""
try:
from dateutil.parser import parse
return parse(string)
except ImportError:
return string
except ValueError:
raise rest.ApiException(
status=0,
reason=(
"Failed to parse `{0}` as datetime object"
.format(string)
)
)
def __hasattr(self, object, name):
return name in object.__class__.__dict__
def __deserialize_model(self, data, klass):
"""Deserializes list or dict to model.
:param data: dict, list.
:param klass: class literal.
:return: model object.
"""
if (not klass.swagger_types and
not self.__hasattr(klass, 'get_real_child_model')):
return data
kwargs = {}
if klass.swagger_types is not None:
for attr, attr_type in six.iteritems(klass.swagger_types):
if (data is not None and
klass.attribute_map[attr] in data and
isinstance(data, (list, dict))):
value = data[klass.attribute_map[attr]]
kwargs[attr] = self.__deserialize(value, attr_type)
instance = klass(**kwargs)
if (isinstance(instance, dict) and
klass.swagger_types is not None and
isinstance(data, dict)):
for key, value in data.items():
if key not in klass.swagger_types:
instance[key] = value
if self.__hasattr(instance, 'get_real_child_model'):
klass_name = instance.get_real_child_model(data)
if klass_name:
instance = self.__deserialize(data, klass_name)
return instance

View File

@ -0,0 +1,37 @@
# coding: utf-8
{{>partial_header}}
from __future__ import absolute_import
import unittest
import {{packageName}}
from {{apiPackage}}.{{classVarName}} import {{classname}} # noqa: E501
from {{packageName}}.rest import ApiException
class {{#operations}}Test{{classname}}(unittest.TestCase):
"""{{classname}} unit test stubs"""
def setUp(self):
self.api = {{apiPackage}}.{{classVarName}}.{{classname}}() # noqa: E501
def tearDown(self):
pass
{{#operation}}
def test_{{operationId}}(self):
"""Test case for {{{operationId}}}
{{#summary}}
{{{summary}}} # noqa: E501
{{/summary}}
"""
pass
{{/operation}}
{{/operations}}
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,271 @@
# coding: utf-8
{{>partial_header}}
from __future__ import absolute_import
import copy
import logging
import multiprocessing
import sys
import urllib3
import six
from six.moves import http_client as httplib
class Configuration(object):
"""NOTE: This class is auto generated by the swagger code generator program.
Ref: https://github.com/swagger-api/swagger-codegen
Do not edit the class manually.
"""
_default = None
def __init__(self):
"""Constructor"""
if self._default:
for key in self._default.__dict__.keys():
self.__dict__[key] = copy.copy(self._default.__dict__[key])
return
# Default Base url
self.host = "{{{basePath}}}"
# Temp file folder for downloading files
self.temp_folder_path = None
# Authentication Settings
# dict to store API key(s)
self.api_key = {}
# dict to store API prefix (e.g. Bearer)
self.api_key_prefix = {}
# function to refresh API key if expired
self.refresh_api_key_hook = None
# Username for HTTP basic authentication
self.username = ""
# Password for HTTP basic authentication
self.password = ""
{{#authMethods}}{{#isOAuth}}
# access token for OAuth
self.access_token = ""
{{/isOAuth}}{{/authMethods}}
# Logging Settings
self.logger = {}
self.logger["package_logger"] = logging.getLogger("{{packageName}}")
self.logger["urllib3_logger"] = logging.getLogger("urllib3")
# Log format
self.logger_format = '%(asctime)s %(levelname)s %(message)s'
# Log stream handler
self.logger_stream_handler = None
# Log file handler
self.logger_file_handler = None
# Debug file location
self.logger_file = None
# Debug switch
self.debug = False
# SSL/TLS verification
# Set this to false to skip verifying SSL certificate when calling API
# from https server.
self.verify_ssl = True
# Set this to customize the certificate file to verify the peer.
self.ssl_ca_cert = None
# client certificate file
self.cert_file = None
# client key file
self.key_file = None
# Set this to True/False to enable/disable SSL hostname verification.
self.assert_hostname = None
# urllib3 connection pool's maximum number of connections saved
# per pool. urllib3 uses 1 connection as default value, but this is
# not the best value when you are making a lot of possibly parallel
# requests to the same host, which is often the case here.
# cpu_count * 5 is used as default value to increase performance.
self.connection_pool_maxsize = multiprocessing.cpu_count() * 5
# Proxy URL
self.proxy = None
# Safe chars for path_param
self.safe_chars_for_path_param = ''
# Disable client side validation
self.client_side_validation = True
@classmethod
def set_default(cls, default):
cls._default = default
@property
def logger_file(self):
"""The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
"""
return self.__logger_file
@logger_file.setter
def logger_file(self, value):
"""The logger file.
If the logger_file is None, then add stream handler and remove file
handler. Otherwise, add file handler and remove stream handler.
:param value: The logger_file path.
:type: str
"""
self.__logger_file = value
if self.__logger_file:
# If set logging file,
# then add file handler and remove stream handler.
self.logger_file_handler = logging.FileHandler(self.__logger_file)
self.logger_file_handler.setFormatter(self.logger_formatter)
for _, logger in six.iteritems(self.logger):
logger.addHandler(self.logger_file_handler)
if self.logger_stream_handler:
logger.removeHandler(self.logger_stream_handler)
else:
# If not set logging file,
# then add stream handler and remove file handler.
self.logger_stream_handler = logging.StreamHandler()
self.logger_stream_handler.setFormatter(self.logger_formatter)
for _, logger in six.iteritems(self.logger):
logger.addHandler(self.logger_stream_handler)
if self.logger_file_handler:
logger.removeHandler(self.logger_file_handler)
@property
def debug(self):
"""Debug status
:param value: The debug status, True or False.
:type: bool
"""
return self.__debug
@debug.setter
def debug(self, value):
"""Debug status
:param value: The debug status, True or False.
:type: bool
"""
self.__debug = value
if self.__debug:
# if debug status is True, turn on debug logging
for _, logger in six.iteritems(self.logger):
logger.setLevel(logging.DEBUG)
# turn on httplib debug
httplib.HTTPConnection.debuglevel = 1
else:
# if debug status is False, turn off debug logging,
# setting log level to default `logging.WARNING`
for _, logger in six.iteritems(self.logger):
logger.setLevel(logging.WARNING)
# turn off httplib debug
httplib.HTTPConnection.debuglevel = 0
@property
def logger_format(self):
"""The logger format.
The logger_formatter will be updated when sets logger_format.
:param value: The format string.
:type: str
"""
return self.__logger_format
@logger_format.setter
def logger_format(self, value):
"""The logger format.
The logger_formatter will be updated when sets logger_format.
:param value: The format string.
:type: str
"""
self.__logger_format = value
self.logger_formatter = logging.Formatter(self.__logger_format)
def get_api_key_with_prefix(self, identifier):
"""Gets API key (with prefix if set).
:param identifier: The identifier of apiKey.
:return: The token for api key authentication.
"""
if self.refresh_api_key_hook:
self.refresh_api_key_hook(self)
key = self.api_key.get(identifier)
if key:
prefix = self.api_key_prefix.get(identifier)
if prefix:
return "%s %s" % (prefix, key)
else:
return key
def get_basic_auth_token(self):
"""Gets HTTP basic authentication header (string).
:return: The token for basic HTTP authentication.
"""
token = ""
if self.username or self.password:
token = urllib3.util.make_headers(
basic_auth=self.username + ':' + self.password
).get('authorization')
return token
def auth_settings(self):
"""Gets Auth Settings dict for api client.
:return: The Auth Settings information dict.
"""
return {
{{#authMethods}}
{{#isApiKey}}
'{{name}}':
{
'type': 'api_key',
'in': {{#isKeyInHeader}}'header'{{/isKeyInHeader}}{{#isKeyInQuery}}'query'{{/isKeyInQuery}},
'key': '{{keyParamName}}',
'value': self.get_api_key_with_prefix('{{keyParamName}}')
},
{{/isApiKey}}
{{#isBasic}}
'{{name}}':
{
'type': 'basic',
'in': 'header',
'key': 'Authorization',
'value': self.get_basic_auth_token()
},
{{/isBasic}}{{#isOAuth}}
'{{name}}':
{
'type': 'oauth2',
'in': 'header',
'key': 'Authorization',
'value': 'Bearer ' + self.access_token
},
{{/isOAuth}}{{/authMethods}}
}
def to_debug_report(self):
"""Gets the essential information for debugging.
:return: The report for debugging.
"""
return "Python SDK Debug Report:\n"\
"OS: {env}\n"\
"Python Version: {pyversion}\n"\
"Version of the API: {{version}}\n"\
"SDK Package Version: {{packageVersion}}".\
format(env=sys.platform, pyversion=sys.version)

View File

@ -0,0 +1,63 @@
import os
import sys
import logging
import backoff
from requests.exceptions import HTTPError
#Update Path
sys.path.append('path_to_folder/SellingPartnerAPIAuthAndAuthPython/client/orders')
from swagger_client.configuration import Configuration
from swagger_client.api_client import ApiClient
#Update Path
sys.path.append('path_to_folder/SellingPartnerAPIAuthAndAuthPython')
from auth.LwaRequest import AccessTokenCache
logging.basicConfig(level=logging.INFO)
def is_rate_limit_error(e):
"""Check if the exception is a rate limit error (HTTP 429)."""
return isinstance(e, HTTPError) and e.response.status_code == 429
class SPAPIClient:
region_to_endpoint = {
"NA": "https://sellingpartnerapi-na.amazon.com",
"EU": "https://sellingpartnerapi-eu.amazon.com",
"FE": "https://sellingpartnerapi-fe.amazon.com",
"SANDBOX": "https://sandbox.sellingpartnerapi-na.amazon.com"
}
def __init__(self, config):
self.config = config
self.api_base_url = self.region_to_endpoint.get(config.region)
self.access_token_cache = AccessTokenCache()
self.api_client = None
self._initialize_client()
def _initialize_client(self):
logging.debug("Initializing API Client...")
access_token = self.access_token_cache.get_lwa_access_token(
client_id=self.config.client_id,
client_secret=self.config.client_secret,
refresh_token=self.config.refresh_token
)
configuration = Configuration()
configuration.host = self.api_base_url
configuration.access_token = access_token
self.api_client = ApiClient(configuration=configuration)
@backoff.on_exception(backoff.expo,
HTTPError,
giveup=is_rate_limit_error,
max_tries=5,
on_giveup=lambda e: logging.error(f"Too Many Retries: {e}"))
def get_api_client(self, api_name):
try:
module = __import__('swagger_client.api', fromlist=[api_name])
api_class = getattr(module, api_name)
return api_class(self.api_client)
except AttributeError:
raise Exception(f"API client for {api_name} not found.")

View File

@ -0,0 +1,27 @@
if __name__ == "__main__":
print("Starting the Script...")
from auth.credentials import SPAPIConfig
# User inputs their credentials in the config
config = SPAPIConfig(
client_id="Your Client-id",
client_secret="Your Client-secret",
refresh_token="Your Refresh-token",
region="SANDBOX", # Possible values NA, EU, FE, and SANDBOX
scope = None # Required for grant_type='client_credentials' ; Possible values "sellingpartnerapi::notifications" and "sellingpartnerapi::migration"
)
from spapi.spapiclient import SPAPIClient
# Create the API Client
print("Config and client initialized...")
api_client = SPAPIClient(config)
marketplace_ids = ["ATVPDKIKX0DER"]
created_after = "2024-01-19T12:34:56.789012"
orders_api = api_client.get_api_client('OrdersV0Api')
orders_response = orders_api.get_orders(marketplace_ids=marketplace_ids, created_after=created_after)
print("Orders API Response:")
print(orders_response)

View File

@ -0,0 +1,13 @@
import unittest
from auth.LwaException import LwaException
class TestLwaException(unittest.TestCase):
def test_exception_creation(self):
error = LwaException("invalid_client", "Invalid client ID")
self.assertEqual(str(error), "LWA Error - Code: invalid_client, Message: Invalid client ID")
# Add more test cases as needed
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,32 @@
import unittest
from unittest.mock import patch
from auth.LwaRequest import AccessTokenCache
from auth.LwaException import LwaException
class TestAccessTokenCache(unittest.TestCase):
@patch('auth.LwaRequest.requests.post')
def test_token_retrieval_success(self, mock_post):
# Mock a successful token response
mock_response = mock_post.return_value
mock_response.raise_for_status.side_effect = None
mock_response.json.return_value = {"access_token": "test_token", "expires_in": 3600}
token_cache = AccessTokenCache()
token = token_cache.get_lwa_access_token("client_id", "client_secret", "refresh_token")
self.assertEqual(token, "test_token")
@patch('auth.LwaRequest.requests.post')
def test_token_retrieval_failure(self, mock_post):
# Mock a failing token response
mock_post.side_effect = Exception("Network error")
token_cache = AccessTokenCache()
with self.assertRaises(LwaException):
token_cache.get_lwa_access_token("client_id", "client_secret", "refresh_token")
# Add more test cases as needed
if __name__ == '__main__':
unittest.main()