ROS的Bag切片以及Bag转csv

bag切片

使用plotjuggler软件打开要切片的bag,选择需要切片的话题

图片[1]-ROS的Bag切片以及Bag转csv-九桑

拖动进度条到要开始录制的时间点,点击播放;播放的同时在终端手动录制需要切片的话题,生成新的bag

图片[2]-ROS的Bag切片以及Bag转csv-九桑

终端手动录制需要切片话题的bag

rosbag record -O your_name.bag /sensor_data_motor/motor_pos /sensor_data_motor/motor_vel /sensor_data_motor/motor_cur

rosbag record -O your_name.bag 话题1 话题2 话题3 …… 话题n

bag转csv

说明:转换bag中的所有话题到csv,会创建一个与bag同名的文件夹,里面放着每个话题对应的csv文件,以及原bag文件的一份拷贝

1.使用python代码来实现

#!/usr/bin/python3

import rosbag
import sys
import csv
import time
import os
import shutil

def _output_folder_for(bag_path):
    """Return the directory that receives the CSV files for ``bag_path``.

    The ``.bag`` extension is removed as a suffix. ``str.rstrip(".bag")``
    must not be used for this: it treats its argument as a set of characters
    and would also eat trailing '.', 'b', 'a' and 'g' characters that belong
    to the base name (e.g. "gab.bag" would become "").
    """
    root, ext = os.path.splitext(bag_path)
    if ext == ".bag":
        return root
    # No ".bag" extension to drop: pick a name that cannot collide with the
    # bag file itself.
    return bag_path + "_csv"


def _parse_message_fields(msg_text):
    """Split the text of a message into ``(name, value)`` pairs.

    Every line containing a ':' yields one pair. Only the first ':' separates
    the name from the value, so values that contain colons themselves are
    kept whole. Lines without a ':' are skipped.
    """
    fields = []
    for line in msg_text.split('\n'):
        name, colon, value = line.partition(':')
        if colon:
            fields.append((name.strip(), value.strip()))
    return fields


def _write_topic_csv(bag, topic_name, folder):
    """Write all messages of ``topic_name`` in ``bag`` to one CSV in ``folder``.

    The file is named after the topic with every '/' replaced by '_slash_'.
    The header row is derived from the first message; each following row
    holds the rosbag timestamp and the field values of one message.
    """
    safe_topic_name = topic_name.replace('/', '_slash_')
    filename = os.path.join(folder, safe_topic_name + '.csv')

    with open(filename, 'w', newline='') as csvfile:
        filewriter = csv.writer(csvfile, delimiter=',')
        header_written = False

        for _topic, msg, t in bag.read_messages(topic_name):
            fields = _parse_message_fields(str(msg))

            if not header_written:
                # First column holds the rosbag timestamp.
                filewriter.writerow(
                    ["rosbagTimestamp"] + [name for name, _value in fields]
                )
                header_written = True

            filewriter.writerow([str(t)] + [value for _name, value in fields])


def _convert_bag(bag_path):
    """Convert one bag file into a folder of per-topic CSV files.

    Any error is reported on stdout and does not propagate, so that the
    remaining bag files of a batch are still processed.
    """
    bag = None  # stays None if opening fails, so cleanup can tell
    try:
        bag = rosbag.Bag(bag_path)

        folder = _output_folder_for(bag_path)
        os.makedirs(folder, exist_ok=True)
        # Use the base name only: bag_path may contain directories, which
        # must not be repeated below the output folder.
        shutil.copyfile(
            bag_path, os.path.join(folder, os.path.basename(bag_path))
        )

        # Take the topic names from the bag's topic info instead of
        # iterating over every message just to collect them.
        for topic_name in bag.get_type_and_topic_info().topics:
            _write_topic_csv(bag, topic_name, folder)
    except Exception as e:
        print("Error processing {}: {}".format(bag_path, str(e)))
    finally:
        if bag is not None:
            bag.close()


def main():
    """Convert bag files to CSV, one folder per bag and one CSV per topic.

    With one command-line argument, only that bag file is converted. Without
    arguments, every ``*.bag`` file in the current directory is converted
    after a 10 second grace period. More than one argument terminates the
    program with exit status 1.
    """
    # Verify correct input arguments: 1 or 2
    if len(sys.argv) > 2:
        print("invalid number of arguments: " + str(len(sys.argv)))
        print("should be 2: 'bag2csv.py' and 'bagName'")
        print("or just 1 : 'bag2csv.py'")
        sys.exit(1)

    if len(sys.argv) == 2:
        listOfBagFiles = [sys.argv[1]]
        print("reading only 1 bagfile: " + str(listOfBagFiles[0]))
    else:
        listOfBagFiles = [f for f in os.listdir(".") if f.endswith(".bag")]
        if not listOfBagFiles:
            # Nothing to convert: do not make the user wait for nothing.
            print("no bagfiles found in current directory")
            return
        print("reading all " + str(len(listOfBagFiles))
              + " bagfiles in current directory: \n")
        for f in listOfBagFiles:
            print(f)
        print("\n press ctrl+c in the next 10 seconds to cancel \n")
        time.sleep(10)

    numberOfFiles = str(len(listOfBagFiles))
    for count, bagFile in enumerate(listOfBagFiles, start=1):
        print("reading file " + str(count) + " of " + numberOfFiles + ": " + bagFile)
        _convert_bag(bagFile)

    print("Done reading all " + numberOfFiles + " bag files.")

# Run the conversion only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()

2. 运行说明:

python3 your_name.py your_name.bag

© 版权声明
THE END
喜欢就支持一下吧
点赞15 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容