SQL Injection Prevention¶
This guide covers SQL injection attacks, prevention techniques, and secure coding practices across different programming languages and database systems.
🎯 What is SQL Injection?¶
SQL injection is a code injection technique in which an attacker supplies SQL fragments through application input (form fields, URL parameters, headers) so that the database executes them as part of a query the application builds from that input.
How SQL Injection Works¶
-- Query built by concatenating user input into the SQL string (normal input shown)
SELECT * FROM users WHERE username = 'john' AND password = 'password'
-- Malicious input
username: admin' --
password: anything
-- Resulting query (bypasses authentication)
SELECT * FROM users WHERE username = 'admin' --' AND password = 'anything'
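The bypass above can be reproduced in a few lines. This is a minimal sketch using Python's built-in `sqlite3` module with an in-memory database; the `users` table, its contents, and both function names are assumptions made for illustration, and the plaintext password column is kept only to mirror the query shown above.

```python
import sqlite3

# In-memory database with a hypothetical users table (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT, password TEXT)")
conn.execute("INSERT INTO users VALUES ('admin', 's3cret')")


def login_vulnerable(username, password):
    # User input is spliced directly into the SQL text.
    query = (
        "SELECT * FROM users WHERE username = '" + username +
        "' AND password = '" + password + "'"
    )
    return conn.execute(query).fetchone() is not None


def login_parameterized(username, password):
    # Placeholders keep the input as data; it is never parsed as SQL.
    query = "SELECT * FROM users WHERE username = ? AND password = ?"
    return conn.execute(query, (username, password)).fetchone() is not None


print(login_vulnerable("admin' --", "anything"))     # True: password check is commented out
print(login_parameterized("admin' --", "anything"))  # False: no such username
```

With the same malicious input, only the concatenated query authenticates the attacker.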
🚨 Common SQL Injection Scenarios¶
1. Authentication Bypass¶
// Vulnerable Java code
String query = "SELECT * FROM users WHERE username = '" + username +
"' AND password = '" + password + "'";
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(query);
// Malicious input
username: admin' OR '1'='1
password: anything
// Resulting query (AND binds tighter than OR, so username = 'admin' alone satisfies the WHERE clause)
SELECT * FROM users WHERE username = 'admin' OR '1'='1' AND password = 'anything'
2. Data Extraction¶
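# Vulnerable Python code
def get_user_orders(user_id):
    # user_id is interpolated into the SQL text without validation
    query = f"SELECT * FROM orders WHERE user_id = {user_id}"
    cursor.execute(query)
    return cursor.fetchall()
# Malicious input (UNION requires both SELECTs to return the same number of columns; orders is assumed to have three)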
user_id: 1 UNION SELECT username, password, email FROM users
# Resulting query
SELECT * FROM orders WHERE user_id = 1 UNION SELECT username, password, email FROM users
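To make the extraction concrete, the sketch below runs the same payload against an in-memory SQLite database. The table layouts and sample rows are assumptions chosen so that `orders` has three columns, matching the three columns selected from `users`; the function names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (username TEXT, password TEXT, email TEXT);
    CREATE TABLE orders (id INTEGER, user_id INTEGER, item TEXT);
    INSERT INTO users VALUES ('alice', 'hunter2', 'alice@example.com');
    INSERT INTO orders VALUES (1, 1, 'book');
""")


def get_user_orders_vulnerable(user_id):
    # The payload becomes part of the SQL statement itself.
    return conn.execute(
        f"SELECT * FROM orders WHERE user_id = {user_id}"
    ).fetchall()


def get_user_orders_safe(user_id):
    # The payload is bound as a single value and compared to user_id.
    return conn.execute(
        "SELECT * FROM orders WHERE user_id = ?", (user_id,)
    ).fetchall()


payload = "1 UNION SELECT username, password, email FROM users"
leaked = get_user_orders_vulnerable(payload)
safe = get_user_orders_safe(payload)

print(("alice", "hunter2", "alice@example.com") in leaked)  # True
print(safe)                                                 # []
```

The bound version returns no rows because the whole payload is treated as one (non-matching) value rather than as SQL.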
3. Data Modification¶
// Vulnerable C code
void update_user_password(int user_id, const char* new_password) {
char query[256];
sprintf(query, "UPDATE users SET password = '%s' WHERE user_id = %d",
new_password, user_id);
mysql_query(conn, query);
}
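// Malicious input (the leading quote is supplied by the format string, so the payload only needs the closing quote)
new_password: 123' WHERE user_id = 1; -- 
user_id: 2
// Resulting query (changes the password of user 1 instead of user 2)
UPDATE users SET password = '123' WHERE user_id = 1; -- ' WHERE user_id = 2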
🛡️ Prevention Techniques¶
1. Parameterized Queries (Prepared Statements)¶
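Parameterized queries are the most effective defense against SQL injection: the SQL text is sent to the database with placeholders, and user input is bound separately as data, so it is never parsed as SQL.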
Java (JDBC)¶
// Good: Using PreparedStatement (plaintext password comparison is kept only for brevity; see the hashed version below)
public boolean authenticateUser(String username, String password) {
String query = "SELECT * FROM users WHERE username = ? AND password = ?";
try (PreparedStatement stmt = connection.prepareStatement(query)) {
stmt.setString(1, username);
stmt.setString(2, password);
ResultSet rs = stmt.executeQuery();
return rs.next(); // Returns true if user exists
} catch (SQLException e) {
logger.error("Authentication error", e);
return false;
}
}
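// Better: With password hashing
// Requires java.security.MessageDigest and java.nio.charset.StandardCharsets.
// hashPassword(password, salt) is an application-defined helper (not shown) that
// should wrap a slow, salted algorithm such as PBKDF2, bcrypt, or Argon2.
public boolean authenticateUserSecure(String username, String password) {
    String query = "SELECT password_hash, salt FROM users WHERE username = ?";
    try (PreparedStatement stmt = connection.prepareStatement(query)) {
        stmt.setString(1, username);
        try (ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                String storedHash = rs.getString("password_hash");
                String salt = rs.getString("salt");
                String inputHash = hashPassword(password, salt);
                // Constant-time comparison avoids leaking how much of the hash matched
                return MessageDigest.isEqual(
                    storedHash.getBytes(StandardCharsets.UTF_8),
                    inputHash.getBytes(StandardCharsets.UTF_8));
            }
            return false;
        }
    } catch (SQLException e) {
        logger.error("Authentication error", e);
        return false;
    }
}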
Python (DB-API)¶
# Good: Using parameterized queries
import hashlib
import hmac
import logging
import sqlite3
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def get_db_connection():
    conn = sqlite3.connect('database.db')
    try:
        yield conn
    finally:
        conn.close()

def authenticate_user(username, password):
    # Plaintext password comparison is shown only to illustrate placeholders
    query = "SELECT * FROM users WHERE username = ? AND password = ?"
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (username, password))
            return cursor.fetchone() is not None
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        return False

# Better: With password hashing
def authenticate_user_secure(username, password):
    query = "SELECT password_hash, salt FROM users WHERE username = ?"
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (username,))
            result = cursor.fetchone()
            if result:
                stored_hash, salt = result
                input_hash = hashlib.pbkdf2_hmac('sha256',
                                                 password.encode(),
                                                 salt.encode(),
                                                 100000).hex()
                # Constant-time comparison avoids timing side channels
                return hmac.compare_digest(stored_hash, input_hash)
            return False
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        return False
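The hashed example above only verifies a password; it does not show how `password_hash` and `salt` are produced. A minimal sketch of the matching write path follows, assuming the salt is stored as text (so that `salt.encode()` in the verification code works unchanged) and reusing the same PBKDF2 parameters. The function names are hypothetical.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100000  # same iteration count as the verification example


def create_password_record(password):
    """Return (password_hash, salt) ready to store in the users table."""
    salt = secrets.token_hex(16)  # fresh random salt per user, stored as text
    password_hash = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), ITERATIONS
    ).hex()
    return password_hash, salt


def verify_password(password, stored_hash, salt):
    """Recompute the hash with the stored salt and compare in constant time."""
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), ITERATIONS
    ).hex()
    return hmac.compare_digest(candidate, stored_hash)


stored_hash, salt = create_password_record("correct horse")
print(verify_password("correct horse", stored_hash, salt))  # True
print(verify_password("wrong guess", stored_hash, salt))    # False
```

The values returned by `create_password_record` would be inserted with a parameterized `INSERT`, exactly like any other user-supplied data.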
C (MySQL)¶
// Good: Using prepared statements
#include <mysql/mysql.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int authenticate_user(MYSQL* conn, const char* username, const char* password) {
    MYSQL_STMT* stmt;
    MYSQL_BIND bind[2];
    MYSQL_BIND result[1];
    int user_count = 0;
    unsigned long username_len = strlen(username);
    unsigned long password_len = strlen(password);

    // Prepare statement
    const char* query = "SELECT COUNT(*) FROM users WHERE username = ? AND password = ?";
    stmt = mysql_stmt_init(conn);
    if (stmt == NULL) {
        fprintf(stderr, "Statement init failed: %s\n", mysql_error(conn));
        return 0;
    }
    if (mysql_stmt_prepare(stmt, query, strlen(query))) {
        fprintf(stderr, "Statement prepare failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);  // release the handle on every error path
        return 0;
    }

    // Bind parameters: buffer_length is the buffer size, length points to the data length
    memset(bind, 0, sizeof(bind));
    bind[0].buffer_type = MYSQL_TYPE_STRING;
    bind[0].buffer = (char*)username;
    bind[0].buffer_length = username_len;
    bind[0].length = &username_len;
    bind[1].buffer_type = MYSQL_TYPE_STRING;
    bind[1].buffer = (char*)password;
    bind[1].buffer_length = password_len;
    bind[1].length = &password_len;
    if (mysql_stmt_bind_param(stmt, bind)) {
        fprintf(stderr, "Parameter bind failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return 0;
    }

    // Execute statement
    if (mysql_stmt_execute(stmt)) {
        fprintf(stderr, "Statement execute failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return 0;
    }

    // Bind result (no length is needed for a fixed-size integer buffer)
    memset(result, 0, sizeof(result));
    result[0].buffer_type = MYSQL_TYPE_LONG;
    result[0].buffer = &user_count;
    if (mysql_stmt_bind_result(stmt, result)) {
        fprintf(stderr, "Result bind failed: %s\n", mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return 0;
    }

    // Fetch result; treat any fetch failure as "not authenticated"
    if (mysql_stmt_fetch(stmt) != 0) {
        user_count = 0;
    }
    mysql_stmt_close(stmt);
    return user_count > 0;
}
2. ORM Frameworks¶
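Object-Relational Mapping (ORM) frameworks bind values as parameters when you use their query APIs, which protects against SQL injection. That protection does not extend to raw SQL or query-language strings (for example JPQL/HQL or native queries) assembled by string concatenation.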
Java (JPA/Hibernate)¶
// Good: Using JPA with parameter binding
@Entity
@Table(name = "users")
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String username;
@Column(nullable = false)
private String passwordHash;
// getters and setters
}
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
Optional<User> findByUsername(String username);
}
@Service
public class AuthenticationService {
@Autowired
private UserRepository userRepository;
public boolean authenticate(String username, String password) {
Optional<User> userOpt = userRepository.findByUsername(username);
if (userOpt.isPresent()) {
User user = userOpt.get();
String inputHash = hashPassword(password);
return user.getPasswordHash().equals(inputHash);
}
return false;
}
}
Python (SQLAlchemy)¶
# Good: Using SQLAlchemy ORM
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker  # SQLAlchemy 1.4+
from werkzeug.security import generate_password_hash, check_password_hash

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True, nullable=False)
    # Werkzeug hash strings can exceed 120 characters, so leave headroom
    password_hash = Column(String(256), nullable=False)

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

class UserService:
    def __init__(self, database_url):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def authenticate(self, username, password):
        # filter_by binds username as a parameter
        user = self.session.query(User).filter_by(username=username).first()
        # Always return a bool rather than None or a User object
        return user is not None and user.check_password(password)

    def create_user(self, username, password):
        if self.session.query(User).filter_by(username=username).first():
            raise ValueError("Username already exists")
        user = User(username=username)
        user.set_password(password)
        self.session.add(user)
        self.session.commit()
        return user
3. Input Validation and Sanitization¶
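Input validation is an additional layer of defense. It complements parameterized queries but does not replace them: stripping or blocking characters alone is easy to bypass and can reject legitimate input.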
Java Input Validation¶
import org.apache.commons.lang3.StringUtils;
import java.util.regex.Pattern;
public class InputValidator {
private static final Pattern USERNAME_PATTERN =
Pattern.compile("^[a-zA-Z0-9_]{3,20}$");
private static final Pattern PASSWORD_PATTERN =
Pattern.compile("^[a-zA-Z0-9!@#$%^&*]{8,50}$");
public static boolean isValidUsername(String username) {
return username != null &&
USERNAME_PATTERN.matcher(username).matches() &&
!StringUtils.containsAny(username, ";", "'", "\"", "--", "/*", "*/");
}
public static boolean isValidPassword(String password) {
return password != null &&
PASSWORD_PATTERN.matcher(password).matches() &&
!StringUtils.containsAny(password, ";", "'", "\"", "--", "/*", "*/");
}
public static String sanitizeInput(String input) {
if (input == null) {
return "";
}
// Remove potentially dangerous characters
return input.replaceAll("[';\"\\-\\*/]", "");
}
}
@Service
public class SecureAuthService {
@Autowired
private UserRepository userRepository;
public boolean authenticate(String username, String password) {
// Validate input
if (!InputValidator.isValidUsername(username) ||
!InputValidator.isValidPassword(password)) {
logger.warn("Invalid input format for username: {}", username);
return false;
}
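// Use parameterized query (the repository binds username as a parameter)
// Assumes User exposes a checkPassword(String) method that verifies against passwordHash
Optional<User> user = userRepository.findByUsername(username);
return user.map(u -> u.checkPassword(password)).orElse(false);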
}
}
Python Input Validation¶
import logging
import re
from typing import Optional

logger = logging.getLogger(__name__)

class InputValidator:
    USERNAME_PATTERN = re.compile(r'^[a-zA-Z0-9_]{3,20}$')
    PASSWORD_PATTERN = re.compile(r'^[a-zA-Z0-9!@#$%^&*]{8,50}$')
    DANGEROUS_CHARS = r"[;'\"\\\-*/]"

    @staticmethod
    def is_valid_username(username: str) -> bool:
        if not username:
            return False
        # fullmatch, unlike match with '$', also rejects a trailing newline
        if not InputValidator.USERNAME_PATTERN.fullmatch(username):
            return False
        if re.search(InputValidator.DANGEROUS_CHARS, username):
            return False
        return True

    @staticmethod
    def is_valid_password(password: str) -> bool:
        if not password:
            return False
        if not InputValidator.PASSWORD_PATTERN.fullmatch(password):
            return False
        if re.search(InputValidator.DANGEROUS_CHARS, password):
            return False
        return True

    @staticmethod
    def sanitize_input(input_str: Optional[str]) -> str:
        if not input_str:
            return ""
        # Remove potentially dangerous characters
        return re.sub(InputValidator.DANGEROUS_CHARS, "", input_str)

class SecureAuthService:
    # UserService is the SQLAlchemy-based class from the ORM example
    def __init__(self, user_service: UserService):
        self.user_service = user_service

    def authenticate(self, username: str, password: str) -> bool:
        # Validate input
        if not InputValidator.is_valid_username(username):
            # %r escapes control characters so the log entry cannot be forged
            logger.warning("Invalid username format: %r", username)
            return False
        if not InputValidator.is_valid_password(password):
            logger.warning("Invalid password format")
            return False
        # Use parameterized query
        return self.user_service.authenticate(username, password)
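Validation matters most where placeholders cannot be used at all: identifiers such as column names in `ORDER BY` cannot be bound as parameters. A common approach is to check them against a fixed allow-list before they reach the SQL text. The sketch below assumes a hypothetical `orders` table and a hypothetical `list_orders` helper.

```python
import sqlite3

# Hypothetical allow-list: the only columns callers may sort by.
SORTABLE_COLUMNS = {"id", "created_at", "total"}


def list_orders(conn, user_id, sort_by="id", descending=False):
    if sort_by not in SORTABLE_COLUMNS:
        raise ValueError(f"Unsupported sort column: {sort_by!r}")
    direction = "DESC" if descending else "ASC"
    # The identifier comes from the fixed allow-list; the value is still bound.
    query = (
        "SELECT id, total FROM orders WHERE user_id = ? "
        f"ORDER BY {sort_by} {direction}"
    )
    return conn.execute(query, (user_id,)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER, user_id INTEGER, total REAL, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [(1, 7, 20.0, "2024-01-02"), (2, 7, 5.0, "2024-01-01")],
)

print(list_orders(conn, 7, sort_by="total"))  # [(2, 5.0), (1, 20.0)]
```

A request such as `list_orders(conn, 7, sort_by="id; DROP TABLE orders")` raises `ValueError` before any SQL is built.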
4. Stored Procedures¶
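Stored procedures that take bound parameters and use them in static SQL are safe from injection. A procedure that concatenates its parameters into dynamic SQL (for example with EXECUTE IMMEDIATE) is as vulnerable as application code that does the same.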
Oracle PL/SQL¶
-- Secure stored procedure
CREATE OR REPLACE PROCEDURE authenticate_user(
p_username IN VARCHAR2,
p_password IN VARCHAR2,
p_result OUT NUMBER
) AS
v_count NUMBER;
BEGIN
SELECT COUNT(*) INTO v_count
FROM users
WHERE username = p_username AND password_hash = p_password;
p_result := v_count;
EXCEPTION
WHEN OTHERS THEN
p_result := 0;
-- Log error for debugging
INSERT INTO error_log (error_time, error_message)
VALUES (SYSDATE, SQLERRM);
END;
/
Java calling stored procedure¶
public boolean authenticateWithStoredProcedure(String username, String password) {
String call = "{call authenticate_user(?, ?, ?)}";
try (CallableStatement stmt = connection.prepareCall(call)) {
stmt.setString(1, username);
stmt.setString(2, hashPassword(password));
stmt.registerOutParameter(3, Types.INTEGER);
stmt.execute();
int result = stmt.getInt(3);
return result > 0;
} catch (SQLException e) {
logger.error("Stored procedure error", e);
return false;
}
}
🔍 Detection and Testing¶
SQL Injection Testing¶
# SQL injection test cases
# Assumes the test fixture (not shown) provides self.auth_service and self.get_user_orders
class SQLInjectionTests:
    def test_authentication_bypass(self):
        # Test cases for SQL injection
        test_cases = [
            ("admin' OR '1'='1", "password"),
            ("admin' --", "anything"),
            ("admin' /*", "password"),
            ("' OR '1'='1", "' OR '1'='1"),
            ("'; DROP TABLE users; --", "anything"),
            ("' UNION SELECT * FROM users --", "anything")
        ]
        for username, password in test_cases:
            result = self.auth_service.authenticate(username, password)
            assert not result, f"SQL injection vulnerability detected with: {username}"

    def test_data_extraction(self):
        # Test for data extraction attempts
        malicious_inputs = [
            "1 UNION SELECT username, password FROM users",
            "1 UNION SELECT table_name FROM information_schema.tables",
            "1' UNION SELECT column_name FROM information_schema.columns WHERE table_name='users' --"
        ]
        for input_value in malicious_inputs:
            try:
                result = self.get_user_orders(input_value)
            except Exception:
                # Rejecting malicious input with an error is acceptable
                continue
            # Asserted outside the try block so a failure is not swallowed by the except clause;
            # at most the legitimate order for user 1 may come back
            assert len(result) <= 1, f"Data extraction vulnerability detected with: {input_value}"
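The payload list above can also be exercised end to end against a throwaway database. The following is a sketch using `unittest` and an in-memory SQLite database; the `authenticate` function, table layout, and class name are assumptions for illustration, and the plaintext password column is used only to keep the fixture short.

```python
import sqlite3
import unittest

PAYLOADS = [
    ("admin' OR '1'='1", "password"),
    ("admin' --", "anything"),
    ("' OR '1'='1", "' OR '1'='1"),
    ("'; DROP TABLE users; --", "anything"),
    ("' UNION SELECT * FROM users --", "anything"),
]


def authenticate(conn, username, password):
    # Hypothetical function under test: both values are bound parameters.
    row = conn.execute(
        "SELECT 1 FROM users WHERE username = ? AND password = ?",
        (username, password),
    ).fetchone()
    return row is not None


class InjectionPayloadTests(unittest.TestCase):
    def setUp(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE users (username TEXT, password TEXT)")
        self.conn.execute("INSERT INTO users VALUES ('admin', 's3cret')")

    def tearDown(self):
        self.conn.close()

    def test_payloads_are_rejected(self):
        for username, password in PAYLOADS:
            with self.subTest(username=username):
                self.assertFalse(authenticate(self.conn, username, password))

    def test_table_survives_payloads(self):
        for username, password in PAYLOADS:
            authenticate(self.conn, username, password)
        count = self.conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        self.assertEqual(count, 1)

    def test_valid_login_still_works(self):
        self.assertTrue(authenticate(self.conn, "admin", "s3cret"))
```

Saved as a module, the tests can be run with `python -m unittest`. Including a positive login test guards against a "fix" that simply rejects everything.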
Automated Security Scanning¶
# Using sqlmap for SQL injection testing
sqlmap -u "http://example.com/login" --data="username=admin&password=test" --dbs
# Using OWASP ZAP for security scanning
zap-baseline.py -t http://example.com
# Using bandit for Python security scanning
bandit -r /path/to/python/code
📋 Security Checklist¶
Development Checklist¶
## SQL Injection Prevention Checklist
### Input Validation
- [ ] All user inputs are validated before processing
- [ ] Input length limits are enforced
- [ ] Dangerous characters are filtered or escaped
- [ ] Allow-lists (accept known-good input) are used instead of deny-lists (block known-bad input)
### Database Access
- [ ] Parameterized queries are used for all database operations
- [ ] Prepared statements are properly implemented
- [ ] ORM frameworks are used when appropriate
- [ ] Stored procedures use parameter binding
### Authentication & Authorization
- [ ] Passwords are properly hashed (bcrypt, PBKDF2, Argon2)
- [ ] Database connections use least privilege principle
- [ ] Error messages don't reveal database information
- [ ] Authentication failures are logged but not exposed
### Code Review
- [ ] All SQL queries are reviewed for injection vulnerabilities
- [ ] Dynamic SQL construction is avoided
- [ ] String concatenation in queries is prohibited
- [ ] Security testing is included in test suite
Deployment Checklist¶
## Production Security Checklist
### Database Configuration
- [ ] Database user has minimal required permissions
- [ ] Database connections are encrypted
- [ ] Database error messages are generic
- [ ] Database access is logged and monitored
### Application Security
- [ ] Web Application Firewall (WAF) is configured
- [ ] Security headers are properly set
- [ ] Input validation is enforced at application level
- [ ] Regular security scans are performed
### Monitoring & Response
- [ ] SQL injection attempts are logged
- [ ] Security alerts are configured
- [ ] Incident response plan is in place
- [ ] Regular security updates are applied
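The "minimal required permissions" item limits the damage an injection can do even when one slips through. In a server database this is done with per-role grants; as a stand-in that runs anywhere, the sketch below uses SQLite's read-only URI mode to show the effect. The file location, table, and variable names are assumptions for illustration.

```python
import os
import pathlib
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "app.db")

# Privileged connection, used only for setup and migrations.
admin = sqlite3.connect(db_path)
admin.execute("CREATE TABLE users (username TEXT)")
admin.execute("INSERT INTO users VALUES ('alice')")
admin.commit()
admin.close()

# Read-only connection for a code path that only needs to query.
readonly_uri = pathlib.Path(db_path).as_uri() + "?mode=ro"
readonly = sqlite3.connect(readonly_uri, uri=True)

rows = readonly.execute("SELECT username FROM users").fetchall()
print(rows)  # [('alice',)]

write_blocked = False
try:
    # Even if an attacker managed to inject a write, this connection cannot run it.
    readonly.execute("DELETE FROM users")
except sqlite3.OperationalError:
    write_blocked = True
print(write_blocked)  # True
```

The same idea applies to server databases: the account used by a read-only code path should hold only `SELECT` privileges on the tables it needs.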
📚 Related Resources¶
- Logic Errors - Errors in algorithmic thinking
- Runtime Errors - Errors during execution
- Code Review Checklist - Security review guidelines
- Best Practices - Secure coding principles
🔗 Language-Specific Security¶
- Java Common Mistakes - Java security issues
- Python Common Mistakes - Python security issues
- C Common Mistakes - C security issues
- Oracle Common Mistakes - Oracle security issues
🔗 Security Resources¶
- OWASP SQL Injection Prevention - OWASP guidelines
- CWE-89 SQL Injection - Common Weakness Enumeration
- Security Testing Guidelines - Security testing approaches