import json
import os
import platform
import sys
import time

import pandas as pd
import requests

# ---------------------------------------------------------------------------
# Output location
# ---------------------------------------------------------------------------
# BUGFIX: os.path.join('C:', ...) produces the *drive-relative* path
# 'C:Users\...' on Windows (no separator after the drive letter);
# 'C:\\' anchors the path at the drive root.
if platform.system() == 'Windows':
    csv_dump_path = os.path.join(
        'C:\\', 'Users', 'user', 'Desktop', 'PowerBI docs', 'CSV_FILES'
    )
else:
    csv_dump_path = '/tmp/'

# Create the CSV dump directory if it does not exist.
os.makedirs(csv_dump_path, exist_ok=True)


def fleetio_column_transform(data):
    """Flatten nested fields of each record into indexed columns.

    For every record, each key listed in ``split_columns`` (whose value is
    a list of dicts, or a single dict) is expanded into flat keys of the
    form ``<column>_<index>_<field>``, and the original nested key is
    removed.

    NOTE: records are mutated in place — the returned list holds the same
    dict objects that were passed in (matches the original behavior).

    :param data: list of record dicts from the API.
    :return: list of flattened record dicts.
    """
    new_data = []
    split_columns = ['work_order_line_items', 'labels', 'custom_fields']
    for obj in data:
        for split_column in split_columns:
            if split_column in obj:
                split_obj = obj[split_column]
                # A bare dict is treated as a one-element list.
                if isinstance(split_obj, dict):
                    split_obj = [split_obj]
                for idx, item in enumerate(split_obj):
                    for field in item:
                        obj[f'{split_column}_{idx}_{field}'] = item[field]
                obj.pop(split_column)
        new_data.append(obj)
    return new_data


def fleetio_row_transform(data):
    """Explode ``work_order_line_items`` into one output row per line item.

    Each line item yields a copy of its parent record with the item's
    fields flattened as ``work_order_line_items_<field>``; ``labels`` and
    ``custom_fields`` are flattened column-wise (as in
    ``fleetio_column_transform``) on every emitted row.

    NOTE(review): records that have no ``work_order_line_items`` key are
    silently dropped (never appended). Preserved as-is — confirm this is
    intentional.

    :param data: list of record dicts from the API.
    :return: list of exploded, flattened record dicts.
    """
    new_data = []
    split_row_columns = ['work_order_line_items']
    split_columns = ['labels', 'custom_fields']
    for obj in data:
        for split_column_r in split_row_columns:
            if split_column_r in obj:
                for item in obj[split_column_r]:
                    # One output row per line item, seeded from the parent.
                    local_obj = obj.copy()
                    for field in item:
                        local_obj[f'{split_column_r}_{field}'] = item[field]
                    for split_column_c in split_columns:
                        if split_column_c in local_obj:
                            split_obj = obj[split_column_c]
                            if isinstance(split_obj, dict):
                                split_obj = [split_obj]
                            for idx, sub_item in enumerate(split_obj):
                                for field in sub_item:
                                    local_obj[f'{split_column_c}_{idx}_{field}'] = sub_item[field]
                            local_obj.pop(split_column_c, None)
                    local_obj.pop(split_column_r)
                    new_data.append(local_obj)
    return new_data


# ---------------------------------------------------------------------------
# Resource catalogue: one entry per Fleetio endpoint to export.
# ---------------------------------------------------------------------------
resources = [
    {
        'id': 1,
        'resource_endpoint': 'https://secure.fleetio.com/api/v1/vehicle_status_changes',
        'params': {
            'include_archived': '1',
            'per_page': '50'  # Number of records per page; adjust as needed
        },
        'excel_name': 'Fleetio_Vehicle_Status_Changes.csv'
    }
]


class FleetioBot:
    """Minimal client for paginated GET requests against the Fleetio API."""

    def __init__(self, token, account_token, row_split=None):
        """
        :param token: API token (keep this secure).
        :param account_token: Fleetio account token.
        :param row_split: when a resource defines ``additional_funcs``,
            True selects its row transform, False its column transform,
            and None leaves the resource untouched.
        """
        self.resources = resources
        self.token = token
        self.account_token = account_token
        self.row_split = row_split

    def collect_data(self, resource_id=1, cursor=None):
        """Fetch one page of a resource and return the raw response.

        Side effect: sets ``self.res_obj`` (and url/params/headers) for the
        matched resource so the caller can read transforms and file names.

        :param resource_id: id of the entry in ``resources`` to fetch.
        :param cursor: ``start_cursor`` for cursor pagination; None for
            the first page.
        :return: the ``requests.Response`` object (not parsed).
        """
        self.res_obj = None
        for res_obj in self.resources:
            if res_obj['id'] == resource_id:
                # Choose row vs. column transform when the resource offers both.
                if self.row_split is not None and 'additional_funcs' in res_obj:
                    key = 'row_transform' if self.row_split else 'column_transform'
                    res_obj['additional_func'] = [res_obj['additional_funcs'][key]]
                self.res_obj = res_obj
                break
        if not self.res_obj:
            print("Cannot find a proper map for the resource ID. Please check the resources list.")
            sys.exit(1)  # was bare exit(): a site builtin, not safe in all contexts

        try:
            self.url = self.res_obj['resource_endpoint']
            # BUGFIX: copy the params so pagination state (start_cursor)
            # never leaks back into the shared `resources` configuration.
            self.params = dict(self.res_obj['params'])
        except KeyError as e:
            print(f'Missing key in resources configuration: {e}')
            sys.exit(1)

        # Cursor-based pagination: send start_cursor only when we have one.
        if cursor:
            self.params['start_cursor'] = cursor
        else:
            self.params.pop('start_cursor', None)

        self.headers = {
            'Accept': '*/*',  # BUGFIX: was '/', which is not a valid media range
            'Authorization': f'Token {self.token}',
            'Account-Token': f'{self.account_token}'
        }

        return requests.get(url=self.url, headers=self.headers, params=self.params)


if __name__ == '__main__':
    # NOTE(security): never hard-code real API tokens. Read from the
    # environment; the 'sss' defaults preserve the original placeholders.
    fleetio_con = FleetioBot(
        os.environ.get('FLEETIO_API_TOKEN', 'sss'),
        os.environ.get('FLEETIO_ACCOUNT_TOKEN', 'sss'),
        row_split=True,
    )

    for resource in resources:
        resource_id = resource['id']
        data_list = []   # Accumulates every page's records for this resource.
        cursor = None    # start_cursor for the next request; None = first page.
        limit = None     # Optional cap on total records.

        while True:
            data = fleetio_con.collect_data(resource_id=resource_id, cursor=cursor)

            if data.status_code != 200:
                if data.status_code == 429:
                    # Honor Retry-After, padded by 5 extra seconds.
                    retry_after = int(data.headers.get('Retry-After', 0))
                    sleep_time = retry_after + 5
                    print(f'Rate limit reached. Sleeping for {sleep_time} seconds...')
                    time.sleep(sleep_time)
                    continue
                print(f'Error {data.status_code}: {data.text}')
                break

            response_content = data.json()

            # The API may return either a bare list or {'records': [...]}.
            if isinstance(response_content, list):
                records = response_content
            elif isinstance(response_content, dict) and 'records' in response_content:
                records = response_content['records']
            else:
                print('Invalid response format. Unable to extract records.')
                break

            # Apply any transform functions selected in collect_data().
            if 'additional_func' in fleetio_con.res_obj:
                for additional_func in fleetio_con.res_obj['additional_func']:
                    records = additional_func(records)

            data_list.extend(records)

            # BUGFIX: a bare-list response has no .get(); guard before
            # reading next_cursor instead of raising AttributeError.
            next_cursor = (
                response_content.get('next_cursor')
                if isinstance(response_content, dict)
                else None
            )
            if not next_cursor:
                print(f'End of data reached. Total records: {len(data_list)}')
                break
            cursor = next_cursor

            # If a limit is specified and reached, stop fetching.
            if limit and len(data_list) >= limit:
                data_list = data_list[:limit]
                break

        # Persist this resource's records as CSV.
        df = pd.DataFrame(data_list)
        df.to_csv(os.path.join(csv_dump_path, fleetio_con.res_obj['excel_name']), index=False)