1. Overview
This solution implements a "read-once" or "time-limited" file storage system on AWS S3. It automatically deletes files from a specific S3 bucket 30 seconds after they are downloaded (accessed via the GetObject API).
Architecture Flow
- Connector Activity: The connector downloads a file from S3 (GetObject).
- Logging: AWS CloudTrail captures this specific Data Event.
- Trigger: Amazon EventBridge detects the GetObject event in the CloudTrail logs and triggers a Lambda function.
- Processing: The Lambda function waits out a 30-second buffer period (the delay is a hard-coded time.sleep(30) in the function code and can be adjusted there).
- Action: The Lambda issues a DeleteObject command to permanently remove the file.
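For orientation, the event that EventBridge forwards to the Lambda in steps 3–4 has roughly the shape sketched below as a Python dict. The values are illustrative; real CloudTrail events carry many additional fields, and only the parts the rule matches and the handler reads are shown.

# Simplified shape of the CloudTrail GetObject data event as delivered by EventBridge.
# Illustrative values only; real events contain many more fields.
sample_event = {
    "source": "aws.s3",                              # matched by the EventBridge rule
    "detail-type": "AWS API Call via CloudTrail",
    "detail": {
        "eventSource": "s3.amazonaws.com",
        "eventName": "GetObject",                    # matched by the EventBridge rule
        "requestParameters": {
            "bucketName": "testpurgenetskope",       # matched by the EventBridge rule
            "key": "webtxstream/WebTX_1700000000_1.csv.gz"  # read by the Lambda handler
        }
    }
}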
2. Prerequisites (Critical)
By default, CloudTrail does not log S3 Data Events (file-level access). You must enable this for the automation to work.
How to Enable CloudTrail for S3 Data Events
- Navigate: Go to the AWS Console > CloudTrail > Trails.
- Select Trail: Open your existing management trail. (Create one if none exists).
- Edit Data Events: Scroll to the Data events section and click Edit.
- Add Event Type:
- Data event type: Select S3.
- Log selector template: Select Custom.
- Configure Selectors (To control costs):
- Warning: Do not select "All buckets" unless you accept high costs.
- Under Advanced event selectors:
- Field: resources.ARN | Operator: startsWith | Value: arn:aws:s3:::testpurgenetskope/
- Field: eventName | Operator: equals | Value: GetObject
- Save: Click Save changes.
- Note: CloudTrail may take 5–15 minutes to start delivering logs.
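If you prefer to script this step instead of using the console, the same selectors can be applied with boto3. This is a minimal sketch: the trail name is a placeholder, the eventCategory and resources.type selectors correspond to what the console sets implicitly when you choose the S3 data event type, and PutEventSelectors replaces the trail's existing selectors.

# Sketch: enable GetObject data-event logging for one bucket on an existing trail.
# 'my-management-trail' is a placeholder; this call replaces the trail's current selectors.
import boto3

cloudtrail = boto3.client("cloudtrail")

cloudtrail.put_event_selectors(
    TrailName="my-management-trail",
    AdvancedEventSelectors=[
        {
            "Name": "GetObject on testpurgenetskope",
            "FieldSelectors": [
                {"Field": "eventCategory", "Equals": ["Data"]},
                {"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
                {"Field": "resources.ARN", "StartsWith": ["arn:aws:s3:::testpurgenetskope/"]},
                {"Field": "eventName", "Equals": ["GetObject"]},
            ],
        }
    ],
)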
3. Deployment Guide
The entire solution (IAM Roles, Lambda Function, and EventBridge Trigger) is defined in a single AWS CloudFormation template.
3.1 CloudFormation Template
Save the following template as purge-s3-delay-stack.yaml. A scripted deployment example follows the template.
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Deploys a Lambda to delete S3 objects 30 seconds after access (GetObject).'

Parameters:
  S3BucketName:
    Type: String
    Default: 'testpurgenetskope'
    Description: The name of the S3 bucket to monitor.

Resources:
  # ------------------------------------------------------------------
  # 1. IAM Role: Permissions for Logging and S3 Deletion
  # ------------------------------------------------------------------
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: S3DeleteAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: s3:DeleteObject
                Resource: !Sub 'arn:aws:s3:::${S3BucketName}/*'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # ------------------------------------------------------------------
  # 2. Lambda Function: Logic (Wait 30s -> Delete)
  # ------------------------------------------------------------------
  PurgeLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub 's3-purge-handler-${S3BucketName}'
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.9
      Timeout: 60  # Timeout set to 60s to handle the 30s sleep
      Code:
        ZipFile: |
          import json
          import boto3
          import time
          import urllib.parse
          from botocore.exceptions import ClientError

          s3 = boto3.client('s3')

          def lambda_handler(event, context):
              print("Received event: " + json.dumps(event))
              try:
                  # Extract event details from EventBridge payload
                  record = event.get('detail', event)
                  if 'requestParameters' not in record:
                      print("Error: 'requestParameters' not found.")
                      return {'statusCode': 400, 'body': "Invalid event structure"}

                  bucket_name = record['requestParameters']['bucketName']
                  object_key = record['requestParameters']['key']

                  # Decode URL (e.g., "My%20File.txt" -> "My File.txt")
                  object_key = urllib.parse.unquote_plus(object_key)

                  print(f"Trigger received for: {object_key}. Waiting 30 seconds...")

                  # ---------------------------------------------------
                  # WAITING PERIOD
                  # ---------------------------------------------------
                  time.sleep(30)

                  print(f"Time up. Deleting: {object_key} from {bucket_name}")

                  # Perform Deletion
                  s3.delete_object(Bucket=bucket_name, Key=object_key)
                  print(f"Success: Object deleted.")

                  return {
                      'statusCode': 200,
                      'body': json.dumps(f"Deleted {object_key}")
                  }
              except ClientError as e:
                  print(f"AWS Error: {e}")
                  raise e
              except Exception as e:
                  print(f"Unexpected Error: {e}")
                  raise e

  # ------------------------------------------------------------------
  # 3. Asynchronous Invocation Config
  # ------------------------------------------------------------------
  AsyncConfig:
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
      FunctionName: !Ref PurgeLambdaFunction
      MaximumEventAgeInSeconds: 300
      MaximumRetryAttempts: 2
      Qualifier: $LATEST

  # ------------------------------------------------------------------
  # 4. EventBridge Rule: Filter for GetObject on Specific Bucket
  # ------------------------------------------------------------------
  S3AccessRule:
    Type: AWS::Events::Rule
    Properties:
      Description: !Sub 'Trigger Lambda when objects are accessed in ${S3BucketName}'
      State: ENABLED
      EventPattern:
        source:
          - aws.s3
        detail-type:
          - 'AWS API Call via CloudTrail'
        detail:
          eventSource:
            - s3.amazonaws.com
          eventName:
            - GetObject
          requestParameters:
            bucketName:
              - !Ref S3BucketName
      Targets:
        - Arn: !GetAtt PurgeLambdaFunction.Arn
          Id: 'PurgeLambdaTarget'

  # ------------------------------------------------------------------
  # 5. Lambda Permission: Allow EventBridge to Invoke
  # ------------------------------------------------------------------
  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref PurgeLambdaFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt S3AccessRule.Arn
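If you deploy from a script rather than the console, a minimal boto3 sketch looks like the following. The stack name and template path are placeholders; CAPABILITY_IAM is required because the template creates an IAM role.

# Sketch: create the stack from purge-s3-delay-stack.yaml.
# Stack name and file path are placeholders for your environment.
import boto3

cfn = boto3.client("cloudformation")

with open("purge-s3-delay-stack.yaml") as f:
    template_body = f.read()

cfn.create_stack(
    StackName="s3-purge-delay",
    TemplateBody=template_body,
    Capabilities=["CAPABILITY_IAM"],   # the template creates an IAM role
    Parameters=[
        {"ParameterKey": "S3BucketName", "ParameterValue": "testpurgenetskope"}
    ],
)

# Block until the stack is fully created before testing.
cfn.get_waiter("stack_create_complete").wait(StackName="s3-purge-delay")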
4. Testing & Verification
Method A: End-to-End Test
- Use the data generation script shown below (the DatagenerationScript) to upload files to your bucket. It replicates the Netskope Log Streaming scenario.
- Set the parameters in the script's CONFIG block (input file, bucket, prefix, file count) according to your environment.
- Point INPUT_CSV at the sample file provided with this documentation to reproduce the Netskope Log Streaming data.
- Download (GetObject) one of the uploaded objects, e.g. via the connector or the S3 console; deletion is only triggered when an object is read.
- Wait: Wait approximately 15 minutes.
- Note: CloudTrail logs are not instant. It takes time for the event to reach EventBridge.
- Verify: Refresh the bucket. The downloaded file should disappear.
DatagenerationScript (referenced in Method A above):

import os
import time
import gzip
import logging
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from concurrent.futures import ProcessPoolExecutor, wait

# ───── CONFIG ────────────────────────────────────────────────────────────────
INPUT_CSV = '/home/ubuntu/Samplewebtx.csv'  # Sample file provided with the documentation
DATE_COL = 'date'
TIME_COL = 'time'
REPEAT_COUNT = 32        # repeat each row
INTERVAL_SEC = 15        # seconds between launches
WORKERS = 2              # number of alternating workers
GZIP_LEVEL = 1           # 1 = fastest compression
OUTPUT_DIR = '/home/ubuntu/testwebtxfiles'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# S3 destination (must exist, and your AWS creds must allow PutObject)
S3_BUCKET = 'testpurgenetskope'
S3_PREFIX = 'webtxstream/'

# Total number of files to generate & upload before stopping:
TOTAL_FILES = 2
# ────────────────────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%H:%M:%S'
)

def load_source():
    """Load header and all rows from a space-delimited CSV."""
    with open(INPUT_CSV, 'r', encoding='utf-8') as f:
        header = f.readline().strip().split()
        if DATE_COL not in header or TIME_COL not in header:
            raise RuntimeError(f"Missing '{DATE_COL}' or '{TIME_COL}' in header: {header}")
        date_idx = header.index(DATE_COL)
        time_idx = header.index(TIME_COL)
        rows = [line.strip().split() for line in f if line.strip()]
    logging.info("Loaded %d rows from %s", len(rows), INPUT_CSV)
    return header, date_idx, time_idx, rows

def generate_and_upload(epoch: str, worker_id: int, header, date_idx, time_idx, rows):
    """Repeat, stamp, gzip, upload to S3, then delete the local file."""
    fname = f"WebTX_{epoch}_{worker_id}.csv.gz"
    out_path = os.path.join(OUTPUT_DIR, fname)
    s3_key = f"{S3_PREFIX.rstrip('/')}/{fname}"

    date_str = time.strftime('%Y-%m-%d')
    time_str = time.strftime('%H:%M:%S')

    # 1) Stream data into gzip
    with gzip.open(out_path, 'wt', encoding='utf-8', newline='', compresslevel=GZIP_LEVEL) as gz:
        gz.write(' '.join(header) + '\n')
        for _ in range(REPEAT_COUNT):
            for rec in rows:
                rec[date_idx] = date_str
                rec[time_idx] = time_str
                gz.write(' '.join(rec) + '\n')

    size_mb = os.path.getsize(out_path) / (1024*1024)
    logging.info("Worker %d: created %s (%.1f MB)", worker_id, fname, size_mb)

    # 2) Upload to S3 using low-level PutObject
    #    We must read the file in binary mode ('rb') to pass it as the Body
    try:
        s3_client = boto3.client('s3')
        with open(out_path, 'rb') as f_obj:
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=s3_key,
                Body=f_obj,
                ContentType='application/gzip'
            )
        logging.info("Worker %d: PutObject success s3://%s/%s", worker_id, S3_BUCKET, s3_key)
    except (BotoCoreError, ClientError) as e:
        logging.error("Worker %d: PutObject failed: %s", worker_id, e)
        return

    # 3) Cleanup
    try:
        os.remove(out_path)
        logging.info("Worker %d: removed local file", worker_id)
    except OSError as e:
        logging.warning("Worker %d: could not delete %s: %s", worker_id, out_path, e)

def main():
    header, date_idx, time_idx, rows = load_source()
    logging.info("Starting generation of %d files, 1 launch every %ds", TOTAL_FILES, INTERVAL_SEC)

    with ProcessPoolExecutor(max_workers=WORKERS) as executor:
        worker_id = 1
        futures = []
        for count in range(1, TOTAL_FILES + 1):
            epoch = str(int(time.time()))
            fut = executor.submit(
                generate_and_upload,
                epoch, worker_id,
                header, date_idx, time_idx, rows
            )
            futures.append(fut)
            logging.info("Launched worker %d for file %d at %s", worker_id, count, time.strftime('%H:%M:%S'))

            # alternate worker
            worker_id = 2 if worker_id == 1 else 1

            # wait before next launch
            time.sleep(INTERVAL_SEC)

        # wait for all in-flight tasks to complete before exit
        wait(futures)

    logging.info("Completed generation and upload of %d files. Exiting.", TOTAL_FILES)

if __name__ == '__main__':
    main()
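To check the result without the console, a short boto3 listing can confirm whether the purged objects are gone. This is a sketch that assumes the same bucket and prefix as the script above; listing objects does not issue GetObject, so it will not itself trigger a purge.

# Sketch: list what remains under the test prefix to confirm the purge.
# Uses the S3_BUCKET and S3_PREFIX values from the script above.
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="testpurgenetskope", Prefix="webtxstream/")

for obj in resp.get("Contents", []):
    print(obj["Key"], obj["LastModified"])

if resp.get("KeyCount", 0) == 0:
    print("No objects remain under the prefix - purge succeeded.")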
Method B: Simulated Test (Instant)
In the Lambda console, open the function's Test tab and invoke the handler directly with a JSON payload like the one below; this skips the CloudTrail/EventBridge delay entirely.
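A minimal test event, corresponding to the detail portion of the event shown in the Overview. The key below is a placeholder: point it at an object that actually exists in your bucket, and remember the function will delete that object about 30 seconds after you run the test.

{
  "detail": {
    "eventSource": "s3.amazonaws.com",
    "eventName": "GetObject",
    "requestParameters": {
      "bucketName": "testpurgenetskope",
      "key": "webtxstream/your-test-object.csv.gz"
    }
  }
}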
5. Troubleshooting
| Symptom | Probable Cause | Fix |
| --- | --- | --- |
| Lambda is never triggered | CloudTrail data events are disabled. | Follow the Prerequisites section to enable S3 data-event logging for GetObject. |
| Permission denied (AccessDenied) on delete | The IAM role's s3:DeleteObject permission is scoped to a different bucket. | Ensure the S3BucketName parameter in CloudFormation matches your actual bucket name. |
| File deletes instantly | Sleep timer missing. | Verify that time.sleep(30) is present in the Lambda code. |
| Delay > 15 minutes | CloudTrail latency. | This is normal behavior for CloudTrail-based triggers; see the 5–15 minute note in Prerequisites. |
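For the first two symptoms, pulling the function's recent CloudWatch logs shows whether it was invoked and what it logged. This is a sketch; the log group name follows the standard /aws/lambda/<FunctionName> convention and assumes the template's default function name with the example bucket.

# Sketch: read the purge function's recent log events.
# Function name assumes the template default with the example bucket.
import time
import boto3

logs = boto3.client("logs")
resp = logs.filter_log_events(
    logGroupName="/aws/lambda/s3-purge-handler-testpurgenetskope",
    startTime=int((time.time() - 3600) * 1000),  # last hour, in milliseconds
)

for event in resp.get("events", []):
    print(event["message"], end="")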