SQL Injection Prevention

This guide covers SQL injection attacks, prevention techniques, and secure coding practices across different programming languages and database systems.

🎯 What is SQL Injection?

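SQL injection is a code injection technique in which an attacker supplies input (for example through a form field or URL parameter) that the application places into a SQL statement, where the database parses it as code rather than data and executes a different query than the developer intended.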

How SQL Injection Works

-- Query as the application intends it (values concatenated into the SQL string)
SELECT * FROM users WHERE username = 'john' AND password = 'password'

-- Malicious input
username: admin' --
password: anything

-- Resulting query (bypasses authentication)
SELECT * FROM users WHERE username = 'admin' --' AND password = 'anything'
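
The bypass above can be reproduced in a few lines. The following is a minimal sketch, assuming an in-memory SQLite database with a hypothetical `users` table; the plaintext `password` column is kept only to mirror the example query, and `login_vulnerable` and `login_parameterized` are illustrative names rather than part of any library.

```python
import sqlite3

# Hypothetical schema and data, used only for this demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT, password TEXT)")
conn.execute("INSERT INTO users VALUES ('admin', 's3cret')")

def login_vulnerable(username, password):
    # User input is spliced into the SQL text, so a quote in the input
    # can end the string literal and change the statement's structure.
    query = (
        "SELECT * FROM users WHERE username = '" + username +
        "' AND password = '" + password + "'"
    )
    return conn.execute(query).fetchone() is not None

def login_parameterized(username, password):
    # Placeholders keep the input as data; it is never parsed as SQL.
    query = "SELECT * FROM users WHERE username = ? AND password = ?"
    return conn.execute(query, (username, password)).fetchone() is not None

payload = "admin' --"
print(login_vulnerable(payload, "anything"))     # True: password check is commented out
print(login_parameterized(payload, "anything"))  # False: no user has that literal name
```

The only difference between the two functions is whether the input reaches the database inside the SQL text or alongside it as a bound value.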

🚨 Common SQL Injection Scenarios

1. Authentication Bypass

// Vulnerable Java code
String query = "SELECT * FROM users WHERE username = '" + username + 
              "' AND password = '" + password + "'";
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query);

// Malicious input
username: admin' OR '1'='1
password: anything

// Resulting query (AND binds tighter than OR, so the 'admin' row matches regardless of the password)
SELECT * FROM users WHERE username = 'admin' OR '1'='1' AND password = 'anything'

2. Data Extraction

# Vulnerable Python code
def get_user_orders(user_id):
    query = f"SELECT * FROM orders WHERE user_id = {user_id}"
    cursor.execute(query)
    return cursor.fetchall()

# Malicious input
user_id: 1 UNION SELECT username, password, email FROM users

# Resulting query (works when the injected SELECT returns the same number of columns as orders)
SELECT * FROM orders WHERE user_id = 1 UNION SELECT username, password, email FROM users
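
To make the extraction concrete, here is a small sketch assuming an in-memory SQLite database in which a hypothetical `orders` table happens to have three columns, so the three-column `UNION` lines up. The table layouts, sample rows, and function names are assumptions made for illustration.

```python
import sqlite3

# Hypothetical tables; orders has three columns so the UNION is valid.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER, user_id INTEGER, item TEXT);
    CREATE TABLE users (username TEXT, password TEXT, email TEXT);
    INSERT INTO orders VALUES (1, 1, 'book');
    INSERT INTO users VALUES ('alice', 'hunter2', 'alice@example.com');
""")

def get_user_orders_vulnerable(user_id):
    # The f-string lets the caller append arbitrary SQL after the number.
    return conn.execute(
        f"SELECT * FROM orders WHERE user_id = {user_id}"
    ).fetchall()

def get_user_orders_safe(user_id):
    # int() raises ValueError for anything that is not a number, and the
    # placeholder keeps the value out of the SQL text.
    return conn.execute(
        "SELECT * FROM orders WHERE user_id = ?", (int(user_id),)
    ).fetchall()

payload = "1 UNION SELECT username, password, email FROM users"
leaked = get_user_orders_vulnerable(payload)
print(leaked)  # contains the order row and the row from users
```

Calling `get_user_orders_safe(payload)` raises `ValueError` before any SQL runs, while `get_user_orders_safe("1")` returns only the order row.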

3. Data Modification

// Vulnerable C code
void update_user_password(int user_id, const char* new_password) {
    char query[256];
    sprintf(query, "UPDATE users SET password = '%s' WHERE user_id = %d", 
            new_password, user_id);
    mysql_query(conn, query);
}

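// Malicious input (MySQL requires a space after "--" for it to start a comment)
new_password: 123' WHERE user_id = 1; -- 
user_id: 2

// Resulting query (the intended WHERE clause is commented out, so user 1's password is changed)
UPDATE users SET password = '123' WHERE user_id = 1; -- ' WHERE user_id = 2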

🛡️ Prevention Techniques

1. Parameterized Queries (Prepared Statements)

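Parameterized queries are the most effective defense against SQL injection: the SQL text is sent to the database with placeholders, and user input is bound separately as data, so it is never parsed as SQL.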

Java (JDBC)

// Good: Using PreparedStatement
public boolean authenticateUser(String username, String password) {
    String query = "SELECT * FROM users WHERE username = ? AND password = ?";

    try (PreparedStatement stmt = connection.prepareStatement(query)) {
        stmt.setString(1, username);
        stmt.setString(2, password);

        ResultSet rs = stmt.executeQuery();
        return rs.next(); // Returns true if user exists
    } catch (SQLException e) {
        logger.error("Authentication error", e);
        return false;
    }
}

// Better: With password hashing
public boolean authenticateUserSecure(String username, String password) {
    String query = "SELECT password_hash, salt FROM users WHERE username = ?";

    try (PreparedStatement stmt = connection.prepareStatement(query)) {
        stmt.setString(1, username);

        ResultSet rs = stmt.executeQuery();
        if (rs.next()) {
            String storedHash = rs.getString("password_hash");
            String salt = rs.getString("salt");
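            // hashPassword is assumed to be defined elsewhere (a salted, deliberately slow hash)
            String inputHash = hashPassword(password, salt);
            // Constant-time comparison avoids leaking how many leading bytes match
            return java.security.MessageDigest.isEqual(
                storedHash.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                inputHash.getBytes(java.nio.charset.StandardCharsets.UTF_8));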
        }
        return false;
    } catch (SQLException e) {
        logger.error("Authentication error", e);
        return false;
    }
}

Python (DB-API)

# Good: Using parameterized queries
import logging
import sqlite3
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def get_db_connection():
    conn = sqlite3.connect('database.db')
    try:
        yield conn
    finally:
        conn.close()

def authenticate_user(username, password):
    query = "SELECT * FROM users WHERE username = ? AND password = ?"

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (username, password))
            return cursor.fetchone() is not None
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        return False

# Better: With password hashing
import hashlib
import hmac

def authenticate_user_secure(username, password):
    query = "SELECT password_hash, salt FROM users WHERE username = ?"

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (username,))
            result = cursor.fetchone()

            if result:
                # Assumes the salt is stored as text and the hash as a hex string
                stored_hash, salt = result
                input_hash = hashlib.pbkdf2_hmac('sha256', 
                                               password.encode(), 
                                               salt.encode(), 
                                               100000).hex()
                # Constant-time comparison avoids leaking timing information
                return hmac.compare_digest(stored_hash, input_hash)
            return False
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        return False
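
The check in `authenticate_user_secure` presupposes that a hash and salt were stored when the account was created. One way that could look is sketched below, assuming the same PBKDF2-HMAC-SHA256 derivation and iteration count as the example above, with the salt stored as hex text. The helper names `hash_password`, `create_password_record`, and `verify_password` are hypothetical.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000  # same iteration count as the example above

def hash_password(password, salt):
    # PBKDF2-HMAC-SHA256; the salt is text and is encoded to bytes before use.
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), ITERATIONS
    ).hex()

def create_password_record(password):
    # Generate a fresh random salt per user (32 hex characters).
    salt = secrets.token_hex(16)
    return hash_password(password, salt), salt

def verify_password(password, stored_hash, salt):
    # compare_digest takes the same time wherever the two strings differ.
    return hmac.compare_digest(stored_hash, hash_password(password, salt))

stored_hash, salt = create_password_record("correct horse")
print(verify_password("correct horse", stored_hash, salt))  # True
print(verify_password("wrong guess", stored_hash, salt))    # False
```

The returned pair would be written to the `password_hash` and `salt` columns with a parameterized `INSERT`, so the plaintext password never reaches the database.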

C (MySQL)

// Good: Using prepared statements
#include <mysql/mysql.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int authenticate_user(MYSQL* conn, const char* username, const char* password) {
    MYSQL_STMT* stmt;
    MYSQL_BIND bind[2];
    MYSQL_BIND result[1];
    int user_count = 0;

    // Prepare statement
    const char* query = "SELECT COUNT(*) FROM users WHERE username = ? AND password = ?";
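    stmt = mysql_stmt_init(conn);
    if (stmt == NULL) {
        fprintf(stderr, "Statement init failed: %s\n", mysql_error(conn));
        return 0;
    }

    if (mysql_stmt_prepare(stmt, query, strlen(query))) {
        fprintf(stderr, "Statement prepare failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return 0;
    }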

    // Bind parameters (the length member is a pointer; with it left NULL,
    // buffer_length gives the size of the input value)
    memset(bind, 0, sizeof(bind));
    bind[0].buffer_type = MYSQL_TYPE_STRING;
    bind[0].buffer = (char*)username;
    bind[0].buffer_length = strlen(username);

    bind[1].buffer_type = MYSQL_TYPE_STRING;
    bind[1].buffer = (char*)password;
    bind[1].buffer_length = strlen(password);

    if (mysql_stmt_bind_param(stmt, bind)) {
        fprintf(stderr, "Parameter bind failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return 0;
    }

    // Bind result (a fixed-size integer needs no length)
    memset(result, 0, sizeof(result));
    result[0].buffer_type = MYSQL_TYPE_LONG;
    result[0].buffer = &user_count;

    if (mysql_stmt_bind_result(stmt, result)) {
        fprintf(stderr, "Result bind failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return 0;
    }

    // Execute statement
    if (mysql_stmt_execute(stmt)) {
        fprintf(stderr, "Statement execute failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return 0;
    }

    // Fetch result; treat a failed fetch as "not authenticated"
    if (mysql_stmt_fetch(stmt) != 0) {
        user_count = 0;
    }
    mysql_stmt_close(stmt);

    return user_count > 0;
}

2. ORM Frameworks

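Object-Relational Mapping (ORM) frameworks generate parameterized SQL for their standard query methods, which protects against injection as long as user input is not concatenated into raw SQL, HQL, or JPQL strings.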

Java (JPA/Hibernate)

// Good: Using JPA with parameter binding
@Entity
@Table(name = "users")
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String username;

    @Column(nullable = false)
    private String passwordHash;

    // getters and setters
}

@Repository
public interface UserRepository extends JpaRepository<User, Long> {
    Optional<User> findByUsername(String username);
}

@Service
public class AuthenticationService {
    @Autowired
    private UserRepository userRepository;

    public boolean authenticate(String username, String password) {
        Optional<User> userOpt = userRepository.findByUsername(username);

        if (userOpt.isPresent()) {
            User user = userOpt.get();
            String inputHash = hashPassword(password);
            return user.getPasswordHash().equals(inputHash);
        }
        return false;
    }
}

Python (SQLAlchemy)

# Good: Using SQLAlchemy ORM
from sqlalchemy import create_engine, Column, Integer, String
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker
from werkzeug.security import generate_password_hash, check_password_hash

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True, nullable=False)
    # Werkzeug hash strings can exceed 120 characters, so leave headroom
    password_hash = Column(String(255), nullable=False)

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

class UserService:
    def __init__(self, database_url):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def authenticate(self, username, password):
        user = self.session.query(User).filter_by(username=username).first()
        # Return a real bool rather than None when the user does not exist
        return user is not None and user.check_password(password)

    def create_user(self, username, password):
        if self.session.query(User).filter_by(username=username).first():
            raise ValueError("Username already exists")

        user = User(username=username)
        user.set_password(password)
        self.session.add(user)
        self.session.commit()
        return user

3. Input Validation and Sanitization

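Input validation is an additional layer of protection, not a substitute for parameterized queries. Prefer allow-list patterns that describe valid input; stripping characters, as the sanitize helpers in this section do, can mangle legitimate values such as O'Brien.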

Java Input Validation

import org.apache.commons.lang3.StringUtils;
import java.util.regex.Pattern;

public class InputValidator {
    private static final Pattern USERNAME_PATTERN = 
        Pattern.compile("^[a-zA-Z0-9_]{3,20}$");
    private static final Pattern PASSWORD_PATTERN = 
        Pattern.compile("^[a-zA-Z0-9!@#$%^&*]{8,50}$");

    public static boolean isValidUsername(String username) {
        return username != null && 
               USERNAME_PATTERN.matcher(username).matches() &&
               !StringUtils.containsAny(username, ";", "'", "\"", "--", "/*", "*/");
    }

    public static boolean isValidPassword(String password) {
        return password != null && 
               PASSWORD_PATTERN.matcher(password).matches() &&
               !StringUtils.containsAny(password, ";", "'", "\"", "--", "/*", "*/");
    }

    public static String sanitizeInput(String input) {
        if (input == null) {
            return "";
        }

        // Remove potentially dangerous characters
        return input.replaceAll("[';\"\\-\\*/]", "");
    }
}

@Service
public class SecureAuthService {
    @Autowired
    private UserRepository userRepository;

    public boolean authenticate(String username, String password) {
        // Validate input
        if (!InputValidator.isValidUsername(username) || 
            !InputValidator.isValidPassword(password)) {
            logger.warn("Invalid input format for username: {}", username);
            return false;
        }

        // Use parameterized query
        Optional<User> user = userRepository.findByUsername(username);
        return user.map(u -> u.checkPassword(password)).orElse(false);
    }
}

Python Input Validation

import logging
import re
from typing import Optional

logger = logging.getLogger(__name__)

class InputValidator:
    USERNAME_PATTERN = re.compile(r'^[a-zA-Z0-9_]{3,20}$')
    PASSWORD_PATTERN = re.compile(r'^[a-zA-Z0-9!@#$%^&*]{8,50}$')
    DANGEROUS_CHARS = r"[;'\"\\\-*/]"

    @staticmethod
    def is_valid_username(username: str) -> bool:
        if not username:
            return False

        # fullmatch: with match(), "$" would also accept a trailing newline
        if not InputValidator.USERNAME_PATTERN.fullmatch(username):
            return False

        if re.search(InputValidator.DANGEROUS_CHARS, username):
            return False

        return True

    @staticmethod
    def is_valid_password(password: str) -> bool:
        if not password:
            return False

        # fullmatch: with match(), "$" would also accept a trailing newline
        if not InputValidator.PASSWORD_PATTERN.fullmatch(password):
            return False

        if re.search(InputValidator.DANGEROUS_CHARS, password):
            return False

        return True

    @staticmethod
    def sanitize_input(input_str: Optional[str]) -> str:
        if not input_str:
            return ""

        # Remove potentially dangerous characters
        return re.sub(InputValidator.DANGEROUS_CHARS, "", input_str)

class SecureAuthService:
    def __init__(self, user_service: UserService):
        self.user_service = user_service

    def authenticate(self, username: str, password: str) -> bool:
        # Validate input
        if not InputValidator.is_valid_username(username):
            logger.warning(f"Invalid username format: {username}")
            return False

        if not InputValidator.is_valid_password(password):
            logger.warning("Invalid password format")
            return False

        # Use parameterized query
        return self.user_service.authenticate(username, password)
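
Validation matters most where bound parameters cannot be used at all: placeholders stand in for values, not for identifiers such as a column name in `ORDER BY`. The following is a minimal sketch of an allow-list for that case. The column set, the query, and the name `build_user_listing_query` are assumptions chosen for illustration.

```python
# Hypothetical allow-list: only these column names may appear in ORDER BY.
SORTABLE_COLUMNS = {"id", "username", "created_at"}

def build_user_listing_query(sort_by, descending=False):
    # The requested column is compared against a fixed set of known names
    # instead of being filtered character by character.
    if sort_by not in SORTABLE_COLUMNS:
        raise ValueError(f"Unsupported sort column: {sort_by!r}")
    # The direction is chosen from two literals, never taken from the input.
    direction = "DESC" if descending else "ASC"
    return f"SELECT id, username FROM users ORDER BY {sort_by} {direction}"

print(build_user_listing_query("username"))
# SELECT id, username FROM users ORDER BY username ASC
```

Because only strings from the fixed set ever reach the SQL text, an input such as `id; DROP TABLE users` is rejected with `ValueError` rather than sanitized.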

4. Stored Procedures

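Stored procedures called with bound parameters are safe as long as the procedure body does not build dynamic SQL by concatenating its arguments (for example with EXECUTE IMMEDIATE).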

Oracle PL/SQL

-- Secure stored procedure
CREATE OR REPLACE PROCEDURE authenticate_user(
    p_username IN VARCHAR2,
    p_password IN VARCHAR2,
    p_result OUT NUMBER
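) AS
    v_count NUMBER;
    v_error VARCHAR2(4000);
BEGIN
    -- p_password carries the hash computed by the caller
    SELECT COUNT(*) INTO v_count
    FROM users
    WHERE username = p_username AND password_hash = p_password;

    p_result := v_count;

EXCEPTION
    WHEN OTHERS THEN
        p_result := 0;
        -- SQLERRM cannot be referenced directly in a SQL statement,
        -- so copy it into a local variable before logging
        v_error := SQLERRM;
        INSERT INTO error_log (error_time, error_message)
        VALUES (SYSDATE, v_error);
END;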
/

Java calling stored procedure

public boolean authenticateWithStoredProcedure(String username, String password) {
    String call = "{call authenticate_user(?, ?, ?)}";

    try (CallableStatement stmt = connection.prepareCall(call)) {
        stmt.setString(1, username);
        stmt.setString(2, hashPassword(password));
        stmt.registerOutParameter(3, Types.INTEGER);

        stmt.execute();

        int result = stmt.getInt(3);
        return result > 0;
    } catch (SQLException e) {
        logger.error("Stored procedure error", e);
        return false;
    }
}

🔍 Detection and Testing

SQL Injection Testing

# SQL injection test cases
class SQLInjectionTests:
    def test_authentication_bypass(self):
        # Test cases for SQL injection
        test_cases = [
            ("admin' OR '1'='1", "password"),
            ("admin' --", "anything"),
            ("admin' /*", "password"),
            ("' OR '1'='1", "' OR '1'='1"),
            ("'; DROP TABLE users; --", "anything"),
            ("' UNION SELECT * FROM users --", "anything")
        ]

        for username, password in test_cases:
            result = self.auth_service.authenticate(username, password)
            assert not result, f"SQL injection vulnerability detected with: {username}"

    def test_data_extraction(self):
        # Test for data extraction attempts
        malicious_inputs = [
            "1 UNION SELECT username, password FROM users",
            "1 UNION SELECT table_name FROM information_schema.tables",
            "1' UNION SELECT column_name FROM information_schema.columns WHERE table_name='users' --"
        ]

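        for input_value in malicious_inputs:
            try:
                result = self.get_user_orders(input_value)
            except Exception:
                # Rejecting malicious input with an error is acceptable
                continue
            # Assert outside the try block so a failure is not swallowed
            # by the except clause (AssertionError is an Exception)
            assert len(result) == 1, f"Data extraction vulnerability detected with: {input_value}"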
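
The test class above relies on fixtures such as `self.auth_service` that are not shown. A self-contained version might look like the sketch below, assuming `unittest`, an in-memory SQLite database, and a hypothetical `AuthService` stand-in for the service under test; the plaintext password column only mirrors the simplified examples in this guide.

```python
import sqlite3
import unittest

class AuthService:
    """Hypothetical stand-in for the service under test."""

    def __init__(self, conn):
        self.conn = conn

    def authenticate(self, username, password):
        # Parameterized query: both values are bound, never concatenated.
        row = self.conn.execute(
            "SELECT 1 FROM users WHERE username = ? AND password = ?",
            (username, password),
        ).fetchone()
        return row is not None

class SQLInjectionRegressionTest(unittest.TestCase):
    def setUp(self):
        # Fresh database per test so one payload cannot affect another test.
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE users (username TEXT, password TEXT)")
        self.conn.execute("INSERT INTO users VALUES ('admin', 's3cret')")
        self.auth_service = AuthService(self.conn)

    def tearDown(self):
        self.conn.close()

    def test_payloads_are_rejected(self):
        payloads = ["admin' OR '1'='1", "admin' --", "'; DROP TABLE users; --"]
        for payload in payloads:
            with self.subTest(payload=payload):
                self.assertFalse(self.auth_service.authenticate(payload, "x"))

    def test_table_survives_and_valid_login_works(self):
        self.auth_service.authenticate("'; DROP TABLE users; --", "x")
        self.assertTrue(self.auth_service.authenticate("admin", "s3cret"))
```

Saved as a module, the tests can be run with `python -m unittest`. The second test checks a side effect (the table still exists and a valid login succeeds) in addition to the return value.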

Automated Security Scanning

# Using sqlmap for SQL injection testing
sqlmap -u "http://example.com/login" --data="username=admin&password=test" --dbs

# Using OWASP ZAP for security scanning
zap-baseline.py -t http://example.com

# Using bandit for Python security scanning
bandit -r /path/to/python/code

📋 Security Checklist

Development Checklist

## SQL Injection Prevention Checklist

### Input Validation
- [ ] All user inputs are validated before processing
- [ ] Input length limits are enforced
- [ ] Dangerous characters are filtered or escaped
- [ ] Allow-lists (describing valid input) are used instead of deny-lists (enumerating bad input)

### Database Access
- [ ] Parameterized queries are used for all database operations
- [ ] Prepared statements are properly implemented
- [ ] ORM frameworks are used when appropriate
- [ ] Stored procedures use parameter binding

### Authentication & Authorization
- [ ] Passwords are properly hashed (bcrypt, PBKDF2, Argon2)
- [ ] Database connections use least privilege principle
- [ ] Error messages don't reveal database information
- [ ] Authentication failures are logged but not exposed

### Code Review
- [ ] All SQL queries are reviewed for injection vulnerabilities
- [ ] Dynamic SQL construction is avoided
- [ ] String concatenation in queries is prohibited
- [ ] Security testing is included in test suite

Deployment Checklist

## Production Security Checklist

### Database Configuration
- [ ] Database user has minimal required permissions
- [ ] Database connections are encrypted
- [ ] Database error messages are generic
- [ ] Database access is logged and monitored

### Application Security
- [ ] Web Application Firewall (WAF) is configured
- [ ] Security headers are properly set
- [ ] Input validation is enforced at application level
- [ ] Regular security scans are performed

### Monitoring & Response
- [ ] SQL injection attempts are logged
- [ ] Security alerts are configured
- [ ] Incident response plan is in place
- [ ] Regular security updates are applied
