Add attendee type management and staff codes
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -23,6 +23,8 @@ from flask_babel import Babel, gettext as _, ngettext, force_locale
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.lib.units import mm
|
||||
from reportlab.pdfgen import canvas
|
||||
import openpyxl
|
||||
from openpyxl.styles import Font, PatternFill, Alignment
|
||||
from mysql.connector import Error
|
||||
|
||||
from config import Config
|
||||
@@ -116,6 +118,7 @@ ENGLISH_TRANSLATIONS = {
|
||||
'dashboard': 'Dashboard',
|
||||
'logout': 'Logout',
|
||||
'login': 'Login',
|
||||
'organizer_login': 'Organizer Login',
|
||||
'register': 'Register',
|
||||
'events': 'Events',
|
||||
'attendees': 'Attendees',
|
||||
@@ -296,6 +299,12 @@ ENGLISH_TRANSLATIONS = {
|
||||
'copy': 'Copy',
|
||||
'delete': 'Delete',
|
||||
'confirm_delete_type': 'Are you sure you want to delete this attendee type?',
|
||||
'edit_attendee_type': 'Edit Attendee Type',
|
||||
'back_to_types': 'Back to Types',
|
||||
'type_details': 'Type Details',
|
||||
'save_changes': 'Save Changes',
|
||||
'edit': 'Edit',
|
||||
'cannot_delete_type_with_attendees': 'Cannot delete: %d attendees assigned to this type',
|
||||
'no_attendee_types_yet': 'No attendee types defined yet.',
|
||||
'create_first_type': 'Create the first type',
|
||||
'back_to_event': 'Back to Event',
|
||||
@@ -583,6 +592,22 @@ def generate_attendee_code():
|
||||
conn.close()
|
||||
|
||||
|
||||
def generate_staff_code():
|
||||
"""Generate a unique 10-character alphanumeric staff code."""
|
||||
chars = string.ascii_uppercase + string.digits
|
||||
while True:
|
||||
code = ''.join(random.choices(chars, k=10))
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT id FROM organizers WHERE staff_code = %s", (code,))
|
||||
if not cursor.fetchone():
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return code
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
def generate_type_code():
|
||||
"""Generate a unique 10-character alphanumeric attendee type code."""
|
||||
chars = string.ascii_uppercase + string.digits
|
||||
@@ -648,6 +673,11 @@ def verify_recaptcha(recaptcha_response):
|
||||
# Rate limiting for failed login attempts
|
||||
failed_login_attempts = {} # {ip_address: [(timestamp, count), ...]}
|
||||
|
||||
# Rate limiting for failed personal page attempts (30-minute block)
|
||||
failed_personal_page_attempts = {} # {ip_address: [(timestamp, count), ...]}
|
||||
PERSONAL_PAGE_BLOCK_DURATION = 30 * 60 # 30 minutes in seconds
|
||||
PERSONAL_PAGE_MAX_ATTEMPTS = 5 # Max failed attempts before blocking
|
||||
|
||||
def is_ip_blocked(ip_address):
|
||||
"""Check if an IP address is blocked due to too many failed login attempts."""
|
||||
if ip_address not in failed_login_attempts:
|
||||
@@ -671,6 +701,43 @@ def is_ip_blocked(ip_address):
|
||||
return False
|
||||
|
||||
|
||||
def is_ip_blocked_for_personal_page(ip_address):
|
||||
"""Check if an IP address is blocked due to too many failed personal page attempts."""
|
||||
if ip_address not in failed_personal_page_attempts:
|
||||
return False
|
||||
|
||||
# Clean up old entries (older than 30 minutes)
|
||||
current_time = datetime.now()
|
||||
failed_personal_page_attempts[ip_address] = [
|
||||
(ts, count) for ts, count in failed_personal_page_attempts[ip_address]
|
||||
if (current_time - ts).total_seconds() < PERSONAL_PAGE_BLOCK_DURATION
|
||||
]
|
||||
|
||||
# Check if still blocked
|
||||
if len(failed_personal_page_attempts[ip_address]) >= PERSONAL_PAGE_MAX_ATTEMPTS:
|
||||
return True
|
||||
|
||||
# Remove empty entries
|
||||
if not failed_personal_page_attempts[ip_address]:
|
||||
del failed_personal_page_attempts[ip_address]
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def record_failed_personal_page_attempt(ip_address):
|
||||
"""Record a failed personal page attempt for an IP address."""
|
||||
if ip_address not in failed_personal_page_attempts:
|
||||
failed_personal_page_attempts[ip_address] = []
|
||||
|
||||
failed_personal_page_attempts[ip_address].append((datetime.now(), 1))
|
||||
|
||||
|
||||
def clear_failed_personal_page_attempts(ip_address):
|
||||
"""Clear failed personal page attempts after successful access."""
|
||||
if ip_address in failed_personal_page_attempts:
|
||||
del failed_personal_page_attempts[ip_address]
|
||||
|
||||
|
||||
def record_failed_login(ip_address):
|
||||
"""Record a failed login attempt for an IP address."""
|
||||
if ip_address not in failed_login_attempts:
|
||||
@@ -685,6 +752,82 @@ def clear_failed_logins(ip_address):
|
||||
del failed_login_attempts[ip_address]
|
||||
|
||||
|
||||
def ensure_staff_code_column():
|
||||
"""Ensure the staff_code column exists in organizers table."""
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'organizers' AND COLUMN_NAME = 'staff_code'
|
||||
""", (Config.DB_NAME,))
|
||||
if not cursor.fetchone():
|
||||
cursor.execute("ALTER TABLE organizers ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL")
|
||||
conn.commit()
|
||||
print("Added staff_code column to organizers table")
|
||||
cursor.close()
|
||||
conn.close()
|
||||
except Error as e:
|
||||
print(f"Error ensuring staff_code column: {e}")
|
||||
|
||||
|
||||
def ensure_all_organizers_have_staff_code():
|
||||
"""Ensure all existing organizers have a staff_code."""
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("SELECT id FROM organizers WHERE staff_code IS NULL")
|
||||
organizers_without_code = cursor.fetchall()
|
||||
for org in organizers_without_code:
|
||||
staff_code = generate_staff_code()
|
||||
cursor.execute("UPDATE organizers SET staff_code = %s WHERE id = %s", (staff_code, org['id']))
|
||||
if organizers_without_code:
|
||||
conn.commit()
|
||||
print(f"Generated staff_codes for {len(organizers_without_code)} organizers")
|
||||
cursor.close()
|
||||
conn.close()
|
||||
except Error as e:
|
||||
print(f"Error generating staff codes: {e}")
|
||||
|
||||
|
||||
def ensure_staff_staff_code_column():
|
||||
"""Ensure the staff_code column exists in staff table."""
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'staff' AND COLUMN_NAME = 'staff_code'
|
||||
""", (Config.DB_NAME,))
|
||||
if not cursor.fetchone():
|
||||
cursor.execute("ALTER TABLE staff ADD COLUMN staff_code VARCHAR(10) UNIQUE DEFAULT NULL")
|
||||
conn.commit()
|
||||
print("Added staff_code column to staff table")
|
||||
cursor.close()
|
||||
conn.close()
|
||||
except Error as e:
|
||||
print(f"Error ensuring staff staff_code column: {e}")
|
||||
|
||||
|
||||
def ensure_all_staff_have_staff_code():
|
||||
"""Ensure all existing staff members have a staff_code."""
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
cursor.execute("SELECT id FROM staff WHERE staff_code IS NULL")
|
||||
staff_without_code = cursor.fetchall()
|
||||
for s in staff_without_code:
|
||||
staff_code = generate_staff_code()
|
||||
cursor.execute("UPDATE staff SET staff_code = %s WHERE id = %s", (staff_code, s['id']))
|
||||
if staff_without_code:
|
||||
conn.commit()
|
||||
print(f"Generated staff_codes for {len(staff_without_code)} staff members")
|
||||
cursor.close()
|
||||
conn.close()
|
||||
except Error as e:
|
||||
print(f"Error generating staff codes: {e}")
|
||||
|
||||
|
||||
def allowed_file(filename):
|
||||
"""Check if file extension is allowed."""
|
||||
return '.' in filename and filename.rsplit('.', 1)[1].lower() in Config.ALLOWED_EXTENSIONS
|
||||
@@ -1001,6 +1144,157 @@ def generate_badge_pdf(attendee, event=None):
|
||||
return buffer
|
||||
|
||||
|
||||
def generate_rectangular_badges_pdf(attendees, event=None, attendee_types_map=None):
|
||||
"""Generate an A4 PDF with rectangular labels.
|
||||
|
||||
Label dimensions: 80mm x 50mm
|
||||
Layout: 2 columns x 5 rows = 10 badges per A4 page.
|
||||
Each badge contains: attendee type (black bar), name, organization, role, and QR code.
|
||||
"""
|
||||
buffer = io.BytesIO()
|
||||
c = canvas.Canvas(buffer, pagesize=A4)
|
||||
width, height = A4 # 595.28 x 841.89 points
|
||||
|
||||
# Label dimensions: 80mm x 50mm
|
||||
label_width_mm = 80
|
||||
label_height_mm = 50
|
||||
label_width = label_width_mm * mm
|
||||
label_height = label_height_mm * mm
|
||||
|
||||
badges_per_row = 2
|
||||
badges_per_col = 5
|
||||
badges_per_page = badges_per_row * badges_per_col
|
||||
|
||||
# Page margins
|
||||
margin_left_mm = 15
|
||||
margin_right_mm = 15
|
||||
margin_top_mm = 12
|
||||
margin_bottom_mm = 12
|
||||
|
||||
# Calculate spacing
|
||||
available_width = width - (margin_left_mm * mm) - (margin_right_mm * mm)
|
||||
available_height = height - (margin_top_mm * mm) - (margin_bottom_mm * mm)
|
||||
|
||||
# Gap between labels
|
||||
gap_x = (available_width - badges_per_row * label_width) / (badges_per_row + 1)
|
||||
gap_y = (available_height - badges_per_col * label_height) / (badges_per_col + 1)
|
||||
|
||||
# Badge positions (starting from top-left)
|
||||
badge_positions = []
|
||||
for row in range(badges_per_col):
|
||||
for col in range(badges_per_row):
|
||||
x = margin_left_mm * mm + gap_x + col * (label_width + gap_x)
|
||||
y = height - margin_top_mm * mm - gap_y - row * (label_height + gap_y) - label_height
|
||||
badge_positions.append((x, y))
|
||||
|
||||
# Draw badges for each attendee
|
||||
for i, attendee in enumerate(attendees):
|
||||
badge_index = i % badges_per_page
|
||||
|
||||
# New page if needed
|
||||
if badge_index == 0 and i > 0:
|
||||
c.showPage()
|
||||
|
||||
if badge_index < len(badge_positions):
|
||||
x, y = badge_positions[badge_index]
|
||||
|
||||
# Draw label border (light gray)
|
||||
c.setStrokeColorRGB(0.7, 0.7, 0.7)
|
||||
c.setLineWidth(0.5)
|
||||
c.rect(x, y, label_width, label_height, fill=False, stroke=True)
|
||||
|
||||
# Get attendee type name from mapping
|
||||
attendee_type_id = attendee.get('attendee_type_id')
|
||||
attendee_type_name = ''
|
||||
if attendee_type_id and attendee_types_map:
|
||||
type_info = attendee_types_map.get(attendee_type_id, {})
|
||||
attendee_type_name = type_info.get('name', '') or ''
|
||||
|
||||
# Draw black bar at top with attendee type in white
|
||||
if attendee_type_name:
|
||||
bar_height = 8 * mm
|
||||
c.setFillColorRGB(0, 0, 0)
|
||||
c.rect(x, y + label_height - bar_height, label_width, bar_height, fill=True, stroke=False)
|
||||
c.setFillColorRGB(1, 1, 1)
|
||||
c.setFont("Helvetica-Bold", 16)
|
||||
c.drawCentredString(x + label_width / 2, y + label_height - bar_height + 1.5 * mm, attendee_type_name[:20])
|
||||
|
||||
# Draw attendee name (20pt)
|
||||
first_name = attendee.get('first_name', '') or ''
|
||||
last_name = attendee.get('last_name', '') or ''
|
||||
full_name = f"{first_name} {last_name}".strip()
|
||||
|
||||
font_size = 20
|
||||
max_name_width = label_width - 2 * mm
|
||||
name_width = c.stringWidth(full_name, "Helvetica-Bold", font_size)
|
||||
if name_width > max_name_width:
|
||||
# Scale down to fit
|
||||
font_size = int(font_size * max_name_width / name_width)
|
||||
|
||||
c.setFont("Helvetica-Bold", font_size)
|
||||
c.setFillColorRGB(0, 0, 0)
|
||||
|
||||
name_y = y + label_height - 22 * mm
|
||||
c.drawCentredString(x + label_width / 2, name_y, full_name)
|
||||
|
||||
# Draw organization
|
||||
org = attendee.get('organisation', '') or ''
|
||||
c.setFont("Helvetica", 14)
|
||||
org_y = name_y - 8 * mm
|
||||
if org:
|
||||
c.drawCentredString(x + label_width / 2, org_y, org[:22])
|
||||
|
||||
# Draw role
|
||||
role = attendee.get('role', '') or ''
|
||||
c.setFont("Helvetica", 14)
|
||||
if role:
|
||||
role_y = org_y - 7 * mm
|
||||
c.drawCentredString(x + label_width / 2, role_y, role[:22])
|
||||
|
||||
# Generate QR code at bottom-right
|
||||
attendee_code = attendee.get('attendee_code', '')
|
||||
if attendee_code:
|
||||
qr_size = 12 * mm
|
||||
qr_x = x + label_width - qr_size - 2 * mm
|
||||
qr_y = y + 10 * mm
|
||||
|
||||
qr = qrcode.QRCode(version=1, box_size=2, border=0)
|
||||
qr.add_data(attendee_code)
|
||||
qr.make(fit=True)
|
||||
qr_img = qr.make_image(fill_color="black", back_color="white")
|
||||
|
||||
qr_buffer = io.BytesIO()
|
||||
qr_img.save(qr_buffer, format='PNG')
|
||||
qr_buffer.seek(0)
|
||||
|
||||
from reportlab.lib.utils import ImageReader
|
||||
qr_reader = ImageReader(qr_buffer)
|
||||
c.drawImage(qr_reader, qr_x, qr_y, width=qr_size, height=qr_size)
|
||||
|
||||
# Draw black bar at bottom with event name and start date
|
||||
bar_height = 8 * mm
|
||||
c.setFillColorRGB(0, 0, 0)
|
||||
c.rect(x, y, label_width, bar_height, fill=True, stroke=False)
|
||||
c.setFillColorRGB(1, 1, 1)
|
||||
c.setFont("Helvetica-Bold", 14)
|
||||
|
||||
event_name = (event.get('name', '') or '') if event else ''
|
||||
start_time = event.get('start_time', '') if event else ''
|
||||
event_date_str = ''
|
||||
if start_time:
|
||||
if hasattr(start_time, 'strftime'):
|
||||
event_date_str = start_time.strftime('%d-%m-%Y')
|
||||
else:
|
||||
event_date_str = str(start_time)[:10]
|
||||
|
||||
bar_text = f"{event_name} — {event_date_str}" if event_date_str else event_name
|
||||
c.drawCentredString(x + label_width / 2, y + bar_height / 2 - 1.5 * mm, bar_text)
|
||||
|
||||
c.save()
|
||||
buffer.seek(0)
|
||||
return buffer
|
||||
|
||||
|
||||
def send_badge_email(attendee_email, attendee_name, event_name, pdf_buffer):
|
||||
"""Send email with badge PDF attachment."""
|
||||
subject = f"Your Badge for {event_name}"
|
||||
@@ -1052,6 +1346,65 @@ def index():
|
||||
return render_template('index.html')
|
||||
|
||||
|
||||
@app.route('/staff/<staff_code>')
|
||||
def staff_login(staff_code):
|
||||
"""Staff/organizer login via staff_code (no password required)."""
|
||||
client_ip = request.remote_addr
|
||||
|
||||
# Check if IP is blocked (reuse login rate limiting)
|
||||
if is_ip_blocked(client_ip):
|
||||
flash('Too many failed login attempts. Please try again later (blocked for 1 hour).')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
# First check if it's an organizer
|
||||
cursor.execute("SELECT * FROM organizers WHERE staff_code = %s", (staff_code,))
|
||||
organizer = cursor.fetchone()
|
||||
|
||||
if organizer:
|
||||
cursor.close()
|
||||
conn.close()
|
||||
clear_failed_logins(client_ip)
|
||||
|
||||
# Log in the organizer
|
||||
session['user_id'] = organizer['id']
|
||||
session['user_type'] = 'organizer'
|
||||
session['event_id'] = None
|
||||
|
||||
flash(f'Welcome back, {organizer["name"]}!')
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
# Check if it's a staff member
|
||||
cursor.execute("SELECT * FROM staff WHERE staff_code = %s", (staff_code,))
|
||||
staff_member = cursor.fetchone()
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
if not staff_member:
|
||||
record_failed_login(client_ip)
|
||||
flash('Invalid staff link.')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
# Clear failed login attempts on successful staff code login
|
||||
clear_failed_logins(client_ip)
|
||||
|
||||
# Log in the staff member
|
||||
session['user_id'] = staff_member['id']
|
||||
session['user_type'] = 'staff'
|
||||
session['event_id'] = staff_member['event_id']
|
||||
|
||||
flash(f'Welcome back, {staff_member["first_name"]}!')
|
||||
return redirect(url_for('staff_dashboard'))
|
||||
|
||||
except Error as e:
|
||||
flash(f'Database error: {e}')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
|
||||
# Routes - Auth
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
@@ -1255,9 +1608,10 @@ def register_organizer():
|
||||
|
||||
# Create organizer
|
||||
password_hash = hash_password(password)
|
||||
staff_code = generate_staff_code()
|
||||
cursor.execute(
|
||||
"INSERT INTO organizers (email, password_hash, name) VALUES (%s, %s, %s)",
|
||||
(email, password_hash, name)
|
||||
"INSERT INTO organizers (email, password_hash, name, staff_code) VALUES (%s, %s, %s, %s)",
|
||||
(email, password_hash, name, staff_code)
|
||||
)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
@@ -1670,6 +2024,17 @@ def create_breakout_session(event_id):
|
||||
flash(f'Database error: {e}')
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
# Check if event is single-day
|
||||
is_single_day = False
|
||||
prefilled_date = ''
|
||||
if event.get('start_time') and event.get('end_time'):
|
||||
start = event['start_time']
|
||||
end = event['end_time']
|
||||
if hasattr(start, 'date') and hasattr(end, 'date'):
|
||||
is_single_day = start.date() == end.date()
|
||||
if is_single_day:
|
||||
prefilled_date = start.strftime('%Y-%m-%d')
|
||||
|
||||
if request.method == 'POST':
|
||||
name = request.form.get('name', '').strip()
|
||||
description = request.form.get('description', '').strip()
|
||||
@@ -1722,7 +2087,7 @@ def create_breakout_session(event_id):
|
||||
except Error as e:
|
||||
flash(f'Database error: {e}')
|
||||
|
||||
return render_template('organizer/create_breakout_session.html', event=event)
|
||||
return render_template('organizer/create_breakout_session.html', event=event, prefilled_date=prefilled_date)
|
||||
|
||||
|
||||
@app.route('/organizer/breakout-session/<int:session_id>/edit', methods=['GET', 'POST'])
|
||||
@@ -2298,6 +2663,72 @@ def manage_attendee_types(event_id):
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
|
||||
@app.route('/organizer/event/<int:event_id>/attendee-type/<int:type_id>/edit', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def edit_attendee_type(event_id, type_id):
|
||||
"""Edit an attendee type."""
|
||||
if session.get('user_type') != 'organizer':
|
||||
flash('Access denied.')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
||||
event = cursor.fetchone()
|
||||
|
||||
if not event:
|
||||
flash('Event not found.')
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
cursor.execute("SELECT * FROM attendee_types WHERE id = %s AND event_id = %s", (type_id, event_id))
|
||||
attendee_type = cursor.fetchone()
|
||||
|
||||
if not attendee_type:
|
||||
flash('Attendee type not found.')
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return redirect(url_for('manage_attendee_types', event_id=event_id))
|
||||
|
||||
# Check how many attendees are assigned to this type
|
||||
cursor.execute("SELECT COUNT(*) as count FROM attendees WHERE attendee_type_id = %s", (type_id,))
|
||||
attendee_count = cursor.fetchone()['count']
|
||||
|
||||
if request.method == 'POST':
|
||||
name = request.form.get('name', '').strip()
|
||||
price = request.form.get('price', '').strip()
|
||||
|
||||
if not name:
|
||||
flash('Type name is required.')
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return render_template('organizer/edit_attendee_type.html', event=event, attendee_type=attendee_type)
|
||||
|
||||
try:
|
||||
price_value = float(price) if price else 0.00
|
||||
except ValueError:
|
||||
price_value = 0.00
|
||||
|
||||
cursor.execute("""
|
||||
UPDATE attendee_types SET name = %s, price = %s WHERE id = %s
|
||||
""", (name, price_value, type_id))
|
||||
conn.commit()
|
||||
flash(f'Attendee type "{name}" updated successfully!')
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return redirect(url_for('manage_attendee_types', event_id=event_id))
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return render_template('organizer/edit_attendee_type.html', event=event, attendee_type=attendee_type, attendee_count=attendee_count)
|
||||
except Error as e:
|
||||
flash(f'Database error: {e}')
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
|
||||
@app.route('/organizer/event/<int:event_id>/attendee-type/<int:type_id>/delete', methods=['POST'])
|
||||
@login_required
|
||||
def delete_attendee_type(event_id, type_id):
|
||||
@@ -2451,10 +2882,11 @@ def manage_event_staff(event_id):
|
||||
return render_template('organizer/event_staff.html', event=event, staff_members=staff_members)
|
||||
|
||||
invite_token = generate_invite_token()
|
||||
staff_code = generate_staff_code()
|
||||
cursor.execute("""
|
||||
INSERT INTO staff (event_id, email, first_name, last_name, invite_token)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
""", (event_id, email, first_name, last_name, invite_token))
|
||||
INSERT INTO staff (event_id, email, first_name, last_name, invite_token, staff_code)
|
||||
VALUES (%s, %s, %s, %s, %s, %s)
|
||||
""", (event_id, email, first_name, last_name, invite_token, staff_code))
|
||||
conn.commit()
|
||||
|
||||
# Get organizer email for sending
|
||||
@@ -2692,10 +3124,11 @@ def batch_add_staff(event_id):
|
||||
added_count = 0
|
||||
for member in source_staff:
|
||||
if member['email'].lower() not in existing_emails:
|
||||
staff_code = generate_staff_code()
|
||||
cursor.execute("""
|
||||
INSERT INTO staff (event_id, first_name, last_name, email, invite_token, invite_used, created_at)
|
||||
VALUES (%s, %s, %s, %s, UUID(), FALSE, NOW())
|
||||
""", (event_id, member['first_name'], member['last_name'], member['email']))
|
||||
INSERT INTO staff (event_id, first_name, last_name, email, invite_token, invite_used, created_at, staff_code)
|
||||
VALUES (%s, %s, %s, %s, UUID(), FALSE, NOW(), %s)
|
||||
""", (event_id, member['first_name'], member['last_name'], member['email'], staff_code))
|
||||
existing_emails.add(member['email'].lower())
|
||||
added_count += 1
|
||||
|
||||
@@ -2776,6 +3209,10 @@ def edit_attendee(attendee_id):
|
||||
conn.close()
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
# Get attendee types for this event
|
||||
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s ORDER BY name", (attendee['event_id'],))
|
||||
attendee_types = cursor.fetchall()
|
||||
|
||||
if request.method == 'POST':
|
||||
first_name = request.form.get('first_name', '').strip()
|
||||
last_name = request.form.get('last_name', '').strip()
|
||||
@@ -2783,12 +3220,13 @@ def edit_attendee(attendee_id):
|
||||
organisation = request.form.get('organisation', '').strip()
|
||||
role = request.form.get('role', '').strip()
|
||||
introduction = request.form.get('introduction', '').strip()
|
||||
attendee_type_id = request.form.get('attendee_type_id', '').strip()
|
||||
|
||||
cursor.execute("""
|
||||
UPDATE attendees
|
||||
SET first_name = %s, last_name = %s, email = %s, organisation = %s, role = %s, introduction = %s
|
||||
SET first_name = %s, last_name = %s, email = %s, organisation = %s, role = %s, introduction = %s, attendee_type_id = %s
|
||||
WHERE id = %s
|
||||
""", (first_name, last_name, email, organisation, role, introduction, attendee_id))
|
||||
""", (first_name, last_name, email, organisation, role, introduction, attendee_type_id if attendee_type_id else None, attendee_id))
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
@@ -2798,7 +3236,7 @@ def edit_attendee(attendee_id):
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return render_template('organizer/edit_attendee.html', attendee=attendee)
|
||||
return render_template('organizer/edit_attendee.html', attendee=attendee, attendee_types=attendee_types)
|
||||
except Error as e:
|
||||
flash(f'Database error: {e}')
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
@@ -2883,7 +3321,145 @@ def event_badges(event_id):
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
|
||||
@app.route('/organizer/event/<int:event_id>/stats')
|
||||
@app.route('/organizer/event/<int:event_id>/badges/rectangular/download')
|
||||
@login_required
|
||||
def download_rectangular_badges(event_id):
|
||||
"""Download rectangular badge PDFs for event."""
|
||||
if session.get('user_type') != 'organizer':
|
||||
flash('Access denied.')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
||||
event = cursor.fetchone()
|
||||
|
||||
if not event:
|
||||
flash('Event not found.')
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
|
||||
attendees = cursor.fetchall()
|
||||
|
||||
# Get attendee types for this event
|
||||
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,))
|
||||
attendee_types = cursor.fetchall()
|
||||
attendee_types_map = {at['id']: at for at in attendee_types}
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
if not attendees:
|
||||
flash('No attendees found for this event.')
|
||||
return redirect(url_for('event_badges', event_id=event_id))
|
||||
|
||||
# Generate rectangular badge PDF
|
||||
pdf_buffer = generate_rectangular_badges_pdf(attendees, event, attendee_types_map)
|
||||
|
||||
filename = f"badges_rectangular_{event.get('code', event_id)}.pdf"
|
||||
return send_file(
|
||||
pdf_buffer,
|
||||
mimetype='application/pdf',
|
||||
as_attachment=True,
|
||||
download_name=filename
|
||||
)
|
||||
except Error as e:
|
||||
flash(f'Database error: {e}')
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
|
||||
@app.route('/organizer/event/<int:event_id>/attendees/excel/download')
|
||||
@login_required
|
||||
def download_attendees_excel(event_id):
|
||||
"""Download all attendees as Excel sheet."""
|
||||
if session.get('user_type') != 'organizer':
|
||||
flash('Access denied.')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
|
||||
cursor.execute("SELECT * FROM events WHERE id = %s AND organizer_id = %s", (event_id, session['user_id']))
|
||||
event = cursor.fetchone()
|
||||
|
||||
if not event:
|
||||
cursor.close()
|
||||
conn.close()
|
||||
flash('Event not found.')
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
|
||||
cursor.execute("SELECT * FROM attendees WHERE event_id = %s ORDER BY last_name, first_name", (event_id,))
|
||||
attendees = cursor.fetchall()
|
||||
|
||||
# Get attendee types
|
||||
cursor.execute("SELECT * FROM attendee_types WHERE event_id = %s", (event_id,))
|
||||
attendee_types = cursor.fetchall()
|
||||
attendee_types_map = {at['id']: at for at in attendee_types}
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
if not attendees:
|
||||
flash('No attendees found.')
|
||||
return redirect(url_for('event_badges', event_id=event_id))
|
||||
|
||||
# Create Excel workbook
|
||||
wb = openpyxl.Workbook()
|
||||
ws = wb.active
|
||||
ws.title = "Attendees"
|
||||
|
||||
# Header styling
|
||||
header_font = Font(bold=True, color="FFFFFF")
|
||||
header_fill = PatternFill("solid", fgColor="333333")
|
||||
header_alignment = Alignment(horizontal="center")
|
||||
|
||||
headers = ['ID', 'Voornaam', 'Achternaam', 'Email', 'Organisatie', 'Rol', 'Type', 'Ingecheckt', 'Aangemeld op']
|
||||
for col, header in enumerate(headers, 1):
|
||||
cell = ws.cell(row=1, column=col, value=header)
|
||||
cell.font = header_font
|
||||
cell.fill = header_fill
|
||||
cell.alignment = header_alignment
|
||||
|
||||
# Data rows
|
||||
for row, attendee in enumerate(attendees, 2):
|
||||
ws.cell(row=row, column=1, value=attendee.get('id'))
|
||||
ws.cell(row=row, column=2, value=attendee.get('first_name', ''))
|
||||
ws.cell(row=row, column=3, value=attendee.get('last_name', ''))
|
||||
ws.cell(row=row, column=4, value=attendee.get('email', ''))
|
||||
ws.cell(row=row, column=5, value=attendee.get('organisation', ''))
|
||||
ws.cell(row=row, column=6, value=attendee.get('role', ''))
|
||||
ws.cell(row=row, column=7, value=attendee_types_map.get(attendee.get('attendee_type_id'), {}).get('name', ''))
|
||||
ws.cell(row=row, column=8, value='Ja' if attendee.get('checked_in') else 'Nee')
|
||||
ws.cell(row=row, column=9, value=str(attendee.get('created_at', ''))[:19] if attendee.get('created_at') else '')
|
||||
|
||||
# Auto-fit column widths
|
||||
for col in ws.columns:
|
||||
max_length = 0
|
||||
column_letter = col[0].column_letter
|
||||
for cell in col:
|
||||
if cell.value:
|
||||
max_length = max(max_length, len(str(cell.value)))
|
||||
ws.column_dimensions[column_letter].width = min(max_length + 2, 30)
|
||||
|
||||
filename = f"attendees_{event.get('code', event_id)}.xlsx"
|
||||
buffer = io.BytesIO()
|
||||
wb.save(buffer)
|
||||
buffer.seek(0)
|
||||
|
||||
return send_file(
|
||||
buffer,
|
||||
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||||
as_attachment=True,
|
||||
download_name=filename
|
||||
)
|
||||
except Error as e:
|
||||
flash(f'Database error: {e}')
|
||||
return redirect(url_for('organizer_dashboard'))
|
||||
@login_required
|
||||
def event_stats(event_id):
|
||||
"""Attendance statistics for event."""
|
||||
@@ -3948,6 +4524,13 @@ def payment_page(code):
|
||||
@app.route('/attendee/personal/<token>')
|
||||
def attendee_personal_page(token):
|
||||
"""Personal page accessed via confirmation email link."""
|
||||
client_ip = request.remote_addr
|
||||
|
||||
# Check if IP is blocked
|
||||
if is_ip_blocked_for_personal_page(client_ip):
|
||||
flash(f'Too many invalid attempts. Please try again later (blocked for 30 minutes).')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
cursor = conn.cursor(dictionary=True)
|
||||
@@ -3956,17 +4539,23 @@ def attendee_personal_page(token):
|
||||
cursor.execute("SELECT * FROM attendees WHERE confirmation_token = %s", (token,))
|
||||
attendee = cursor.fetchone()
|
||||
|
||||
# Fallback: also accept attendee_code for old links where token was cleared
|
||||
if not attendee:
|
||||
flash('Invalid or expired link.')
|
||||
cursor.execute("SELECT * FROM attendees WHERE attendee_code = %s", (token,))
|
||||
attendee = cursor.fetchone()
|
||||
|
||||
if not attendee:
|
||||
record_failed_personal_page_attempt(client_ip)
|
||||
flash('Invalid link.')
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return redirect(url_for('index'))
|
||||
|
||||
# Clear confirmation token (one-time use for security)
|
||||
cursor.execute("UPDATE attendees SET confirmation_token = NULL WHERE id = %s", (attendee['id'],))
|
||||
conn.commit()
|
||||
# Clear failed attempts on successful access
|
||||
clear_failed_personal_page_attempts(client_ip)
|
||||
|
||||
# Auto-login the attendee
|
||||
# Note: confirmation_token is kept so the link remains valid indefinitely
|
||||
session['user_id'] = attendee['id']
|
||||
session['user_type'] = 'attendee'
|
||||
session['event_id'] = attendee['event_id']
|
||||
@@ -4030,4 +4619,10 @@ def view_event(event_id):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Ensure staff_code column exists in organizers and staff tables
|
||||
ensure_staff_code_column()
|
||||
ensure_all_organizers_have_staff_code()
|
||||
ensure_staff_staff_code_column()
|
||||
ensure_all_staff_have_staff_code()
|
||||
|
||||
app.run(debug=True, host='0.0.0.0', port=5002)
|
||||
|
||||
Reference in New Issue
Block a user