from django.conf import settings
from payments.providers.base import ProviderBase
from typing import Optional, Iterable, Tuple, Dict
from django.conf import settings
from django.db.models import Q
from django.http import HttpRequest
from payments.const import Status, Result, RefundResult
from payments.models import Transaction, Frame
from payments.dto import Purchase
from payments.schema import PurchaseSchema
import stripe
from dateutil.relativedelta import relativedelta
import datetime
class StripeProvider(ProviderBase):
id = "stripe"
def __init__(self):
super().__init__()
self.api_key = settings.STRIPE_SECRET_KEY
def setup(self, transaction: Transaction, purchase: Purchase, request: HttpRequest) -> str:
payment_method= request.POST.get('payment_method')
if payment_method in ["googlepay", "applepay"]:
payment_method = "card"
request_payload = {'expires_at':int((datetime.datetime.now() + relativedelta(minutes=60)).timestamp()),
'payment_method_types': [payment_method],
'locale': purchase.user_details.language_code,
'payment_intent_data':{'metadata':{'order_number':purchase.merchant_reference}},
'line_items':[{
'price_data': {
'currency': 'eur',
'product_data': {
'name': purchase.description
},
'unit_amount': int(purchase.amount * 100),
},
'quantity': 1,
}],
'mode': 'payment',
'success_url': self.callback_url(token=transaction.token),
'cancel_url': self.callback_url(action='cancel', token=transaction.token)}
stripe_session = stripe.checkout.Session.create(**request_payload, api_key=self.api_key)
transaction.provider_reference = stripe_session.id
transaction.status = Status.PENDING
transaction.save()
Frame.objects.create(transaction=transaction, request=request_payload, response=stripe_session.to_dict_recursive())
return stripe_session.url
def process(self, transaction: Transaction, request: HttpRequest) -> Result:
return self.query(transaction)
def query(self, transaction):
session = stripe.checkout.Session.retrieve(transaction.provider_reference, api_key=self.api_key)
"""
open
The checkout session is still in progress. Payment processing has not started
complete
The checkout session is complete. Payment processing may still be in progress
expired
The checkout session has expired. No further processing will occur
"""
Frame.objects.create(transaction=transaction, request=None, response=session.to_dict_recursive())
result = Result.UNFINISHED
if session.status =='complete':
if session.payment_status == 'paid':
transaction.status = Status.CONFIRMED
result = Result.SUCCESS
else:
transaction.status = Status.REJECTED
result = Result.FAILURE
elif session.status == 'expired':
transaction.status = Status.TIMEOUT
result = Result.FAILURE
transaction.save()
return result
def check_pending_transactions(self) -> Iterable[Tuple[Result, Transaction]]:
for transaction in Transaction.objects.filter(
Q(provider=self.id, status=Status.PENDING)
):
yield self.query(transaction), transaction
class StripeProviderLT(StripeProvider):
id = "stripe_lt"
def __init__(self):
super().__init__()
self.api_key = settings.STRIPE_SECRET_KEY_LT