|
| 1 | +from tableauserverclient.models.user_item import UserItem |
| 2 | +from typing import List, Tuple |
| 3 | +from enum import IntEnum |
| 4 | + |
| 5 | + |
| 6 | + |
| 7 | +class UserCSVObject: |
| 8 | + def __init__(self): |
| 9 | + self.name = None |
| 10 | + self.password = None |
| 11 | + self.fullname = None |
| 12 | + self.license_level = None |
| 13 | + self.admin_level = None |
| 14 | + self.publisher = None |
| 15 | + self.email = None |
| 16 | + self.auth = None |
| 17 | + |
| 18 | + def populate(self, values: List[str]) -> None: |
| 19 | + n_values = len(values) |
| 20 | + self.name = values[0] |
| 21 | + if n_values >= 2: |
| 22 | + self.password = values[1] |
| 23 | + if n_values >= 3: |
| 24 | + self.fullname = values[2] |
| 25 | + if n_values >= 4: |
| 26 | + self.license_level = values[3] |
| 27 | + if n_values >= 5: |
| 28 | + self.admin_level = values[4] |
| 29 | + if n_values >= 6: |
| 30 | + self.publisher = values[5] |
| 31 | + if n_values >= 7: |
| 32 | + self.email = values[6] |
| 33 | + if n_values >= 8: |
| 34 | + self.auth = values[7] |
| 35 | + |
| 36 | + def to_tsc_user(self) -> UserItem: |
| 37 | + site_role = UserCSVImport.evaluate_site_role(self.license_level, self.admin_level, self.publisher) |
| 38 | + if not site_role: |
| 39 | + raise AttributeError("Site role is required") |
| 40 | + user = UserItem(self.name, site_role, self.auth) |
| 41 | + user.email = self.email |
| 42 | + user.fullname = self.fullname |
| 43 | + return user |
| 44 | + |
| 45 | + |
| 46 | + |
| 47 | +class UserCSVImport(object): |
| 48 | + """ |
| 49 | + This class includes hardcoded options and logic for the CSV file format defined for user import |
| 50 | + https://help.tableau.com/current/server/en-us/users_import.htm |
| 51 | + """ |
| 52 | + |
| 53 | + # username, password, display_name, license, admin_level, pu
F438
blishing, email, auth type |
| 54 | + class ColumnType(IntEnum): |
| 55 | + USERNAME = 0 |
| 56 | + PASS = 1 |
| 57 | + DISPLAY_NAME = 2 |
| 58 | + LICENSE = 3 # aka site role |
| 59 | + ADMIN = 4 |
| 60 | + PUBLISHER = 5 |
| 61 | + EMAIL = 6 |
| 62 | + AUTH = 7 |
| 63 | + # version 3.25 and later |
| 64 | + IDP_NAME = 8 |
| 65 | + IDP_ID = 9 |
| 66 | + |
| 67 | + MAX = 7 |
| 68 | + |
| 69 | + # maxColumns = v3.25+ ? 9 : 7 |
| 70 | + |
| 71 | + # Take in a list of strings in expected order |
| 72 | + # and create a user item populated by the given attributes |
| 73 | + @staticmethod |
| 74 | + def create_user_model_from_line(line_values: List[str]) -> "UserItem": |
| 75 | + UserCSVImport._validate_import_line_or_throw(line_values) |
| 76 | + values: List[str] = list(map(lambda x: x.strip(), line_values)) |
| 77 | + user = UserItem(values[UserCSVImport.ColumnType.USERNAME]) |
| 78 | + if len(values) > 1: |
| 79 | + if len(values) > UserCSVImport.ColumnType.MAX: |
| 80 | + raise ValueError("Too many attributes for user import") |
| 81 | + while len(values) <= UserCSVImport.ColumnType.MAX: |
| 82 | + values.append("") |
| 83 | + |
| 84 | + site_role = UserCSVImport._evaluate_site_role( |
| 85 | + values[UserCSVImport.ColumnType.LICENSE], |
| 86 | + values[UserCSVImport.ColumnType.ADMIN], |
| 87 | + values[UserCSVImport.ColumnType.PUBLISHER], |
| 88 | + ) |
| 89 | + if not site_role: |
| 90 | + raise AttributeError("Site role is required") |
| 91 | + |
| 92 | + user._set_values( |
| 93 | + None, # id |
| 94 | + values[UserCSVImport.ColumnType.USERNAME], |
| 95 | + site_role, |
| 96 | + None, # last login |
| 97 | + None, # external auth provider id |
| 98 | + values[UserCSVImport.ColumnType.DISPLAY_NAME], |
| 99 | + values[UserCSVImport.ColumnType.EMAIL], |
| 100 | + values[UserCSVImport.ColumnType.AUTH], |
| 101 | + None, # domain name |
| 102 | + ) |
| 103 | + if values[UserCSVImport.ColumnType.PASS] is not None: |
| 104 | + user.password = values[UserCSVImport.ColumnType.PASS] |
| 105 | + |
| 106 | + # TODO: implement IDP pools |
| 107 | + return user |
| 108 | + |
| 109 | + # helper method: validates an import file and if enabled, creates user models for each valid line |
| 110 | + # result: (users[], valid_lines[], (line, error)[]) |
| 111 | + @staticmethod |
| 112 | + def process_file_for_import(filepath: str, validate_only=False) -> Tuple[List["UserItem"], List[str], List[Tuple[str, Exception]]]: |
| 113 | + n_failures_accepted = 3 |
| 114 | + users: List[UserItem] = [] |
| 115 | + failed: List[Tuple[str, Exception]] = [] |
| 116 | + valid: List[str] = [] |
| 117 | + if not filepath.find("csv"): |
| 118 | + raise ValueError("Only csv files are accepted") |
| 119 | + |
| 120 | + with open(filepath, encoding="utf-8-sig") as csv_file: |
| 121 | + for line in csv_file: |
| 122 | + if line == "": |
| 123 | + continue |
| 124 | + |
| 125 | + |
| 126 | + # print only the username, because next value is password |
| 127 | + # logger.debug("> {}".format(line.split(","))) |
| 128 | + try: |
| 129 | + UserCSVImport._validate_import_line_or_throw(line) |
| 130 | + if not validate_only: |
| 131 | + user: UserItem = UserCSVImport.create_user_model_from_line(line) |
| 132 | + users.append(user) |
| 133 | + valid.append(" ".join(line)) |
| 134 | + except Exception as e: |
| 135 | + failed.append((" ".join(line), e)) |
| 136 | + if len(failed) > n_failures_accepted and not validate_only: |
| 137 | + raise ValueError( |
| 138 | + "More than 3 lines have failed validation. Check the errors and fix your file." |
| 139 | + ) |
| 140 | + return users, valid, failed |
| 141 | + |
| 142 | + # valid: username, domain/username, username@domain, domain/username@email |
| 143 | + @staticmethod |
| 144 | + def _validate_username_or_throw(username) -> None: |
| 145 | + if username is None or username == "" or username.strip(" ") == "": |
| 146 | + raise AttributeError(_("user.input.name.err.empty")) |
| 147 | + if username.find(" ") >= 0: |
| 148 | + raise AttributeError(_("tabcmd.report.error.user.no_spaces_in_username")) |
| 149 | + at_symbol = username.find("@") |
| 150 | + |
| 151 | + # f a user name includes an @ character that represents anything other than a domain separator, |
| 152 | + # you need to refer to the symbol using the hexadecimal format: \0x40 |
| 153 | + if at_symbol >= 0: |
| 154 | + username = username[:at_symbol] + "X" + username[at_symbol + 1 :] |
| 155 | + if username.find("@") >= 0: |
| 156 | + raise AttributeError(_("tabcmd.report.error.user_csv.at_char")) |
| 157 | + |
| 158 | + |
| 159 | + |
| 160 | + # If Tableau Server is configured to use Active Directory authentication, there must be a Password column, |
<
F438
code> | 161 | + # but the column itself should be empty. If the server is using local authentication, you must provide passwords for new users. |
| 162 | + # TODO: check any character/encoding limits for passwords |
| 163 | + @staticmethod |
| 164 | + def _validate_password_or_throw(password) -> None: |
| 165 | + isActiveDirectory = False # TODO: how to get this info? |
| 166 | + isLocalAuth = False # TODO: how to get this info? |
| 167 | + |
| 168 | + if isActiveDirectory and password is not None: |
| 169 | + raise AttributeError("Password must be empty for Active Directory accounts.") |
| 170 | + |
| 171 | + if isLocalAuth and password is None: |
| 172 | + raise AttributeError("Password must be provided for local authentication users.") |
| 173 | + |
| 174 | + |
| 175 | + # Note: The identifier is required if adding a user to an identity pool that uses Active Directory (or LDAP) identity store. |
| 176 | + # The identifier is optional if adding a user to an identity pool that uses the local identity store. |
| 177 | + @staticmethod |
| 178 | + def _validate_idp_identifier_or_throw(identifier) -> None: |
| 179 | + isActiveDirectory = False # TODO: how to get this info? |
| 180 | + if isActiveDirectory and identifier is not None: |
| 181 | + raise AttributeError("Identifier is required for Active Directory identity stores.") |
| 182 | + |
| 183 | + |
| 184 | + # Some fields in the import file are restricted to specific values |
| 185 | + # Iterate through each field and validate the given value against hardcoded constraints |
| 186 | + # Values in here are all CASE INSENSITIVE. So the values entered here are all lowercase |
| 187 | + # and all comparisons must force the input text to lowercase as well. |
| 188 | + @staticmethod |
| 189 | + def _validate_import_line_or_throw(line: str) -> None: |
| 190 | + _valid_attributes: List[List[str]] = [ |
| 191 | + [], |
| 192 | + [], |
| 193 | + [], |
| 194 | + ["creator", "explorer", "viewer", "unlicensed"], # license |
| 195 | + ["system", "site", "none", "no"], # admin |
| 196 | + ["yes", "true", "1", "no", "false", "0"], # publisher |
| 197 | + [], |
| 198 | + [UserItem.Auth.SAML.lower(), |
| 199 | + UserItem.Auth.OpenID.lower(), UserItem.Auth.TableauIDWithMFA.lower(), UserItem.Auth.ServerDefault.lower()], # auth |
| 200 | + [], |
| 201 | + [], |
| 202 | + ] |
| 203 | + |
| 204 | + if line is None or line is False or len(line) == 0 or line == "": |
| 205 | + raise AttributeError("Empty line") |
| 206 | + values: List[str] = list(map(str.strip, line.split(","))) |
| 207 | + |
| 208 | + |
| 209 | + if len(values) > UserCSVImport.ColumnType.MAX: |
| 210 | + raise AttributeError("Too many attributes in line") |
| 211 | + # sometimes usernames are case sensitive |
| 212 | + username = values[UserCSVImport.ColumnType.USERNAME.value] |
| 213 | + # logger.debug("> details - {}".format(username)) |
| 214 | + UserItem.validate_username_or_throw(username) |
| 215 | + if len(values) > UserCSVImport.ColumnType.PASS: |
| 216 | + password = values[UserCSVImport.ColumnType.PASS.value] |
| 217 | + UserCSVImport.validate_password_or_throw(password) |
| 218 | + if len(values) > UserCSVImport.ColumnType.IDP_ID: |
| 219 | + UserCSVImport._validate_idp_identifier_or_throw |
| 220 | + for i in range(2, len(values)): |
| 221 | + # logger.debug("column {}: {}".format(UserCSVImport.ColumnType(i).name, values[i])) |
| 222 | + UserCSVImport._validate_attribute_value(values[i], _valid_attributes[i], UserCSVImport.ColumnType(i).name) |
| 223 | + |
| 224 | + # Given a restricted set of possible values, confirm the item is in that set |
| 225 | + @staticmethod |
| 226 | + def _validate_attribute_value(item: str, possible_values: List[str], column_type) -> None: |
| 227 | + if item is None or item == "": |
| 228 | + # value can be empty for any column except user, which is checked elsewhere |
| 229 | + return |
| 230 | + item = item.strip() |
| 231 | + if item.lower() in possible_values or possible_values == []: |
| 232 | + return |
| 233 | + raise AttributeError( |
| 234 | + "Invalid value {} for {}. Valid values: {}".format(item, column_type, possible_values) |
| 235 | + ) |
| 236 | + |
| 237 | + # https://help.tableau.com/current/server/en-us/csvguidelines.htm#settings_and_site_roles |
| 238 | + # This logic is hardcoded to match the existing rules for import csv files |
| 239 | + @staticmethod |
| 240 | + def _evaluate_site_role(license_level, admin_level, publisher): |
| 241 | + if not license_level or not admin_level or not publisher: |
| 242 | + return "Unlicensed" |
| 243 | + # ignore case everywhere |
| 244 | + license_level = license_level.lower() |
| 245 | + admin_level = admin_level.lower() |
| 246 | + publisher = publisher.lower() |
| 247 | + # don't need to check publisher for system/site admin |
| 248 | + if admin_level == "system": |
| 249 | + site_role = "SiteAdministrator" |
| 250 | + elif admin_level == "site": |
| 251 | + if license_level == "creator": |
| 252 | + site_role = "SiteAdministratorCreator" |
| 253 | + elif license_level == "explorer": |
| 254 | + site_role = "SiteAdministratorExplorer" |
| 255 | + else: |
| 256 | + site_role = "SiteAdministratorExplorer" |
| 257 | + else: # if it wasn't 'system' or 'site' then we can treat it as 'none' |
| 258 | + if publisher == "yes": |
| 259 | + if license_level == "creator": |
| 260 | + site_role = "Creator" |
| 261 | + elif license_level == "explorer": |
| 262 | + site_role = "ExplorerCanPublish" |
| 263 | + else: |
| 264 | + site_role = "Unlicensed" # is this the expected outcome? |
| 265 | + else: # publisher == 'no': |
| 266 | + if license_level == "explorer" or license_level == "creator": |
| 267 | + site_role = "Explorer" |
| 268 | + elif license_level == "viewer": |
| 269 | + site_role = "Viewer" |
| 270 | + else: # if license_level == 'unlicensed' |
| 271 | + site_role = "Unlicensed" |
| 272 | + if site_role is None: |
| 273 | + site_role = "Unlicensed" |
| 274 | + return site_role |
0 commit comments