
AWS S3 Auto-Purge Solution Documentation

January 22, 2026

Kmaheshwari
Netskope Employee

1. Overview

This solution implements a "read-once" or "time-limited" file storage pattern on AWS S3. It automatically deletes files from a specific S3 bucket about 30 seconds after they are downloaded (accessed via the GetObject API), subject to CloudTrail's event delivery latency (see Troubleshooting).

Architecture Flow

  1. Connector Activity: The connector downloads a file from S3 (GetObject).
  2. Logging: AWS CloudTrail captures this specific Data Event.
  3. Trigger: Amazon EventBridge detects the GetObject event in the CloudTrail logs and triggers a Lambda function.
  4. Processing: The Lambda function waits for a configurable buffer period (30 seconds).
  5. Action: The Lambda issues a DeleteObject command to permanently remove the file.
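
For reference, the EventBridge rule forwards the matched CloudTrail record in its detail field. Below is an abbreviated, illustrative example of the event the Lambda receives; only the fields this solution relies on are shown, and the object key is a placeholder:

{
  "source": "aws.s3",
  "detail-type": "AWS API Call via CloudTrail",
  "detail": {
    "eventSource": "s3.amazonaws.com",
    "eventName": "GetObject",
    "requestParameters": {
      "bucketName": "testpurgenetskope",
      "key": "webtxstream/example.csv.gz"
    }
  }
}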

2. Prerequisites (Critical)

By default, CloudTrail does not log S3 Data Events (file-level access). You must enable this for the automation to work.

How to Enable CloudTrail for S3 Data Events

  1. Navigate: Go to the AWS Console > CloudTrail > Trails.
  2. Select Trail: Open your existing management trail. (Create one if none exists).
  3. Edit Data Events: Scroll to the Data events section and click Edit.
  4. Add Event Type:
    • Data event type: Select S3.
    • Log selector template: Select Custom.
  5. Configure Selectors (To control costs):
    • Warning: Do not select "All buckets" unless you accept high costs.
    • Under Advanced event selectors:
      1. Field: resources.ARN | Operator: startsWith | Value: arn:aws:s3:::testpurgenetskope/
      2. Field: eventName | Operator: equals | Value: GetObject
  6. Save: Click Save changes.
    • Note: CloudTrail may take 5–15 minutes to start delivering logs.
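
The same data event selectors can also be applied programmatically. A minimal sketch using boto3, assuming credentials that are allowed to modify the trail; the trail name my-management-trail is a placeholder:

import boto3

cloudtrail = boto3.client('cloudtrail')

# Log only GetObject data events for objects in the monitored bucket,
# mirroring the advanced event selectors configured in the console above.
# Note: PutEventSelectors replaces the trail's existing selectors.
cloudtrail.put_event_selectors(
    TrailName='my-management-trail',  # placeholder: use your existing trail's name
    AdvancedEventSelectors=[
        {
            'Name': 'GetObject on testpurgenetskope',
            'FieldSelectors': [
                {'Field': 'eventCategory', 'Equals': ['Data']},
                {'Field': 'resources.type', 'Equals': ['AWS::S3::Object']},
                {'Field': 'resources.ARN', 'StartsWith': ['arn:aws:s3:::testpurgenetskope/']},
                {'Field': 'eventName', 'Equals': ['GetObject']},
            ],
        },
    ],
)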

3. Deployment Guide

The entire solution (IAM Roles, Lambda Function, and EventBridge Trigger) is defined in a single AWS CloudFormation template.

3.1 CloudFormation Template

Save the following template as purge-s3-delay-stack.yaml.

AWSTemplateFormatVersion: '2010-09-09'
Description: 'Deploys a Lambda to delete S3 objects 30 seconds after access (GetObject).'

Parameters:
  S3BucketName:
    Type: String
    Default: 'testpurgenetskope'
    Description: The name of the S3 bucket to monitor.

Resources:
  # ------------------------------------------------------------------
  # 1. IAM Role: Permissions for Logging and S3 Deletion
  # ------------------------------------------------------------------
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: S3DeleteAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: s3:DeleteObject
                Resource: !Sub 'arn:aws:s3:::${S3BucketName}/*'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # ------------------------------------------------------------------
  # 2. Lambda Function: Logic (Wait 30s -> Delete)
  # ------------------------------------------------------------------
  PurgeLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub 's3-purge-handler-${S3BucketName}'
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.9
      Timeout: 60  # Timeout set to 60s to handle the 30s sleep
      Code:
        ZipFile: |
          import json
          import boto3
          import time
          import urllib.parse
          from botocore.exceptions import ClientError

          s3 = boto3.client('s3')

          def lambda_handler(event, context):
              print("Received event: " + json.dumps(event))

              try:
                  # Extract event details from EventBridge payload
                  record = event.get('detail', event)

                  if 'requestParameters' not in record:
                      print("Error: 'requestParameters' not found.")
                      return {'statusCode': 400, 'body': "Invalid event structure"}

                  bucket_name = record['requestParameters']['bucketName']
                  object_key = record['requestParameters']['key']

                  # Decode URL (e.g., "My%20File.txt" -> "My File.txt")
                  object_key = urllib.parse.unquote_plus(object_key)

                  print(f"Trigger received for: {object_key}. Waiting 30 seconds...")

                  # ---------------------------------------------------
                  # WAITING PERIOD
                  # ---------------------------------------------------
                  time.sleep(30)

                  print(f"Time up. Deleting: {object_key} from {bucket_name}")

                  # Perform Deletion
                  s3.delete_object(Bucket=bucket_name, Key=object_key)

                  print("Success: Object deleted.")
                  return {
                      'statusCode': 200,
                      'body': json.dumps(f"Deleted {object_key}")
                  }

              except ClientError as e:
                  print(f"AWS Error: {e}")
                  raise e
              except Exception as e:
                  print(f"Unexpected Error: {e}")
                  raise e

  # ------------------------------------------------------------------
  # 3. Asynchronous Invocation Config
  # ------------------------------------------------------------------
  AsyncConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
      FunctionName: !Ref PurgeLambdaFunction
      MaximumEventAgeInSeconds: 300
      MaximumRetryAttempts: 2
      Qualifier: $LATEST

  # ------------------------------------------------------------------
  # 4. EventBridge Rule: Filter for GetObject on Specific Bucket
  # ------------------------------------------------------------------
  S3AccessRule:
    Type: AWS::Events::Rule
    Properties:
      Description: !Sub 'Trigger Lambda when objects are accessed in ${S3BucketName}'
      State: ENABLED
      EventPattern:
        source:
          - aws.s3
        detail-type:
          - 'AWS API Call via CloudTrail'
        detail:
          eventSource:
            - s3.amazonaws.com
          eventName:
            - GetObject
          requestParameters:
            bucketName:
              - !Ref S3BucketName
      Targets:
        - Arn: !GetAtt PurgeLambdaFunction.Arn
          Id: 'PurgeLambdaTarget'

  # ------------------------------------------------------------------
  # 5. Lambda Permission: Allow EventBridge to Invoke
  # ------------------------------------------------------------------
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref PurgeLambdaFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt S3AccessRule.Arn
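
Deploy the stack from the CloudFormation console or programmatically. A minimal boto3 sketch, assuming the template is saved locally as purge-s3-delay-stack.yaml, the stack name s3-auto-purge is a placeholder, and your credentials are allowed to create IAM roles:

import boto3

cf = boto3.client('cloudformation')

# Read the template from section 3.1.
with open('purge-s3-delay-stack.yaml') as f:
    template_body = f.read()

cf.create_stack(
    StackName='s3-auto-purge',  # placeholder stack name
    TemplateBody=template_body,
    Parameters=[{'ParameterKey': 'S3BucketName', 'ParameterValue': 'testpurgenetskope'}],
    Capabilities=['CAPABILITY_IAM'],  # required because the template creates an IAM role
)

# Block until stack creation finishes.
cf.get_waiter('stack_create_complete').wait(StackName='s3-auto-purge')
print('Stack created.')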


 

4. Testing & Verification

Method A: End-to-End Test

  1. Upload: Use the data generation script below (DatagenerationScript) to upload files to your bucket; it replicates the Netskope Log Streaming scenario.
  2. Configure: Set the parameters at the top of the script according to your requirements.
  3. Sample data: Point INPUT_CSV at the sample file (Samplewebtx.csv) provided with this documentation.
  4. Download: Ensure an uploaded file is actually downloaded (GetObject), for example by your connector; this access is what triggers the purge.
  5. Wait: Wait approximately 15 minutes.
    • Note: CloudTrail logs are not instant; it takes time for the event to reach EventBridge.
  6. Verify: Refresh the bucket. The downloaded file should disappear (a polling sketch is shown after the script).

DatagenerationScript:

import os
import time
import gzip
import logging
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from concurrent.futures import ProcessPoolExecutor, wait

# ───── CONFIG ────────────────────────────────────────────────────────────────
INPUT_CSV     = '/home/ubuntu/Samplewebtx.csv'   # sample file provided with the documentation
DATE_COL      = 'date'
TIME_COL      = 'time'
REPEAT_COUNT  = 32            # repeat each row 32 times
INTERVAL_SEC  = 15            # seconds between launches
WORKERS       = 2             # number of alternating workers
GZIP_LEVEL    = 1             # 1 = fastest compression
OUTPUT_DIR    = '/home/ubuntu/testwebtxfiles'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# S3 destination (must exist, and your AWS creds must allow PutObject)
S3_BUCKET     = 'testpurgenetskope'
S3_PREFIX     = 'webtxstream/'

# Total number of files to generate & upload before stopping:
TOTAL_FILES   = 2
# ────────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%H:%M:%S'
)

def load_source():
    """Load header and all rows from a space-delimited CSV."""
    with open(INPUT_CSV, 'r', encoding='utf-8') as f:
        header = f.readline().strip().split()
        if DATE_COL not in header or TIME_COL not in header:
            raise RuntimeError(f"Missing '{DATE_COL}' or '{TIME_COL}' in header: {header}")
        date_idx = header.index(DATE_COL)
        time_idx = header.index(TIME_COL)
        rows = [line.strip().split() for line in f if line.strip()]
    logging.info("Loaded %d rows from %s", len(rows), INPUT_CSV)
    return header, date_idx, time_idx, rows

def generate_and_upload(epoch: str, worker_id: int, header, date_idx, time_idx, rows):
    """Repeat, stamp, gzip, upload to S3, then delete the local file."""
    fname = f"WebTX_{epoch}_{worker_id}.csv.gz"
    out_path = os.path.join(OUTPUT_DIR, fname)
    s3_key = f"{S3_PREFIX.rstrip('/')}/{fname}"
    date_str = time.strftime('%Y-%m-%d')
    time_str = time.strftime('%H:%M:%S')

    # 1) Stream data into gzip
    with gzip.open(out_path, 'wt', encoding='utf-8', newline='', compresslevel=GZIP_LEVEL) as gz:
        gz.write(' '.join(header) + '\n')
        for _ in range(REPEAT_COUNT):
            for rec in rows:
                rec[date_idx] = date_str
                rec[time_idx] = time_str
                gz.write(' '.join(rec) + '\n')

    size_mb = os.path.getsize(out_path) / (1024 * 1024)
    logging.info("Worker %d: created %s (%.1f MB)", worker_id, fname, size_mb)

    # 2) Upload to S3 using low-level PutObject
    # We must read the file in binary mode ('rb') to pass it as the Body
    try:
        s3_client = boto3.client('s3')
        with open(out_path, 'rb') as f_obj:
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=s3_key,
                Body=f_obj,
                ContentType='application/gzip'
            )
        logging.info("Worker %d: PutObject success s3://%s/%s", worker_id, S3_BUCKET, s3_key)
    except (BotoCoreError, ClientError) as e:
        logging.error("Worker %d: PutObject failed: %s", worker_id, e)
        return

    # 3) Cleanup
    try:
        os.remove(out_path)
        logging.info("Worker %d: removed local file", worker_id)
    except OSError as e:
        logging.warning("Worker %d: could not delete %s: %s", worker_id, out_path, e)

def main():
    header, date_idx, time_idx, rows = load_source()
    logging.info("Starting generation of %d files, 1 launch every %ds", TOTAL_FILES, INTERVAL_SEC)
    with ProcessPoolExecutor(max_workers=WORKERS) as executor:
        worker_id = 1
        futures = []
        for count in range(1, TOTAL_FILES + 1):
            epoch = str(int(time.time()))
            fut = executor.submit(
                generate_and_upload,
                epoch, worker_id,
                header, date_idx, time_idx, rows
            )
            futures.append(fut)
            logging.info("Launched worker %d for file %d at %s", worker_id, count, time.strftime('%H:%M:%S'))
            # alternate worker
            worker_id = 2 if worker_id == 1 else 1
            # wait before next launch
            time.sleep(INTERVAL_SEC)
        # wait for all in-flight tasks to complete before exit
        wait(futures)
    logging.info("Completed generation and upload of %d files. Exiting.", TOTAL_FILES)

if __name__ == '__main__':
    main()
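
For the verification step, rather than refreshing the console you can poll for the object. A minimal sketch, assuming the object key is copied from the script's log output (the key below is a placeholder); note that HeadObject does not itself trigger the purge, only GetObject does:

import time
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')
BUCKET = 'testpurgenetskope'
KEY = 'webtxstream/WebTX_1700000000_1.csv.gz'  # placeholder: use a key from the upload logs

# Poll every 30 seconds for up to 20 minutes to allow for CloudTrail latency.
deadline = time.time() + 20 * 60
while time.time() < deadline:
    try:
        s3.head_object(Bucket=BUCKET, Key=KEY)
        print('Object still present; waiting...')
    except ClientError as e:
        if e.response['Error']['Code'] in ('404', 'NoSuchKey', 'NotFound'):
            print('Object has been purged.')
            break
        raise
    time.sleep(30)
else:
    print('Object was not deleted within the polling window.')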


 

Method B: Simulated Test (Instant)

Use a JSON test event to invoke the Lambda function directly from the AWS Console (Lambda > Test tab). This bypasses CloudTrail and EventBridge, so the result is immediate.
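
A minimal, illustrative test event based on the fields the Lambda reads (detail.requestParameters.bucketName and key); the object key is a placeholder and must refer to an object that actually exists in the bucket for the delete to succeed:

{
  "detail": {
    "eventSource": "s3.amazonaws.com",
    "eventName": "GetObject",
    "requestParameters": {
      "bucketName": "testpurgenetskope",
      "key": "webtxstream/sample-object.csv.gz"
    }
  }
}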

 

5. Troubleshooting

Symptom                      | Probable Cause                       | Fix
-----------------------------|--------------------------------------|---------------------------------------------------------------------------------
Lambda is never triggered    | CloudTrail Data Events are disabled. | Follow the Prerequisites section to enable S3 data event logging for GetObject.
Permission denied error      | The IAM role is missing permissions. | Ensure the S3BucketName parameter in CloudFormation matches your actual bucket.
File is deleted instantly    | The sleep timer is missing.          | Verify that time.sleep(30) is present in the Lambda code.
Delay longer than 15 minutes | CloudTrail latency.                  | This is normal behavior for CloudTrail-based triggers.
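
If the Lambda never fires, a quick check is to read back the trail's selectors and confirm that GetObject data events are enabled for the monitored bucket. A minimal boto3 sketch, reusing the placeholder trail name from the Prerequisites section:

import boto3

cloudtrail = boto3.client('cloudtrail')

# List the advanced event selectors currently attached to the trail.
resp = cloudtrail.get_event_selectors(TrailName='my-management-trail')  # placeholder trail name
for selector in resp.get('AdvancedEventSelectors', []):
    print(selector['Name'], selector['FieldSelectors'])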