Browse Source

Chapter 9: User roles and permissions (9a)

master
T. Meissner 6 years ago
parent
commit
e61ae9a2c8
8 changed files with 186 additions and 4 deletions
  1. +19
    -0
      app/decorators.py
  2. +6
    -0
      app/main/__init__.py
  3. +5
    -0
      app/main/errors.py
  4. +77
    -1
      app/models.py
  5. +9
    -0
      app/templates/403.html
  6. +2
    -2
      flasky.py
  7. +32
    -0
      migrations/versions/417960a55273_.py
  8. +36
    -1
      tests/test_user_model.py

+ 19
- 0
app/decorators.py View File

@ -0,0 +1,19 @@
from functools import wraps
from flask import abort
from flask_login import current_user
from .models import Permission
def permission_required(permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.can(permission):
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
def admin_required(f):
return permission_required(Permission.ADMIN)(f)

+ 6
- 0
app/main/__init__.py View File

@ -5,3 +5,9 @@ main = Blueprint('main', __name__)
from . import views, errors from . import views, errors
from ..models import Permission
@main.context_processor
def inject_permissions():
return dict(Permission=Permission)

+ 5
- 0
app/main/errors.py View File

@ -2,6 +2,11 @@ from flask import render_template
from . import main from . import main
@main.app_errorhandler(403)
def forbidden(e):
return render_template('403.html'), 403
@main.app_errorhandler(404) @main.app_errorhandler(404)
def page_not_found(e): def page_not_found(e):
return render_template('404.html'), 404 return render_template('404.html'), 404


+ 77
- 1
app/models.py View File

@ -2,16 +2,67 @@ from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import BadSignature from itsdangerous import BadSignature
from flask import current_app from flask import current_app
from flask_login import UserMixin
from flask_login import UserMixin, AnonymousUserMixin
from . import db, login_manager from . import db, login_manager
class Permission:
FOLLOW = 1
COMMENT = 2
WRITE = 4
MODERATE = 8
ADMIN = 16
class Role(db.Model): class Role(db.Model):
__tablename__ = 'roles' __tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True) name = db.Column(db.String(64), unique=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer)
users = db.relationship('User', backref='role', lazy='dynamic') users = db.relationship('User', backref='role', lazy='dynamic')
def __init__(self, **kwargs):
super(Role, self).__init__(**kwargs)
if self.permissions is None:
self.permissions = 0
@staticmethod
def insert_roles():
roles = {
'User': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE],
'Moderator': [Permission.FOLLOW, Permission.COMMENT,
Permission.WRITE, Permission.MODERATE],
'Administrator': [Permission.FOLLOW, Permission.COMMENT,
Permission.WRITE, Permission.MODERATE,
Permission.ADMIN]
}
default_role = 'User'
for r in roles:
role = Role.query.filter_by(name=r).first()
if role is None:
role = Role(name=r)
role.reset_permissions()
for perm in roles[r]:
role.add_permission(perm)
role.default = (role.name == default_role)
db.session.add(role)
db.session.commit()
def add_permission(self, perm):
if not self.has_permission(perm):
self.permissions += perm
def remove_permission(self, perm):
if self.has_permission(perm):
self.permissions -= perm
def reset_permissions(self):
self.permissions = 0
def has_permission(self, perm):
return self.permissions & perm == perm
def __repr__(self): def __repr__(self):
return '<Role %r>' % self.name return '<Role %r>' % self.name
@ -25,6 +76,14 @@ class User(UserMixin, db.Model):
password_hash = db.Column(db.String(128)) password_hash = db.Column(db.String(128))
confirmed = db.Column(db.Boolean, default=False) confirmed = db.Column(db.Boolean, default=False)
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
if self.role is None:
if self.email == current_app.config['FLASKY_ADMIN']:
self.role = Role.query.filter_by(name='Administrator').first()
else:
self.role = Role.query.filter_by(default=True).first()
@property @property
def password(self): def password(self):
raise AttributeError('Password is not a readable attribute') raise AttributeError('Password is not a readable attribute')
@ -92,10 +151,27 @@ class User(UserMixin, db.Model):
db.session.add(self) db.session.add(self)
return True return True
def can(self, perm):
return self.role is not None and self.role.has_permission(perm)
def is_administrator(self):
return self.can(Permission.ADMIN)
def __repr__(self): def __repr__(self):
return '<User %r>' % self.username return '<User %r>' % self.username
class AnonymousUser(AnonymousUserMixin):
def can(self, perm):
return False
def is_administrator(self):
return False
login_manager.anonymous_user = AnonymousUser
@login_manager.user_loader @login_manager.user_loader
def load_user(user_id): def load_user(user_id):
return User.query.get(int(user_id)) return User.query.get(int(user_id))

+ 9
- 0
app/templates/403.html View File

@ -0,0 +1,9 @@
{% extends "base.html" %}
{% block title %}Flasky - Forbidden{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>Forbidden</h1>
</div>
{% endblock %}

+ 2
- 2
flasky.py View File

@ -1,7 +1,7 @@
import os import os
from flask_migrate import Migrate from flask_migrate import Migrate
from app import create_app, db from app import create_app, db
from app.models import User, Role
from app.models import User, Role, Permission
app = create_app(os.getenv('FLASK_CONFIG') or 'default') app = create_app(os.getenv('FLASK_CONFIG') or 'default')
@ -10,7 +10,7 @@ migrate = Migrate(app, db)
@app.shell_context_processor @app.shell_context_processor
def make_shell_context(): def make_shell_context():
return dict(db=db, User=User, Role=Role)
return dict(db=db, User=User, Role=Role, Permission=Permission)
@app.cli.command() @app.cli.command()


+ 32
- 0
migrations/versions/417960a55273_.py View File

@ -0,0 +1,32 @@
"""empty message
Revision ID: 417960a55273
Revises: f8108bd4cd89
Create Date: 2018-11-08 23:29:05.161570
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '417960a55273'
down_revision = 'f8108bd4cd89'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('roles', sa.Column('default', sa.Boolean(), nullable=True))
op.add_column('roles', sa.Column('permissions', sa.Integer(), nullable=True))
op.create_index(op.f('ix_roles_default'), 'roles', ['default'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_roles_default'), table_name='roles')
op.drop_column('roles', 'permissions')
op.drop_column('roles', 'default')
# ### end Alembic commands ###

+ 36
- 1
tests/test_user_model.py View File

@ -1,7 +1,7 @@
import unittest import unittest
import time import time
from app import create_app, db from app import create_app, db
from app.models import User
from app.models import User, AnonymousUser, Role, Permission
class UserModelTestCase(unittest.TestCase): class UserModelTestCase(unittest.TestCase):
@ -10,6 +10,7 @@ class UserModelTestCase(unittest.TestCase):
self.app_context = self.app.app_context() self.app_context = self.app.app_context()
self.app_context.push() self.app_context.push()
db.create_all() db.create_all()
Role.insert_roles()
def tearDown(self): def tearDown(self):
db.session.remove() db.session.remove()
@ -102,3 +103,37 @@ class UserModelTestCase(unittest.TestCase):
token = u2.generate_email_change_token(u1.email) token = u2.generate_email_change_token(u1.email)
self.assertFalse(u2.change_email(token)) self.assertFalse(u2.change_email(token))
self.assertTrue(u2.email == 'dirk@mustermann.de') self.assertTrue(u2.email == 'dirk@mustermann.de')
def test_user_role(self):
u = User(email='max@mustermann.de', password='cat')
self.assertTrue(u.can(Permission.FOLLOW))
self.assertTrue(u.can(Permission.COMMENT))
self.assertTrue(u.can(Permission.WRITE))
self.assertFalse(u.can(Permission.MODERATE))
self.assertFalse(u.can(Permission.ADMIN))
def test_moderator_role(self):
r = Role.query.filter_by(name='Moderator').first()
u = User(email='max@mustermann.de', password='cat', role=r)
self.assertTrue(u.can(Permission.FOLLOW))
self.assertTrue(u.can(Permission.COMMENT))
self.assertTrue(u.can(Permission.WRITE))
self.assertTrue(u.can(Permission.MODERATE))
self.assertFalse(u.can(Permission.ADMIN))
def test_administrator_role(self):
r = Role.query.filter_by(name='Administrator').first()
u = User(email='max@mustermann.de', password='cat', role=r)
self.assertTrue(u.can(Permission.FOLLOW))
self.assertTrue(u.can(Permission.COMMENT))
self.assertTrue(u.can(Permission.WRITE))
self.assertTrue(u.can(Permission.MODERATE))
self.assertTrue(u.can(Permission.ADMIN))
def test_anonymous_user(self):
u = AnonymousUser()
self.assertFalse(u.can(Permission.FOLLOW))
self.assertFalse(u.can(Permission.COMMENT))
self.assertFalse(u.can(Permission.WRITE))
self.assertFalse(u.can(Permission.MODERATE))
self.assertFalse(u.can(Permission.ADMIN))

Loading…
Cancel
Save